Skip to main content

Workflow Engine

Engine Interface

type EngineInterface interface {
Start(c context.Context, configName string, worker Worker) bool
Run() bool
Done() *WorkerResponse
Process(c context.Context, workflowName string, worker Worker) *WorkerResponse
ApplicationContext() context.Context
SetModulesPath(path string)
SetWorkflowName(workflowName string)
LoadJson(name string) error
GetResolvers() []string
GetFlow() *domain.Flow
GetWorker() *Worker
GetEngineName() string
GetModulesDir() string
GetApplication() pkg.Application
}

Execution Flow

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Start() │────▶│ LoadJson() │────▶│ Run() │────▶│ Done() │
│ Initialize │ │ Load Schema │ │ Execute │ │ Finalize │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘


┌───────────────────┐
│ ProcessState() │◀──┐
│ Execute Transition│ │
└───────────────────┘ │
│ │
▼ │
┌───────────────────┐ │
│ can(next)? │───┘
│ More States? │
└───────────────────┘

Worker Interface

Workers process individual states within a workflow:

type Worker interface {
Context() *map[string]interface{}
SetContext(context *map[string]interface{})
Start(w EngineInterface) bool
InitResolvers(resolvers []string)
PrepareContext(w EngineInterface, flow *domain.Flow) bool
ProcessState(w EngineInterface, flow *domain.Flow) (bool, string)
Done(w EngineInterface) *WorkerResponse
ProcessStateError(w EngineInterface, flow *domain.Flow, errorMessage string)
SetResponse(response any, statusCode ...int)
GetResponse() (response any)
HandleError(err interface{}, statusCode int) (success bool)
SetError(error *issues.Issue, statusCode ...int) bool
}

State Types

TypeDescription
initialEntry point of workflow
normalStandard processing state
finalTerminal state

Workflow Execution Modes

The engine supports two execution modes: synchronous and asynchronous, with configurable restart policies.

Synchronous Execution (run_sync.go)

Runs a single workflow instance with retry support:

func RunOne(wfConfig domain.WorkflowConfigItem, app pkg.Application, contexts ...map[string]interface{}) map[string]*WorkerResponse

Flow:

┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ RunOne() │────▶│ runOneSync() │────▶│ run() │
│ Entry point │ │ Retry loop │ │ Execute flow │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ handleWorkflow │◀────│ ExecuteWorkflow │
│ Response() │ │ Routine() │
└─────────────────┘ └─────────────────┘

Asynchronous Execution (run_async.go)

Runs multiple workflow instances concurrently using goroutines:

func RunBatch(wfConfig domain.WorkflowConfigItem, app pkg.Application, wg *sync.WaitGroup,
resultChan chan *WorkerResponse, contexts ...map[string]interface{})

Flow:

┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ RunBatch() │────▶│ For each i in │────▶│ go runOneAsync()│
│ Entry point │ │ Amount │ │ (goroutine) │
└─────────────────┘ └─────────────────┘ └─────────────────┘


┌─────────────────┐
│ runWithId() │
│ + retry loop │
└─────────────────┘


┌─────────────────┐
│ resultChan <- │
│ response │
└─────────────────┘

Restart Policies

PolicyConstantBehavior
alwaysRestartPolicyAlwaysAlways restart after completion
on-failureRestartPolicyOnFailureRestart only on error
stopRestartPolicyStopStop after first execution

Workflow Managers (engine/manager/)

High-level managers that coordinate workflow execution:

WorkflowManager (Async)

func WorkflowManager(engineName string, workflowName string, amount int, args ...string) map[string]*workflow.WorkerResponse
  • Initializes workflow workers
  • Configures workflow from config or defaults
  • Uses sync.WaitGroup for concurrent execution
  • Collects results via buffered channel

WorkflowManagerOnce (Sync)

func WorkflowManagerOnce(name string, args ...string) map[string]*workflow.WorkerResponse
  • Single-instance execution
  • Simpler setup for one-off runs

HandleWorkflow Interface

Programmatic interface for workflow processing:

type HandleWorkflow interface {
ProcessWorkflow(c context.Context, workflowName string, workflowContext *map[string]interface{}) *WorkerResponse
GetEngine() *EngineInterface
GetWorker() *Worker
}

Options:

  • debug: Enable debug logging
  • preventLogError: Suppress error logging
  • preventLogInfo: Suppress info logging

CLI and Entry Points

Application Entry Points

Generated projects include CLI entry points that can be built and run:

# Build the CLI application
make build-cli

# Run a workflow
./runtime/bin/myapp-cli [options] <workflow> [key=value ...]

Flags:

FlagDescriptionDefault
-vVerbose mode (debug logging)false
-qQuiet mode (suppress output)false
--amount=NConcurrency (number of parallel instances)1

Example:

# Build first
make build-cli

# Run single workflow
./runtime/bin/myapp-cli workflows/hello_world.wsl message="Hello"

# Run with concurrency
./runtime/bin/myapp-cli --amount=5 workflows/batch_process.wsl input=data.json

# Verbose mode
./runtime/bin/myapp-cli -v workflows/debug_workflow.wsl

Root Entry Point (engine.go)

The package-level entry point:

func RunWorkflow(env string, options *domain.Options) map[string]*workflow.WorkerResponse {
environment := domain.NewEnvironment(env, options)
defer environment.ShutdownEnvironment()
return manager.WorkflowManager(environment)
}

domain.Options (declared in engine/domain/application.go):

type Options struct {
EngineName string
ConfigName string
Verbose bool
Quiet bool
Amount int
Retry int
RetryDelay int
RestartPolicy string // "always" | "on-failure" | "stop"
Workflow string
Version string
BuildTime string
LogPath string
Config *Config
Args []string
Context map[string]interface{}
Settings map[string]interface{}
EmbedFS *embed.FS
EmbedFSRootPath string
}