Event Sourcing Rule
Every state change MUST use the Executor pattern. No exceptions.
Event Sourcing Implementation
Architecture
Storage: Valkey backend via timebox.Store (NOT EventHub)
- Engine state: timebox.Store with timebox.Executor[*api.EngineState]
- Flow state: timebox.Store with timebox.Executor[*api.FlowState]
- Concurrency: Optimistic (sequence-based versioning, automatic retry on conflict)
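As a rough illustration of sequence-based optimistic concurrency with automatic retry, here is a minimal in-memory sketch. The `Store`, `Append`, and `Exec` names are hypothetical and do not reflect the timebox API; the sequence is simply assumed to be the number of committed events.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ErrConflict is returned when the expected sequence no longer matches.
var ErrConflict = errors.New("sequence conflict")

// Store is a hypothetical in-memory event log, not timebox.Store.
type Store struct {
	mu     sync.Mutex
	events []string
}

// Load returns a copy of the log and its current sequence (event count).
func (s *Store) Load() ([]string, int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]string(nil), s.events...), len(s.events)
}

// Append commits only if nobody else appended since expectedSeq was read.
func (s *Store) Append(expectedSeq int, evs ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.events) != expectedSeq {
		return ErrConflict
	}
	s.events = append(s.events, evs...)
	return nil
}

// Exec reloads and re-runs cmd until its events commit without a conflict.
// It returns the number of attempts that were needed.
func Exec(s *Store, cmd func(history []string) []string) (int, error) {
	for attempt := 1; ; attempt++ {
		history, seq := s.Load()
		err := s.Append(seq, cmd(history)...)
		if err == nil {
			return attempt, nil
		}
		if !errors.Is(err, ErrConflict) {
			return attempt, err
		}
	}
}

func main() {
	s := &Store{}
	interfered := false
	attempts, err := Exec(s, func(history []string) []string {
		if !interfered {
			// Simulate a concurrent writer committing first.
			interfered = true
			_ = s.Append(len(history), "other_writer_event")
		}
		return []string{"my_event"}
	})
	_, seq := s.Load()
	fmt.Println(attempts, seq, err) // 2 2 <nil>
}
```

The command runs twice here because the first attempt loses the race, which is why commands must be safe to re-run.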
WebSocket Notifications (separate from event sourcing):
- EventHub: pkg/events/hub.go (uses github.com/kode4food/caravan Topic)
- Purpose: Broadcast events to WebSocket subscribers
- NOT used for event sourcing - only for real-time UI updates
- Publishes notifications derived from timebox events; it does not drive execution
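To make the separation concrete, the sketch below shows a broadcast-only hub that is fed after events are committed. `Hub`, `Subscribe`, and `Publish` are hypothetical names and are not the caravan Topic API; dropping messages for slow subscribers is an assumption chosen for illustration, reasonable only because the event log remains the source of truth.

```go
package main

import (
	"fmt"
	"sync"
)

// Hub is a hypothetical stand-in for the EventHub; it is not the caravan API.
type Hub struct {
	mu   sync.Mutex
	subs []chan string
}

// Subscribe registers a buffered channel for one WebSocket-style client.
func (h *Hub) Subscribe(buffer int) <-chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan string, buffer)
	h.subs = append(h.subs, ch)
	return ch
}

// Publish fans an already-committed event out to every subscriber and
// returns how many received it. A subscriber whose buffer is full misses
// the message instead of blocking the publisher.
func (h *Hub) Publish(event string) (delivered int) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, ch := range h.subs {
		select {
		case ch <- event:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	hub := &Hub{}
	a := hub.Subscribe(1)
	b := hub.Subscribe(1)

	// Called only after the event has been committed to the store.
	n := hub.Publish("flow_started")
	fmt.Println(n, <-a, <-b) // 2 flow_started flow_started
}
```

Nothing in the hub feeds back into execution: losing a notification affects only what the UI shows.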
State Mutations: Executor Pattern (Only Pattern)
Every state change MUST use this pattern. No exceptions.
Core pattern:
cmd := func(state *StateType, ag *Aggregator) error {
	// 1. Read state (read-only)
	// 2. Decide if mutation needed
	// 3. Raise events via aggregator ONLY (and propagate the error)
	if err := events.Raise(ag, eventType, eventData); err != nil {
		return err
	}
	// 4. Register side effects via ag.OnSuccess()
	//    Registered INSIDE the command, runs AFTER commit
	ag.OnSuccess(func(state *StateType) {
		// Network calls, starting work, cross-aggregate ops
		// Runs ONCE after Exec completes and commits
	})
	return nil
}
executor.Exec(ctx, aggregateID, cmd)
CRITICAL: Side Effects Must Use ag.OnSuccess()
Anything with side effects (network calls, starting real work, cross-aggregate operations, retry queue updates) MUST be registered via ag.OnSuccess() inside the command callback, NOT called directly in the command.
Why: If the command retries (optimistic concurrency conflict), direct side effects execute multiple times. ag.OnSuccess() runs only once, after Exec commits.
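The difference can be seen in a small simulation. This is a sketch under stated assumptions: `Aggregator` and `Exec` are simplified, hypothetical stand-ins, and hooks from failed attempts are assumed to be discarded by creating a fresh aggregator per attempt.

```go
package main

import "fmt"

// Aggregator is a hypothetical stand-in that only collects OnSuccess hooks.
type Aggregator struct {
	onSuccess []func()
}

// OnSuccess defers fn until the command's events have been committed.
func (ag *Aggregator) OnSuccess(fn func()) {
	ag.onSuccess = append(ag.onSuccess, fn)
}

// Exec runs cmd with a fresh Aggregator per attempt. commit reports whether
// the attempt's events were accepted; hooks from failed attempts are dropped.
func Exec(cmd func(ag *Aggregator), commit func(attempt int) bool) {
	for attempt := 1; ; attempt++ {
		ag := &Aggregator{}
		cmd(ag)
		if !commit(attempt) {
			continue // conflict: retry with a new aggregator
		}
		for _, fn := range ag.onSuccess {
			fn()
		}
		return
	}
}

func main() {
	direct, deferred := 0, 0
	cmd := func(ag *Aggregator) {
		direct++                            // WRONG: runs on every attempt
		ag.OnSuccess(func() { deferred++ }) // runs once, after commit
	}
	// The first two attempts conflict; the third commits.
	Exec(cmd, func(attempt int) bool { return attempt == 3 })
	fmt.Println(direct, deferred) // 3 1
}
```

With two conflicts, the direct side effect fires three times while the registered one fires once.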
Real patterns from the codebase:
For engine state mutations (execEngine - no side effects needed):
// registry.go: UpdateStepHealth
func (e *Engine) UpdateStepHealth(stepID api.StepID, health api.HealthStatus, errMsg string) error {
	cmd := func(st *api.EngineState, ag *Aggregator) error {
		if stepHealth, ok := st.Health[stepID]; ok {
			if stepHealth.Status == health && stepHealth.Error == errMsg {
				return nil // Idempotent
			}
		}
		return events.Raise(ag, api.EventTypeStepHealthChanged,
			api.StepHealthChangedEvent{StepID: stepID, Status: health, Error: errMsg})
	}
	_, err := e.execEngine(cmd) // Pure mutation, no OnSuccess needed
	return err
}
For flow state mutations (flowTx wrapper with OnSuccess for side effects):
// engine.go: StartFlow using flowTx
func (e *Engine) StartFlow(flowID api.FlowID, plan *api.ExecutionPlan, initState api.Args, meta api.Metadata) error {
	return e.flowTx(flowID, func(tx *flowTx) error { // flowTx wraps execFlow
		if err := events.Raise(tx.FlowAggregator, api.EventTypeFlowStarted, ...); err != nil {
			return err
		}
		tx.OnSuccess(func(*api.FlowState) {
			e.handleFlowActivated(flowID, meta) // Side effect after commit
		})
		// Prepare initial steps (may register more OnSuccess)
		for _, stepID := range tx.findInitialSteps(tx.Value()) {
			if err := tx.prepareStep(stepID); err != nil {
				return err // Abort the command; nothing is committed
			}
		}
		return nil // All events committed, then all OnSuccess handlers run
	})
}
Work execution (inside flowTx prepareStep):
// flow-exec.go: prepareStep (called inside flowTx command)
func (tx *flowTx) prepareStep(stepID api.StepID) error {
	if err := events.Raise(tx.FlowAggregator, api.EventTypeStepStarted, ...); err != nil {
		return err
	}
	// NOTE: step and inputs are defined in code elided from this excerpt
	started, err := tx.startPendingWork(stepID, step)
	if err != nil {
		return err
	}
	if len(started) > 0 {
		tx.OnSuccess(func(flow *api.FlowState) {
			// Execute work AFTER commit succeeds
			tx.handleWorkItemsExecution(stepID, step, inputs, flow.Metadata, started)
		})
	}
	return nil
}
Flow completion (inside flowTx checkTerminal):
// flow-exec.go: checkTerminal (called inside flowTx command)
func (tx *flowTx) checkTerminal() error {
	// Read the current state from the transaction, not a cached reference
	if tx.isFlowComplete(tx.Value()) {
		if err := events.Raise(tx.FlowAggregator, api.EventTypeFlowCompleted, ...); err != nil {
			return err
		}
		tx.OnSuccess(func(*api.FlowState) {
			tx.handleFlowCompleted() // Cleanup: remove from retry queue
		})
	}
	return nil
}
Rules (non-negotiable):
- State parameter: READ-ONLY (never modify directly)
- Mutations: ONLY via events.Raise(ag, type, data)
- Idempotency: Check state before raising event (avoid duplicate events)
- Executor: Handles conflict retries automatically (no manual retry needed)
- Atomicity: Events and projections commit together
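The read-only and idempotency rules can be sketched together as follows. All types here are hypothetical simplifications: the command only reads state, and the aggregator's `Raise` is the single place where a new state is produced.

```go
package main

import "fmt"

// State and Event are hypothetical, simplified types.
type State struct{ Health map[string]string }

type Event struct{ StepID, Status string }

// Aggregator records raised events and applies each to a fresh copy of state.
type Aggregator struct {
	state  State
	raised []Event
}

// Raise is the only place where state changes.
func (ag *Aggregator) Raise(ev Event) {
	next := State{Health: make(map[string]string, len(ag.state.Health)+1)}
	for k, v := range ag.state.Health {
		next.Health[k] = v
	}
	next.Health[ev.StepID] = ev.Status
	ag.state = next
	ag.raised = append(ag.raised, ev)
}

// SetHealth reads state and raises an event only when something would change.
func SetHealth(ag *Aggregator, stepID, status string) {
	if cur, ok := ag.state.Health[stepID]; ok && cur == status {
		return // idempotent: no duplicate event
	}
	ag.Raise(Event{StepID: stepID, Status: status})
}

func main() {
	ag := &Aggregator{state: State{Health: map[string]string{}}}
	SetHealth(ag, "step-1", "healthy")
	SetHealth(ag, "step-1", "healthy") // no-op, state already matches
	SetHealth(ag, "step-1", "unhealthy")
	fmt.Println(len(ag.raised), ag.state.Health["step-1"]) // 2 unhealthy
}
```

Three calls produce two events because the repeated call finds the state already matching and raises nothing.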
CRITICAL: Stale References After Event Raise
MUST KNOW: When you raise an event, the aggregator IMMEDIATELY applies it, but any state reference you obtained before the raise becomes STALE.
cmd := func(st *api.FlowState, ag *FlowAggregator) error {
	// st is current state
	flow := ag.Value() // Same as st
	// Raise event: aggregator IMMEDIATELY applies it
	if err := events.Raise(ag, api.EventTypeAttributeSet,
		api.AttributeSetEvent{...}); err != nil {
		return err
	}
	// ❌ WRONG: flow is now stale! It doesn't have the new attribute
	if v, ok := flow.GetAttributes()[name]; ok { ... } // STALE DATA
	// ✅ CORRECT: fetch fresh state after raising event
	updatedFlow := ag.Value() // NEW reference with applied event
	if v, ok := updatedFlow.GetAttributes()[name]; ok { ... } // FRESH
	return nil
}
Why: Persistent data structures mean events create new aggregate versions. Old references point to old versions.
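A minimal sketch of why old references go stale, assuming the state is an immutable snapshot that is replaced on every event. `FlowState.With` and `RaiseAttributeSet` are hypothetical helpers, not the real aggregator API.

```go
package main

import "fmt"

// FlowState is a hypothetical immutable snapshot; With never mutates the receiver.
type FlowState struct {
	attrs map[string]string
}

// With returns a new snapshot containing the extra attribute.
func (s *FlowState) With(name, value string) *FlowState {
	next := make(map[string]string, len(s.attrs)+1)
	for k, v := range s.attrs {
		next[k] = v
	}
	next[name] = value
	return &FlowState{attrs: next}
}

// Aggregator holds the current snapshot and swaps it when an event is raised.
type Aggregator struct{ current *FlowState }

// Value returns the latest snapshot.
func (ag *Aggregator) Value() *FlowState { return ag.current }

// RaiseAttributeSet applies the event immediately by replacing the snapshot.
func (ag *Aggregator) RaiseAttributeSet(name, value string) {
	ag.current = ag.current.With(name, value)
}

func main() {
	ag := &Aggregator{current: &FlowState{attrs: map[string]string{}}}

	stale := ag.Value() // obtained before the raise
	ag.RaiseAttributeSet("total", "42")
	fresh := ag.Value() // obtained after the raise

	_, inStale := stale.attrs["total"]
	_, inFresh := fresh.attrs["total"]
	fmt.Println(inStale, inFresh) // false true
}
```

The old pointer still refers to a valid snapshot, just not the current one, so reads through it silently return outdated data rather than failing.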
Pattern when chaining operations:
cmd := func(st *api.FlowState, ag *FlowAggregator) error {
	// Raise event 1
	if err := events.Raise(ag, EventType1, data1); err != nil {
		return err
	}
	// Fetch updated state
	current := ag.Value()
	// Check updated state and maybe raise event 2
	if someCondition(current) {
		if err := events.Raise(ag, EventType2, data2); err != nil {
			return err
		}
	}
	// Fetch again if you need latest
	latest := ag.Value()
	// ... use latest
	return nil
}
Key locations to check:
- engine/internal/engine/flow-exec.go - prepareStep, handleWorkSucceeded (chains events)
- Anywhere you raise multiple events in sequence
Critical Constraint: Event Recording vs Step Launching
MAINTAIN THIS SEPARATION:
// In flowTx (flow execution context):
// 1. Always record event (even if flow is terminal)
if err := events.Raise(ag, api.EventTypeWorkSucceeded, ...); err != nil {
	return err
}
// 2. SEPARATE decision: only execute next steps if flow is active
//    (re-read via ag.Value(); earlier references are stale after Raise)
if !isTerminal(ag.Value().Status) {
	// prepare next step
}
Rationale:
- Events are recorded even after a flow fails, so the audit trail stays complete
- Step launching stops only once the flow is terminal
- Late-arriving work completions are kept rather than dropped
Example:
1. Payment fails → flow_failed event
2. Flow is terminal, no new steps start
3. Inventory reservation still running → work_succeeded recorded
4. Outputs recorded in event log for complete audit trail
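The scenario above can be sketched as a small simulation. For brevity this collapses the aggregator and state into one mutable `Flow` struct, which the real engine does not do; the type, its fields, and the step names are hypothetical.

```go
package main

import "fmt"

// Flow is a hypothetical, simplified flow aggregate.
type Flow struct {
	Status   string   // "active", "completed", or "failed"
	Log      []string // recorded event types
	Launched []string // steps that were started
}

// isTerminal reports whether the flow has reached a final status.
func isTerminal(status string) bool {
	return status == "completed" || status == "failed"
}

// Fail records the terminal event and marks the flow failed.
func (f *Flow) Fail() {
	f.Log = append(f.Log, "flow_failed")
	f.Status = "failed"
}

// WorkSucceeded always records the event, then separately decides whether
// the next step may launch.
func (f *Flow) WorkSucceeded(nextStep string) {
	f.Log = append(f.Log, "work_succeeded") // 1. always record
	if !isTerminal(f.Status) {              // 2. separate launch decision
		f.Launched = append(f.Launched, nextStep)
	}
}

func main() {
	f := &Flow{Status: "active"}
	f.WorkSucceeded("charge-payment") // flow active: next step launches
	f.Fail()                          // payment fails, flow becomes terminal
	f.WorkSucceeded("ship-order")     // late completion: recorded, not launched
	fmt.Println(f.Log, f.Launched)
	// [work_succeeded flow_failed work_succeeded] [charge-payment]
}
```

The late `work_succeeded` still lands in the log, but no further step is launched once the flow is terminal.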
Flow Execution Flow
POST /engine/flow (server)
↓
engine.StartFlow() calls flowTx
↓
flowTx raises FlowStartedEvent
↓
flowTx.execFlow() uses Executor pattern
↓
Command execution:
- Ready steps identified
- StepStarted event raised for each
- Work items created and executed
↓
Work completes (sync) or callback received (async)
↓
CompleteWork() calls flowTx
↓
flowTx raises WorkSucceededEvent
↓
Aggregator updates flow state from events
↓
Check if all goals satisfied
↓
Raise FlowCompletedEvent or continue loop
↓
When no active work: FlowDeactivatedEvent
State Reconstruction (Recovery)
Executor automatically reconstructs state by replaying events:
// Conceptual - timebox handles this internally
func reconstructState(aggregateID ID) State {
	events := store.LoadEvents(aggregateID)
	state := NewState()
	for _, event := range events {
		state = applyEvent(state, event)
	}
	return state
}
How recovery works:
- Engine.Start() calls RecoverFlows()
- Executor loads events from Valkey for each flow
- Replays events to reconstruct exact state
- Each flow resumes from where it left off
- No external coordination needed
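The recovery steps above might look roughly like this when reduced to an in-memory model. This is a sketch, not the RecoverFlows implementation: events are plain strings, only the flow status is reconstructed, and "needs resuming" is assumed to mean "status is still active after replay".

```go
package main

import (
	"fmt"
	"sort"
)

// replay folds a flow's event history into its status.
// Event types that do not affect status are ignored.
func replay(history []string) string {
	status := "new"
	for _, ev := range history {
		switch ev {
		case "flow_started":
			status = "active"
		case "flow_completed":
			status = "completed"
		case "flow_failed":
			status = "failed"
		}
	}
	return status
}

// Recover replays every stored flow and returns, in sorted order,
// the IDs of flows that are still active and should be resumed.
func Recover(logs map[string][]string) []string {
	var resume []string
	for id, history := range logs {
		if replay(history) == "active" {
			resume = append(resume, id)
		}
	}
	sort.Strings(resume)
	return resume
}

func main() {
	logs := map[string][]string{
		"flow-a": {"flow_started", "step_started"},
		"flow-b": {"flow_started", "step_started", "step_completed", "flow_completed"},
		"flow-c": {"flow_started", "flow_failed", "work_succeeded"},
	}
	fmt.Println(Recover(logs)) // [flow-a]
}
```

Because the state is derived only from the stored events, replaying the same log always yields the same result, which is what makes recovery independent of external coordination.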
Event Types
Engine aggregate events (step registry and active flow tracking):
- step_registered - Step added to registry
- step_unregistered - Step deleted from registry
- step_updated - Step definition modified
- step_health_changed - Step availability changed
- flow_activated - Flow added to active flows list
- flow_archiving - Flow selected for archiving
- flow_archived - Flow moved to external storage
Flow aggregate events (flow execution state):
- flow_started - Execution begins
- flow_completed - All goals satisfied
- flow_failed - Goal unreachable or failed
- step_started - Step preparing to execute
- step_completed - Step succeeded
- step_failed - Step encountered error
- step_skipped - Predicate returned false
- work_started - Work item execution begins
- work_succeeded - Work item completed successfully
- work_failed - Work item failed
- work_not_completed - Work item reports not ready (triggers retry scheduling)
- retry_scheduled - Work item retry scheduled for future time
- attribute_set - Step outputs added to flow state
- flow_digest_updated - Flow status digest updated (internal)
- flow_deactivated - Flow terminal + no active work
Key Locations
- State mutation: engine/internal/engine/flow-exec.go (flowTx pattern)
- Executor setup: engine/internal/engine/engine.go (NewExecutor calls)
- Event types: engine/pkg/events/ (event definitions)
- Recovery: engine/internal/engine/recover.go (RecoverFlows logic)
- Retry queue: engine/internal/engine/retry_queue.go (scheduled retries, NOT event-driven)
- WebSocket broadcast: engine/pkg/events/hub.go (separate from event sourcing)