Workflow Engine
Engine Interface
type EngineInterface interface {
Start(c context.Context, configName string, worker Worker) bool
Run() bool
Done() *WorkerResponse
Process(c context.Context, workflowName string, worker Worker) *WorkerResponse
ApplicationContext() context.Context
SetModulesPath(path string)
SetWorkflowName(workflowName string)
LoadJson(name string) error
GetResolvers() []string
GetFlow() *domain.Flow
GetWorker() *Worker
GetEngineName() string
GetModulesDir() string
GetApplication() pkg.Application
}
Execution Flow
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Start() │────▶│ LoadJson() │────▶│ Run() │────▶│ Done() │
│ Initialize │ │ Load Schema │ │ Execute │ │ Finalize │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌───────────────────┐
│ ProcessState() │◀──┐
│ Execute Transition│ │
└───────────────────┘ │
│ │
▼ │
┌───────────────────┐ │
│ can(next)? │───┘
│ More States? │
└───────────────────┘
Worker Interface
Workers process individual states within a workflow:
type Worker interface {
Context() *map[string]interface{}
SetContext(context *map[string]interface{})
Start(w EngineInterface) bool
InitResolvers(resolvers []string)
PrepareContext(w EngineInterface, flow *domain.Flow) bool
ProcessState(w EngineInterface, flow *domain.Flow) (bool, string)
Done(w EngineInterface) *WorkerResponse
ProcessStateError(w EngineInterface, flow *domain.Flow, errorMessage string)
SetResponse(response any, statusCode ...int)
GetResponse() (response any)
HandleError(err interface{}, statusCode int) (success bool)
SetError(error *issues.Issue, statusCode ...int) bool
}
State Types
| Type | Description |
|---|---|
initial | Entry point of workflow |
normal | Standard processing state |
final | Terminal state |
Workflow Execution Modes
The engine supports two execution modes: synchronous and asynchronous, with configurable restart policies.
Synchronous Execution (run_sync.go)
Runs a single workflow instance with retry support:
func RunOne(wfConfig domain.WorkflowConfigItem, app pkg.Application, contexts ...map[string]interface{}) map[string]*WorkerResponse
Flow:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ RunOne() │────▶│ runOneSync() │────▶│ run() │
│ Entry point │ │ Retry loop │ │ Execute flow │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ handleWorkflow │◀────│ ExecuteWorkflow │
│ Response() │ │ Routine() │
└─────────────────┘ └─────────────────┘
Asynchronous Execution (run_async.go)
Runs multiple workflow instances concurrently using goroutines:
func RunBatch(wfConfig domain.WorkflowConfigItem, app pkg.Application, wg *sync.WaitGroup,
resultChan chan *WorkerResponse, contexts ...map[string]interface{})
Flow:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ RunBatch() │────▶│ For each i in │────▶│ go runOneAsync()│
│ Entry point │ │ Amount │ │ (goroutine) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐
│ runWithId() │
│ + retry loop │
└─────────────────┘
│
▼
┌─────────────────┐
│ resultChan <- │
│ response │
└─────────────────┘
Restart Policies
| Policy | Constant | Behavior |
|---|---|---|
always | RestartPolicyAlways | Always restart after completion |
on-failure | RestartPolicyOnFailure | Restart only on error |
stop | RestartPolicyStop | Stop after first execution |
Workflow Managers (engine/manager/)
High-level managers that coordinate workflow execution:
WorkflowManager (Async)
func WorkflowManager(engineName string, workflowName string, amount int, args ...string) map[string]*workflow.WorkerResponse
- Initializes workflow workers
- Configures workflow from config or defaults
- Uses
sync.WaitGroupfor concurrent execution - Collects results via buffered channel
WorkflowManagerOnce (Sync)
func WorkflowManagerOnce(name string, args ...string) map[string]*workflow.WorkerResponse
- Single-instance execution
- Simpler setup for one-off runs
HandleWorkflow Interface
Programmatic interface for workflow processing:
type HandleWorkflow interface {
ProcessWorkflow(c context.Context, workflowName string, workflowContext *map[string]interface{}) *WorkerResponse
GetEngine() *EngineInterface
GetWorker() *Worker
}
Options:
debug: Enable debug loggingpreventLogError: Suppress error loggingpreventLogInfo: Suppress info logging
CLI and Entry Points
Application Entry Points
Generated projects include CLI entry points that can be built and run:
# Build the CLI application
make build-cli
# Run a workflow
./runtime/bin/myapp-cli [options] <workflow> [key=value ...]
Flags:
| Flag | Description | Default |
|---|---|---|
-v | Verbose mode (debug logging) | false |
-q | Quiet mode (suppress output) | false |
--amount=N | Concurrency (number of parallel instances) | 1 |
Example:
# Build first
make build-cli
# Run single workflow
./runtime/bin/myapp-cli workflows/hello_world.wsl message="Hello"
# Run with concurrency
./runtime/bin/myapp-cli --amount=5 workflows/batch_process.wsl input=data.json
# Verbose mode
./runtime/bin/myapp-cli -v workflows/debug_workflow.wsl
Root Entry Point (engine.go)
The package-level entry point:
func RunWorkflow(env string, options *domain.Options) map[string]*workflow.WorkerResponse {
environment := domain.NewEnvironment(env, options)
defer environment.ShutdownEnvironment()
return manager.WorkflowManager(environment)
}
domain.Options (declared in engine/domain/application.go):
type Options struct {
EngineName string
ConfigName string
Verbose bool
Quiet bool
Amount int
Retry int
RetryDelay int
RestartPolicy string // "always" | "on-failure" | "stop"
Workflow string
Version string
BuildTime string
LogPath string
Config *Config
Args []string
Context map[string]interface{}
Settings map[string]interface{}
EmbedFS *embed.FS
EmbedFSRootPath string
}