Worker

Worker Deep Dive (worker.go)

The worker.go file implements the core Worker interface and the workflowWorker struct, which is responsible for processing workflow states and managing transitions.

Worker Structure

type workflowWorker struct {
	WorkflowContext       WorkerContext                            // Shared context across workflow
	Transitions           map[string]interfaces.ServiceTransitions // Registered transitions
	TransitionsResolved   map[string]*ServiceTransitionMapping     // Resolved transition mappings
	TransitionsNamespaces []string                                 // Namespace paths for resolution
	Response              *WorkerResponse                          // Final workflow response
	Debug                 bool                                     // Debug mode flag
	Options               map[string]interface{}                   // Additional options
}

Key Worker Methods

Start(w EngineInterface) bool

Initializes the worker by resolving all transition handlers from the DI container.

func (baseWorker *workflowWorker) Start(w EngineInterface) bool {
	baseWorker.InitResolvers(w.GetResolvers())
	return true
}

InitResolvers(resolvers []string)

Resolves transition handlers from the dependency injection container:

  1. Iterates through resolver paths (e.g., services/common/response)
  2. Looks up each resolver in the DI container using the transition prefix
  3. Stores resolved ServiceTransitionMapping for later use
  4. Tracks namespaces for relative path resolution
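
The four steps above can be sketched with a plain map standing in for the DI container. This is only an illustration under stated assumptions: the `container` and `handler` types, the `resolveAll` helper and the `"transitions/"` prefix are all hypothetical, since this document does not show the real container API or the fields of `ServiceTransitionMapping`.

```go
package main

import "fmt"

// handler stands in for a resolved transition mapping (hypothetical type).
type handler struct{ Path string }

// container is a hypothetical DI container: a map from key to instance.
type container map[string]*handler

// resolveAll looks up each resolver path under an assumed prefix and returns
// the resolved handlers together with the namespaces that were found.
func resolveAll(c container, prefix string, resolvers []string) (map[string]*handler, []string) {
	resolved := make(map[string]*handler)
	var namespaces []string
	for _, path := range resolvers {
		h, ok := c[prefix+path]
		if !ok {
			continue // unknown resolver: skipped in this sketch
		}
		resolved[path] = h
		namespaces = append(namespaces, path)
	}
	return resolved, namespaces
}

func main() {
	c := container{
		"transitions/services/common/response": &handler{Path: "services/common/response"},
	}
	resolved, namespaces := resolveAll(c, "transitions/", []string{
		"services/common/response",
		"services/missing",
	})
	fmt.Println(len(resolved), namespaces)
}
```

Whether the real worker skips, logs or fails on an unresolved path is not stated here; the sketch skips it only to stay short.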

PrepareContext(w EngineInterface, flow *domain.Flow) bool

Sets up the workflow context with essential references:

func (baseWorker *workflowWorker) PrepareContext(w EngineInterface, flow *domain.Flow) bool {
	baseWorker.WorkflowContext.SetValue("workflow.Flow", flow)
	baseWorker.WorkflowContext.SetValue("workflow.EngineInterface", w)
	baseWorker.WorkflowContext.SetValue("workflow.Worker", baseWorker)
	return true
}
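
A transition that needs one of these references has to read it back from an untyped store. The sketch below shows one way to do that with a checked type assertion. The `sharedContext` and `flow` types and the `flowFrom` helper are hypothetical stand-ins; only the key name `"workflow.Flow"` is taken from the code above.

```go
package main

import "fmt"

// flow is a hypothetical stand-in for domain.Flow.
type flow struct{ Name string }

// sharedContext is a hypothetical key/value store, similar in spirit to WorkerContext.
type sharedContext struct{ values map[string]interface{} }

// SetValue stores v under key, allocating the map on first use.
func (c *sharedContext) SetValue(key string, v interface{}) {
	if c.values == nil {
		c.values = make(map[string]interface{})
	}
	c.values[key] = v
}

// flowFrom reads "workflow.Flow" back with a checked type assertion, so a
// missing or mistyped entry yields ok == false instead of a panic.
func flowFrom(c *sharedContext) (*flow, bool) {
	f, ok := c.values["workflow.Flow"].(*flow)
	return f, ok
}

func main() {
	ctx := &sharedContext{}
	ctx.SetValue("workflow.Flow", &flow{Name: "checkout"})
	if f, ok := flowFrom(ctx); ok {
		fmt.Println("current flow:", f.Name)
	}
}
```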

ProcessState(w EngineInterface, flow *domain.Flow) (bool, string)

The main state-processing method. For the current state it performs the following steps:

  1. Parameter Binding: Binds call arguments to state parameters
  2. Conditional Execution: Evaluates if/else conditions on transitions
  3. Nested Workflows: Handles workflow: prefixed states for sub-workflow execution
  4. Transition Resolution: Finds the appropriate transition handler
  5. Method Invocation: Calls the transition method via reflection
  6. Result Processing: Handles success/failure and determines next state

┌────────────────────────────────────────────────────────┐
│                   ProcessState Flow                    │
├────────────────────────────────────────────────────────┤
│ 1. Parse state name (extract method path)              │
│ 2. Create WorkerSessionContext                         │
│ 3. Bind parameters from transition options             │
│ 4. Evaluate if/else conditions                         │
│ 5. Check for nested workflow (workflow: prefix)        │
│ 6. Resolve transition handler                          │
│ 7. Call transition method via CallTransitionByName     │
│ 8. Process FlowStepResult                              │
│ 9. Handle true/false branch routing                    │
│ 10. Return (success, nextStateName)                    │
└────────────────────────────────────────────────────────┘

Step 1 — Initialisation

The function begins by parsing the current state name (which may carry an optional #comment suffix), extracting the command prefix (e.g. workflow:), promoting a parent flow stored in the shared context, and constructing the WorkerSessionContext that every transition will receive.

func (baseWorker *workflowWorker) ProcessState(w EngineInterface, flow *domain.Flow) (bool, string) {
	// Strip inline comment: "MyState#some-comment" → command[0] = "MyState"
	comments := strings.Split(flow.CurrentState.State, "#")
	command := strings.SplitN(comments[0], ":", 2) // e.g. ["workflow", "my_wf"] or ["MyState"]

	// Promote a parent flow injected by a sub-workflow caller
	context := baseWorker.WorkflowContext.Context()
	if parentFlow, ok := (*context)["Parent"]; ok {
		flow.Parent = parentFlow.(*domain.Flow)
		delete(*context, "Parent")
	}

	// Build the per-state execution context passed to every transition
	workerSessionContext := WorkerSessionContext{
		WorkflowContext: baseWorker.WorkflowContext,
		Worker:          baseWorker,
		Flow:            flow,
		Engine:          w,
	}
	// ...
}

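
To see what the two splits at the top of ProcessState produce, here is a small standalone sketch that runs them on a few state names. The helper name `parseStateName` is hypothetical and not part of worker.go.

```go
package main

import (
	"fmt"
	"strings"
)

// parseStateName is a hypothetical helper mirroring the two splits at the top
// of ProcessState: drop an optional "#comment" suffix, then separate an
// optional "prefix:" from the rest of the state name.
func parseStateName(state string) (prefix, name string) {
	withoutComment := strings.Split(state, "#")[0]
	command := strings.SplitN(withoutComment, ":", 2)
	if len(command) == 2 {
		return command[0], command[1]
	}
	return "", command[0]
}

func main() {
	for _, s := range []string{"MyState", "MyState#some-comment", "workflow:my_wf#1"} {
		prefix, name := parseStateName(s)
		fmt.Printf("%q -> prefix=%q name=%q\n", s, prefix, name)
	}
}
```

Because `strings.SplitN` is called with a limit of 2, any further colons stay inside the second element rather than producing extra parts.
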
Step 2 — Parameter Binding

States can declare named parameters (e.g. state Process(UserData)) whose values are supplied by the calling transition. ProcessState resolves those values and injects them into the session context before the transition method is invoked.

// Bind call arguments to target state parameters (if provided)
if flow.CurrentTransition != nil && flow.CurrentState != nil && flow.CurrentTransition.Options != nil {

	// 1. Resolve parameter names
	//    Priority: transition option "_call.paramNames" > state option "_params"
	var paramNames []string
	if pn, ok := flow.CurrentTransition.Options["_call.paramNames"]; ok {
		if arr, ok := pn.([]interface{}); ok {
			for _, v := range arr {
				if s, ok := v.(string); ok {
					paramNames = append(paramNames, s)
				}
			}
		}
	}
	if len(paramNames) == 0 && flow.CurrentState.Options != nil {
		if pn, ok := flow.CurrentState.Options["_params"]; ok {
			if arr, ok := pn.([]interface{}); ok {
				for _, v := range arr {
					if s, ok := v.(string); ok {
						paramNames = append(paramNames, s)
					}
				}
			}
		}
	}

	if len(paramNames) > 0 {
		// 2. Resolve positional argument list from "_call.args.map"
		//    The map is keyed by the predecessor state name so different callers
		//    can pass different arguments to the same target state.
		var argsList []string
		if amap, ok := flow.CurrentTransition.Options["_call.args.map"]; ok {
			if m, ok := amap.(map[string]interface{}); ok {
				// Determine the predecessor ("from") state
				from := "_"
				if flow.LastTrace != nil && flow.LastTrace.GetFrom() != "" {
					from = flow.LastTrace.GetFrom()
				}

				// Lookup order (most-specific first):
				//   1. Exact state name          e.g. "ValidateOrder#1"
				//   2. Name without hash suffix  e.g. "ValidateOrder"
				//   3. Wildcard key              "_"
				candidates := []string{from}
				if idx := strings.Index(from, "#"); idx > 0 {
					candidates = append(candidates, from[:idx])
				}
				if from != "_" {
					candidates = append(candidates, "_")
				}
				for _, key := range candidates {
					if raw, ok := m[key]; ok {
						if arr, ok := raw.([]interface{}); ok {
							for _, v := range arr {
								if s, ok := v.(string); ok {
									argsList = append(argsList, s)
								}
							}
						}
						break // use the first matching key
					}
				}
			}
		}

		// 3. Evaluate and bind each argument positionally into the session context
		//    If no args were found the state parameters remain unset (treated as optional).
		if len(argsList) > 0 {
			for i, pname := range paramNames {
				if i >= len(argsList) {
					break
				}
				raw := strings.TrimSpace(argsList[i])

				// Type inference for literal values:
				//   "quoted"      → string (quotes stripped)
				//   42 / 3.14     → int / float64
				//   true/false    → bool
				//   anything else → resolved from the flow context via ParseProperty
				var val interface{} = raw
				if len(raw) >= 2 &&
					((raw[0] == '"' && raw[len(raw)-1] == '"') ||
						(raw[0] == '\'' && raw[len(raw)-1] == '\'')) {
					val = strings.Trim(raw, "\"'")
				} else if iv, err := strconv.ParseInt(raw, 10, 64); err == nil {
					val = int(iv)
				} else if fv, err := strconv.ParseFloat(raw, 64); err == nil {
					val = fv
				} else if strings.EqualFold(raw, "true") || strings.EqualFold(raw, "false") {
					val = strings.EqualFold(raw, "true")
				} else {
					// Resolve as a context property reference (e.g. $User.ID)
					if _, resolved, err := workerSessionContext.ParseProperty(raw); err == nil {
						val = resolved
					}
				}

				// Inject resolved value so the transition can read it via ctx.Context("pname")
				workerSessionContext.SetContext(pname, val)
			}
		}
	}
}

Key design decisions in parameter binding:

| Concern | Solution |
| --- | --- |
| Multiple callers pass different args to one state | _call.args.map keyed by predecessor state name |
| State name may carry a #suffix (e.g. #1 for deduplicated state copies) | Exact name is tried first, then the name with the hash suffix stripped: "State#1" → "State" |
| Caller not in map | Falls back to wildcard key "_" |
| No args at all | State parameters left unset; treated as optional |
| Literal vs. dynamic values | Type inference: quoted → string, numeric → int/float64, true/false → bool, otherwise ParseProperty |
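
The two lookups summarised in the table can be isolated into small pure functions, which makes them easy to test. This is a sketch and not the worker's code: `candidateKeys` and `inferLiteral` are hypothetical names. Unlike the excerpt above, the sketch requires the opening and closing quote to match and strips exactly one pair.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// candidateKeys returns the "_call.args.map" keys to try for a predecessor
// state, most specific first: exact name, name without "#suffix", then "_".
func candidateKeys(from string) []string {
	if from == "" {
		from = "_"
	}
	keys := []string{from}
	if idx := strings.Index(from, "#"); idx > 0 {
		keys = append(keys, from[:idx])
	}
	if from != "_" {
		keys = append(keys, "_")
	}
	return keys
}

// inferLiteral applies the literal rules from the table. The second result is
// false when raw is not a literal and would need a property lookup instead.
func inferLiteral(raw string) (interface{}, bool) {
	raw = strings.TrimSpace(raw)
	if len(raw) >= 2 && (raw[0] == '"' || raw[0] == '\'') && raw[len(raw)-1] == raw[0] {
		return raw[1 : len(raw)-1], true
	}
	if iv, err := strconv.ParseInt(raw, 10, 64); err == nil {
		return int(iv), true
	}
	if fv, err := strconv.ParseFloat(raw, 64); err == nil {
		return fv, true
	}
	if strings.EqualFold(raw, "true") || strings.EqualFold(raw, "false") {
		return strings.EqualFold(raw, "true"), true
	}
	return nil, false
}

func main() {
	fmt.Println(candidateKeys("ValidateOrder#1"))
	for _, raw := range []string{`"hello"`, "42", "3.14", "TRUE", "$User.ID"} {
		v, ok := inferLiteral(raw)
		fmt.Printf("%-10s literal=%-5v value=%#v\n", raw, ok, v)
	}
}
```

One consequence of running the float check before the property lookup: `strconv.ParseFloat` also accepts the strings "NaN", "Inf" and "Infinity" in any letter case, so an argument spelled that way is bound as a float rather than resolved as a property name.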

Error Handling

The worker exposes the following error-handling methods:

func (baseWorker *workflowWorker) HandleError(err interface{}, statusCode int) (success bool)
func (baseWorker *workflowWorker) SetError(error *issues.Issue, statusCode ...int) bool
func (baseWorker *workflowWorker) SetErrors(errors *issues.Issues, statusCode ...int) bool
func (baseWorker *workflowWorker) ProcessStateError(w EngineInterface, flow *domain.Flow, errorMessage string)

Transition Invocation (call_function.go)

The call_function.go file handles dynamic method invocation on transition handlers using Go's reflection capabilities.

CallTransitionByName

The main function for invoking transition methods:

func CallTransitionByName(
	path string,                                // e.g., "services/common/response/ResponseValue"
	workerSessionContext *WorkerSessionContext, // Current session context
	transition *ServiceTransitionMapping,       // Resolved transition mapping
	metaCache map[string]map[string]map[string]interfaces.FunctionMetadata, // Metadata cache
) ([]reflect.Value, error)

Invocation Process

┌───────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│ Parse Path        │───▶│ Lookup Method    │───▶│ Get Metadata     │
│ service/trans/    │    │ via Reflection   │    │ (arg names,      │
│ method            │    │                  │    │ types)           │
└───────────────────┘    └──────────────────┘    └────────┬─────────┘
                                                          │
                                                          ▼
┌───────────────────┐    ┌──────────────────┐    ┌──────────────────┐
│ Return Results    │◀───│ Call Method      │◀───│ Prepare Inputs   │
│ (FlowStepResult)  │    │                  │    │                  │
└───────────────────┘    └──────────────────┘    └──────────────────┘
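
The lookup-and-call portion of this pipeline can be illustrated with the standard `reflect` package. The sketch below is a simplified stand-in, not the body of CallTransitionByName: the `Response` handler and the `callByPath` helper are hypothetical, arguments are passed in directly instead of being prepared from metadata, and variadic methods are not handled.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// Response is a hypothetical transition handler used only for this sketch.
type Response struct{}

// ResponseValue is an example transition method.
func (Response) ResponseValue(name string, count int) string {
	return fmt.Sprintf("%s x%d", name, count)
}

// callByPath takes the segment after the last slash as the method name, finds
// that method on the handler via reflection and calls it with the arguments.
func callByPath(handler interface{}, path string, args ...interface{}) ([]reflect.Value, error) {
	methodName := path[strings.LastIndex(path, "/")+1:]
	method := reflect.ValueOf(handler).MethodByName(methodName)
	if !method.IsValid() {
		return nil, fmt.Errorf("method %q not found", methodName)
	}
	if method.Type().NumIn() != len(args) {
		return nil, fmt.Errorf("method %q expects %d arguments, got %d",
			methodName, method.Type().NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		v := reflect.ValueOf(a)
		if !v.IsValid() || !v.Type().AssignableTo(method.Type().In(i)) {
			return nil, fmt.Errorf("argument %d is not assignable to %s", i, method.Type().In(i))
		}
		in[i] = v
	}
	return method.Call(in), nil
}

func main() {
	out, err := callByPath(Response{}, "services/common/response/ResponseValue", "item", 3)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out[0].String())
}
```

Checking argument count and assignability before `Call` matters because `reflect.Value.Call` panics on a mismatch instead of returning an error.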

PrepareInput

Prepares method arguments by looking up values from the session context:

func PrepareInput(
	workerSessionContext *WorkerSessionContext,
	expectedType reflect.Type,
	argName string,
) (reflect.Value, error)

Process:

  1. Looks up argument value using ExprLookup (handles <<property>> syntax)
  2. Falls back to direct context map lookup
  3. Casts the value to the expected type

castInput

Handles type conversion between context values and expected method parameter types:

func castInput(val any, expectedType reflect.Type) (reflect.Value, error)

Conversion Strategies:

  1. Direct Assignment: If types are directly assignable
  2. Map to Struct: JSON marshal/unmarshal for map → struct pointer conversion
  3. Type Conversion: For convertible types (e.g., int → float64)
  4. JSON Serialization: For complex types to string conversion

Worker Session Context (worker_session_context.go)

The WorkerSessionContext provides a rich API for accessing and manipulating workflow data during state execution.

Structure

type WorkerSessionContext struct {
	Engine          EngineInterface     // Reference to the workflow engine
	Flow            *domain.Flow        // Current flow state
	Worker          Worker              // Parent worker reference
	WorkflowContext WorkerContext       // Shared workflow context
	ServerContext   baseContext.Context // Server-level context
}

Property Resolution

The session context resolves property expressions written in the syntax below, optionally applying a modifier or a default value:

Property Syntax

| Syntax | Description | Example |
| --- | --- | --- |
| key | Direct property lookup | username |
| key.nested | Nested property access | user.profile.name |
| <<key>> | Template-style reference | <<userId>> |
| key\|modifier | Property with type modifier | count\|int |
| key??default | Default value fallback | name??"Unknown" |

Available Modifiers

| Modifier | Description |
| --- | --- |
| int | Convert to integer |
| string | Convert to string |
| bool | Convert to boolean |
| pint | Parse and convert to int |
| pstring | Parse and convert to string |
| option | Lookup from flow options |
| strings | Convert to string slice |
| parse | Parse template expressions |
| property | Treat value as property key |
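
As a rough illustration of how an expression in this syntax could be taken apart, the sketch below splits one into key, modifier and default. It is not the session context's parser. The `propertyRef` type and `parseRef` function are hypothetical, and the precedence used here ("??" split off first, then "|") is an assumption, because this document does not say how the two combine. `strings.Cut` requires Go 1.18 or later.

```go
package main

import (
	"fmt"
	"strings"
)

// propertyRef is a hypothetical parsed form of a property expression.
type propertyRef struct {
	Key        string
	Modifier   string // empty when no "|modifier" is present
	Default    string // raw default text; meaningful only if HasDefault
	HasDefault bool
	Template   bool // true when the key was wrapped in << >>
}

// parseRef splits an expression into its parts without evaluating anything.
func parseRef(expr string) propertyRef {
	var ref propertyRef
	expr = strings.TrimSpace(expr)
	// Assumed precedence: "??default" is split off first, then "|modifier".
	if head, def, ok := strings.Cut(expr, "??"); ok {
		expr, ref.Default, ref.HasDefault = head, def, true
	}
	if head, mod, ok := strings.Cut(expr, "|"); ok {
		expr, ref.Modifier = head, mod
	}
	if len(expr) >= 4 && strings.HasPrefix(expr, "<<") && strings.HasSuffix(expr, ">>") {
		expr, ref.Template = expr[2:len(expr)-2], true
	}
	ref.Key = expr
	return ref
}

func main() {
	for _, e := range []string{"username", "count|int", `name??"Unknown"`, "<<userId>>"} {
		fmt.Printf("%-18s %+v\n", e, parseRef(e))
	}
}
```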

Key Methods

Context Management

func (p *WorkerSessionContext) StartStep() *WorkerSessionContext
func (p *WorkerSessionContext) UpdateContext() *map[string]interface{}
func (p *WorkerSessionContext) Context(key string) interface{}
func (p *WorkerSessionContext) SetContext(key string, value interface{}) *WorkerSessionContext

Value Access

func (p *WorkerSessionContext) Value(key string) interface{}
func (p *WorkerSessionContext) SetValue(key string, value interface{}) *WorkerSessionContext
func (p *WorkerSessionContext) Option(key string) interface{}
func (p *WorkerSessionContext) Property(key string) interface{}

Property Resolution

func (p *WorkerSessionContext) GetProperty(prop string) (key string, value interface{}, err error)
func (p *WorkerSessionContext) ParseProperty(prop string) (key string, value interface{}, err error)
func (p *WorkerSessionContext) RecursiveParseProperty(prop interface{}) (key string, value interface{}, err error)

Type Helpers

func (p *WorkerSessionContext) String(key string, byDefault ...string) string
func (p *WorkerSessionContext) Int(key string, byDefault ...int) (value int, ok bool)
func (p *WorkerSessionContext) Bool(key string) bool

Lookup Priority

The context uses a hierarchical lookup order:

┌────────────────────────────────────────────────────────────────┐
│                     Property Lookup Order                      │
├────────────────────────────────────────────────────────────────┤
│ 1. values (workflow-scoped values set during execution)        │
│ 2. transition options (current transition's options)           │
│ 3. flow options (workflow-level options)                       │
│ 4. parent.transition options (parent workflow's transition)    │
│ 5. parent options (parent workflow's options)                  │
│ 6. context root (direct context keys)                          │
│ 7. constants (module-level constants)                          │
└────────────────────────────────────────────────────────────────┘
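
A first-match-wins chain like this is straightforward to model as an ordered slice of scopes. The sketch below is illustrative only: the `layer` type and `lookup` function are hypothetical, and only the layer names are taken from the diagram.

```go
package main

import "fmt"

// layer is one named scope in the lookup chain (hypothetical type).
type layer struct {
	Name   string
	Values map[string]interface{}
}

// lookup returns the value from the first layer that defines key, together
// with the name of that layer so callers can see which scope won.
func lookup(layers []layer, key string) (value interface{}, source string, ok bool) {
	for _, l := range layers {
		if v, found := l.Values[key]; found {
			return v, l.Name, true
		}
	}
	return nil, "", false
}

func main() {
	layers := []layer{
		{Name: "values", Values: map[string]interface{}{"userId": 7}},
		{Name: "transition options", Values: map[string]interface{}{"timeout": 30}},
		{Name: "flow options", Values: map[string]interface{}{"timeout": 60, "region": "eu"}},
		{Name: "constants", Values: map[string]interface{}{"region": "us", "version": "1"}},
	}
	for _, key := range []string{"userId", "timeout", "region", "version", "missing"} {
		v, src, ok := lookup(layers, key)
		fmt.Printf("%-8s found=%-5v value=%v source=%q\n", key, ok, v, src)
	}
}
```

Here `timeout` is defined in two scopes, and the transition-level value shadows the flow-level one because its layer comes first in the slice.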

Expression Lookup (ExprLookup)

The ExprLookup method resolves an argument name to a value, expanding any <<key>> template references it encounters:

func (p *WorkerSessionContext) ExprLookup(argName string) (value interface{}, err error)

Process:

  1. Handle <<key>> template syntax
  2. Look up property value
  3. Recursively parse if value contains templates
  4. Return resolved value
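
The recursive part of this process can be sketched over a flat map. This is an assumption-laden illustration and not the ExprLookup implementation: the `resolve` function, the regular expression used to find `<<key>>` references and the depth limit of 8 are all choices made for the sketch.

```go
package main

import (
	"fmt"
	"regexp"
)

// templateRef matches a single <<key>> reference.
var templateRef = regexp.MustCompile(`<<([^<>]+)>>`)

// resolve looks up expr in ctx. A bare <<key>> is treated like key, and string
// values that contain further references are expanded recursively.
func resolve(ctx map[string]interface{}, expr string, depth int) (interface{}, error) {
	if depth > 8 {
		return nil, fmt.Errorf("template nesting too deep at %q", expr)
	}
	if m := templateRef.FindStringSubmatch(expr); m != nil && m[0] == expr {
		expr = m[1]
	}
	val, ok := ctx[expr]
	if !ok {
		return nil, fmt.Errorf("property %q not found", expr)
	}
	s, isString := val.(string)
	if !isString || !templateRef.MatchString(s) {
		return val, nil
	}
	// The value is exactly one reference: return the target with its own type.
	if m := templateRef.FindStringSubmatch(s); m != nil && m[0] == s {
		return resolve(ctx, s, depth+1)
	}
	// Otherwise substitute every reference inside the surrounding text.
	var firstErr error
	out := templateRef.ReplaceAllStringFunc(s, func(ref string) string {
		v, err := resolve(ctx, ref, depth+1)
		if err != nil && firstErr == nil {
			firstErr = err
		}
		return fmt.Sprint(v)
	})
	if firstErr != nil {
		return nil, firstErr
	}
	return out, nil
}

func main() {
	ctx := map[string]interface{}{
		"userId":   7,
		"stage":    "prod",
		"env":      "<<stage>>",
		"greeting": "user <<userId>> in <<env>>",
	}
	for _, expr := range []string{"<<userId>>", "env", "greeting", "unknown"} {
		v, err := resolve(ctx, expr, 0)
		fmt.Printf("%-12s value=%v err=%v\n", expr, v, err)
	}
}
```

The depth limit is what keeps a self-referencing value such as `"loop": "<<loop>>"` from recursing forever. How the real method guards against such cycles is not covered here.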

Return Value Handling

States can return values that subsequent states can access:

// Set return value for current transition
func (p *WorkerSessionContext) Return(value interface{}) *WorkerSessionContext

// Get return value from previous state
func (p *WorkerSessionContext) LastReturn() interface{}
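
The hand-off between two states might look like the sketch below. It uses a hypothetical `session` type in place of WorkerSessionContext, and the `nextStep` method that rotates the current value into the "last" slot is an assumption; this document does not describe how the engine actually carries the value from one state to the next.

```go
package main

import "fmt"

// session is a hypothetical stand-in for WorkerSessionContext.
type session struct {
	current interface{}
	last    interface{}
}

// Return records the value produced by the current step and returns the
// receiver so calls can be chained, matching the signature style above.
func (s *session) Return(value interface{}) *session {
	s.current = value
	return s
}

// LastReturn reports what the previous step returned.
func (s *session) LastReturn() interface{} { return s.last }

// nextStep is an assumed hand-off: the current value becomes the last one.
func (s *session) nextStep() *session {
	s.last, s.current = s.current, nil
	return s
}

func main() {
	s := &session{}

	// First state: compute something and hand it on.
	s.Return(map[string]int{"total": 42})
	s.nextStep()

	// Second state: read what the first one returned.
	if prev, ok := s.LastReturn().(map[string]int); ok {
		fmt.Println("previous total:", prev["total"])
	}
}
```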