
EventFlow Event Queue Proposal

Reliable, Ordered Event Processing for State Machines

Version: 1.0 | Date: December 2024 | Status: ✅ Implemented


This proposal has been implemented and integrated into the main documentation. It is kept here as an archive for reference.


1. Executive Summary

EventFlow machines process events to transition between states. As systems scale, reliable event processing becomes critical. This proposal introduces Event Queues - a natural language approach to configuring ordered, reliable, and observable event processing.

Core Philosophy

Events flow through queues. Queues guarantee order.

Machines process. Queues orchestrate.

Key Features

  1. Per-aggregate ordering - Events for the same instance processed in order
  2. System & machine queues - Hierarchical configuration
  3. Priority support - Critical events processed first
  4. Retry mechanism - Automatic retry with backoff
  5. Dead letter queue - Failed events preserved for analysis
  6. CLI monitoring - eventflow queue commands for visibility

2. Motivation & Problem Statement

2.1 Current Situation

EventFlow machines receive and process events, but there's no explicit model for:

  • How events are queued before processing
  • What happens when processing fails
  • How to prioritize critical events
  • How to monitor queue health

2.2 Real-World Challenges

Problem 1: Race Conditions
─────────────────────────
Customer sends :add_item and :checkout nearly simultaneously.
Without ordering, :checkout might process before :add_item.

Problem 2: Transient Failures
─────────────────────────────
Payment gateway times out. Without retry, order is stuck.

Problem 3: Visibility
─────────────────────
100 orders are processing slowly. How do we know what's pending?

2.3 Goals

  1. Guarantee per-aggregate ordering - Same order instance, sequential processing
  2. Enable parallel processing - Different aggregates can process concurrently
  3. Handle failures gracefully - Retry with backoff, preserve failed events
  4. Provide observability - CLI tools for queue monitoring

3. Queue Architecture

3.1 Machine vs Instance

Before understanding queue architecture, it's important to distinguish:

  • Machine (Template): A .flow file definition (e.g., machine: @order)
  • Instance (Aggregate): A logical entity identified by unique ID (e.g., @order:abc123)

Instance Lifecycle (PHP/Laravel):

Instances don't live in memory continuously. They are reconstructed on demand:

  1. Event arrives (HTTP request or queue job)
  2. Load instance state from database (event store or snapshot)
  3. Process event (run guards, actions, transitions)
  4. Persist new state to database
  5. Instance discarded from memory

Request 1: :checkout for @order:abc123
  └─> Load state → Process → Persist → Done

Request 2: :payment_ok for @order:abc123 (minutes later)
  └─> Load state → Process → Persist → Done

Multiple requests can process different instances in parallel:

Request A: @order:abc123 → processing
Request B: @order:xyz789 → processing (parallel)
Request C: @order:def456 → processing (parallel)

But the same instance is processed sequentially (via locking).

Each instance has its own state, context, and event history. Instances never share state - they only communicate through events.

3.2 What Concurrency Means

concurrency: 10 means 10 different instances can process events simultaneously.

Within each instance: Events are always processed sequentially (one at a time).

concurrency: 3

@order:abc │ :checkout │ :payment_ok │
@order:xyz │ :add_item │ :checkout   │  ← parallel with abc
@order:def │ :checkout │             │  ← parallel with abc, xyz
@order:ghi │ (waiting) │             │  ← waits for a slot

This is cross-instance parallelism, not parallel event processing within an instance.

Laravel Implementation

EventFlow automatically configures Laravel Horizon based on your .flow files:

flow
// order.flow
machine: @order

queue:
  concurrency: 10
  timeout: 60 seconds
flow
// payment.flow
machine: @payment

queue:
  concurrency: 3

When you run eventflow serve or eventflow horizon:config, EventFlow:

  1. Scans all .flow files for queue: blocks
  2. Generates Horizon supervisor configuration
  3. Creates per-machine queues (eventflow.order, eventflow.payment)

Generated Config (internal):

php
// Auto-generated by EventFlow - do not edit manually
'supervisors' => [
    'eventflow-order' => [
        'queue' => ['eventflow.order'],
        'processes' => 10,  // from queue.concurrency
    ],
    'eventflow-payment' => [
        'queue' => ['eventflow.payment'],
        'processes' => 3,
    ],
],

You write .flow files, EventFlow handles the infrastructure.

How it works at runtime:

  1. Async events (emit async) are dispatched to Laravel queues
  2. Horizon workers process queue jobs (one per process)
  3. Per-aggregate locking ensures same instance is sequential

Queue: eventflow.order
├── Worker 1 → @order:abc123 (locked)
├── Worker 2 → @order:xyz789
├── Worker 3 → @order:def456
└── Worker 4 → waiting for jobs...

@order:abc123 events:
  :checkout (processing by Worker 1)
  :add_item (waiting - same aggregate locked)

Sync events (emit default) don't go through queue - they execute inline within the same request/process.
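
One way to picture this is an async emit becoming a plain Laravel queued job keyed by machine and aggregate. The sketch below is illustrative only - the class, property, and lock-key names are assumptions, not EventFlow's actual internals - and uses Laravel's WithoutOverlapping middleware to get the per-aggregate locking described above:

php
// Hypothetical job - not the actual EventFlow runtime.
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProcessEventJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(
        public string $machine,      // e.g. "order"
        public string $aggregateId,  // e.g. "order-abc123"
        public string $event,        // e.g. ":payment_request"
        public array $payload = [],
    ) {}

    // Per-aggregate lock: two jobs for the same instance never run at once,
    // while jobs for different instances proceed in parallel.
    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("{$this->machine}:{$this->aggregateId}"))
                ->releaseAfter(10),   // re-queue instead of dropping the job
        ];
    }

    public function handle(): void
    {
        // Load instance state → run guards/actions/transitions → persist
        // (see the Queued Transition Runner in section 14.1).
    }
}

// An async emit would then dispatch onto the per-machine queue:
ProcessEventJob::dispatch('order', 'order-abc123', ':payment_request')
    ->onQueue('eventflow.order');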

3.3 Conceptual Model

                    ┌─────────────────────────────────────┐
                    │         System Queue                │
                    │   (e-commerce, concurrency: 10)     │
                    └─────────────────────────────────────┘

           ┌────────────────────────┼────────────────────────┐
           │                        │                        │
           ▼                        ▼                        ▼
┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐
│    @order Queue     │  │   @payment Queue    │  │  @inventory Queue   │
│  (concurrency: 5)   │  │  (concurrency: 3)   │  │  (concurrency: 1)   │
└─────────────────────┘  └─────────────────────┘  └─────────────────────┘
           │                        │                        │
    ┌──────┴──────┐          ┌──────┴──────┐                 │
    │             │          │             │                 │
    ▼             ▼          ▼             ▼                 ▼
order-123    order-456   payment-a    payment-b         inventory
(sequential) (parallel)  (parallel)   (parallel)        (serial)

Note: @order Queue (concurrency: 5) means up to 5 different order instances can process events in parallel. Each instance still processes its own events sequentially.

3.4 Per-Aggregate Ordering

Within a machine, events for the same aggregate are always processed sequentially:

@order:abc123 events:
  1. :add_item      → processes first
  2. :add_item      → waits for #1
  3. :checkout      → waits for #2
  4. :payment_ok    → waits for #3

@order:xyz789 events (parallel with abc123):
  1. :checkout      → processes immediately
  2. :payment_ok    → waits for #1

This guarantees consistency: an order can't be checked out before items are added.

3.5 Cross-Aggregate Parallelism

Different aggregates can process in parallel:

Time →
───────────────────────────────────────────────────
@order:abc │ :add_item │ :checkout │ :payment_ok │
───────────────────────────────────────────────────
@order:xyz │ :checkout │ :ship     │             │
───────────────────────────────────────────────────
@order:def │ :add_item │ :add_item │ :checkout   │
───────────────────────────────────────────────────

3.6 Conversation Context in Machine Systems

In a machine system, multiple machine instances work together within a conversation:

Conversation: conv-abc123
├── @order:order-001      → #awaiting_payment
├── @payment:pay-001      → #processing
└── @inventory:inv-001    → #reserved

Event Routing:

When @order emits to @payment:

flow
// In @order machine
emit :payment_request to @payment

EventFlow:

  1. Checks if a @payment instance exists in this conversation
  2. If yes → routes to existing instance
  3. If no → creates new @payment instance in same conversation

Queue Ordering:

Events are ordered by conversation + machine + instance:

Queue: eventflow.payment
├── conv-abc:pay-001 → [:payment_request, :cancel_payment]  // sequential
├── conv-xyz:pay-002 → [:payment_request]                   // parallel
└── conv-def:pay-003 → [:refund_request]                    // parallel

Same conversation, same machine instance = sequential. Different conversations = parallel.
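
A conceptual sketch of that routing decision (the repository interface and method names are assumptions, not EventFlow's API):

php
// Conceptual routing sketch - not the actual EventFlow implementation.
interface InstanceRepository
{
    /** Returns the instance ID, or null if none exists in this conversation. */
    public function findByConversation(string $conversationId, string $machine): ?string;

    /** Creates a new instance bound to the conversation and returns its ID. */
    public function createInConversation(string $conversationId, string $machine): string;
}

final class ConversationRouter
{
    public function __construct(private InstanceRepository $instances) {}

    /** Resolve the target instance for an emit to $machine within a conversation. */
    public function resolve(string $conversationId, string $machine): string
    {
        // 1. Reuse the existing instance of this machine in the conversation...
        $existing = $this->instances->findByConversation($conversationId, $machine);
        if ($existing !== null) {
            return $existing;
        }

        // 2. ...otherwise create a new instance in the same conversation.
        return $this->instances->createInConversation($conversationId, $machine);
    }
}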


4. Sync and Async Emit

4.1 Default Behavior: Sync

By default, emit is synchronous - it waits for the target machine to process and returns the result:

flow
on :checkout from @customer (api)
  $payment becomes emit :payment_request to @payment

  ? $payment.success
    order moves to #paid
    reply 201 with:
      | order_id | $order_id |
      | txn_id   | $payment.transaction_id |
  otherwise
    reply 400 with:
      | error | $payment.error |

The sync emit:

  • Sends event to target machine
  • Waits for target to process
  • Returns the target's reply as a result
  • Allows using result in guards and responses

Note: Sync emit bypasses the queue entirely - it executes inline within the same request/process. The target machine's queue: configuration only applies to async events (emit async).

4.2 Async Emit

Use emit async to queue the event without waiting:

flow
on :checkout from @customer (api)
  order moves to #awaiting_payment

  emit async :payment_request to @payment
  emit async :analytics_event to @analytics

  reply 202 with:
    | order_id | $order_id |
    | status   | "processing" |

The async emit:

  • Puts event on the queue
  • Does not wait for processing
  • Returns immediately
  • Target processes asynchronously

4.3 Capturing Sync Results

The sync emit returns the target handler's reply:

flow
// Payment machine
on :payment_request from @order
  charge card
  reply 200 with:
    | success        | true      |
    | transaction_id | $txn_id   |

// Order machine - captures the reply
$result becomes emit :payment_request to @payment
// $result.success = true
// $result.transaction_id = "txn-123"

4.4 Recursive Chains

Sync emit chains are allowed - each reply bubbles up to its caller:

flow
// Order machine (API entry point)
on :checkout from @customer (api)
  $payment becomes emit :payment_request to @payment

  reply 201 with:                    // ← THIS goes to API caller
    | order_id    | $order_id            |
    | txn_id      | $payment.txn_id      |
    | fraud_score | $payment.fraud_score |

// Payment machine
on :payment_request from @order
  $fraud becomes emit :check_fraud to @fraud

  reply 200 with:                    // ← Returns to Order (not API)
    | txn_id      | $txn_id       |
    | fraud_score | $fraud.score  |

// Fraud machine
on :check_fraud from @payment
  reply 200 with:                    // ← Returns to Payment (not API)
    | score  | 0.1     |
    | status | "clean" |

Execution Timeline:

API Request: POST /checkout


┌─ Order Handler ─────────────────────────────────────────┐
│  1. $payment becomes emit :payment_request to @payment  │
│     │                                                   │
│     ▼                                                   │
│  ┌─ Payment Handler ─────────────────────────────┐     │
│  │  2. $fraud becomes emit :check_fraud to @fraud │     │
│  │     │                                          │     │
│  │     ▼                                          │     │
│  │  ┌─ Fraud Handler ─────────────────────┐      │     │
│  │  │  3. reply 200 { score: 0.1 }        │      │     │
│  │  └─────────────────────────────────────┘      │     │
│  │     │                                          │     │
│  │     ▼ ($fraud = { score: 0.1 })               │     │
│  │  4. reply 200 { txn_id, fraud_score }         │     │
│  └────────────────────────────────────────────────┘     │
│     │                                                   │
│     ▼ ($payment = { txn_id, fraud_score })             │
│  5. reply 201 { order_id, txn_id, fraud_score }        │
└─────────────────────────────────────────────────────────┘


API Response: 201 { order_id, txn_id, fraud_score }

Key Points:

  • Only the outermost handler's reply goes to the API caller
  • Inner replies become return values captured by $var becomes emit
  • Each handler controls what data to expose to its caller

4.5 When to Use Each

| Pattern            | Use Case                                   |
| ------------------ | ------------------------------------------ |
| emit :event (sync) | Need result for reply, sequential flow     |
| emit async :event  | Fire-and-forget, background tasks, fan-out |

4.6 Broadcast Events and Queues

Broadcast events (emit :event to all @machine in #state) create one queue job per matching instance:

flow
emit :reminder to all @order in #pending

If 100 orders are in #pending, this creates 100 queue jobs:

Queue: eventflow.order
├── order-001 → :reminder
├── order-002 → :reminder
├── order-003 → :reminder
...
└── order-100 → :reminder

Each broadcast job is processed independently, respecting the target machine's concurrency settings.
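
As a sketch of that fan-out (ProcessEventJob is the hypothetical job class from the section 3.2 sketch; how pending instances are looked up is assumed):

php
/** @param string[] $pendingOrderIds IDs of @order instances currently in #pending */
function broadcastReminder(array $pendingOrderIds): void
{
    // One independent queue job per matching instance, all on @order's queue.
    foreach ($pendingOrderIds as $aggregateId) {
        ProcessEventJob::dispatch('order', $aggregateId, ':reminder')
            ->onQueue('eventflow.order');
    }
}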

Priority:

Broadcast events inherit the handler's priority:

flow
on :send_reminders from @scheduler (priority low)
  emit :reminder to all @order in #pending
  // All 100 :reminder events get priority: low

Async Broadcast:

By default, broadcast is async (fire-and-forget). Use explicit async for clarity:

flow
emit async :reminder to all @order in #pending

Note: Sync broadcast is not supported - you cannot wait for 100+ instances to process and aggregate their replies.


5. Event-Level Queue Flag

5.1 The queued Modifier

By default, API events are processed synchronously. Use the queued modifier to send an event directly to the queue:

flow
// Sync processing (default)
on :get_order from @customer (api)
  reply 200 with:
    | order | $order |

// Queued processing - returns 202 immediately, processes async
on :generate_report from @user (queued api)
  reply 202 async:
    callback: webhook
    url: $callback_url
  with:
    | job_id | $job_id |

The queued modifier:

  • Validates the event synchronously (see Data Validation Proposal)
  • Puts validated event on the queue
  • Returns response immediately (typically 202)
  • Processes event asynchronously from queue
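
On the Laravel side, the queued api flow could be sketched as a thin controller that validates synchronously, enqueues, and replies 202. Class, route, and field names below are hypothetical; ProcessEventJob is the sketch from section 3.2:

php
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Str;

class ReportController
{
    public function generate(Request $request): JsonResponse
    {
        // Phase 1 - synchronous validation: invalid input never reaches the queue.
        $data = $request->validate(['callback_url' => 'required|url']);

        // Phase 2 - queue the validated event and answer immediately.
        $jobId = (string) Str::uuid();
        ProcessEventJob::dispatch('report', $jobId, ':generate_report', $data)
            ->onQueue('eventflow.report');

        return response()->json(['job_id' => $jobId], 202);
    }
}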

5.2 Syntax

flow
on :event from @actor (queued api)

The modifiers appear in parentheses, space-separated:

  • api - Exposed as API endpoint
  • queued - Processed via queue (async)
  • queued api - API endpoint that queues for async processing

6. Queue Configuration Syntax

6.1 System-Level Queue (Defaults)

Configure at system level for default behavior:

flow
system: e-commerce

queue:
  strategy: fifo
  concurrency: 10
  timeout: 30 seconds
  retry:
    max_attempts: 3
    backoff: exponential

6.2 Machine-Level Queue (Override)

Override defaults for specific machines:

flow
machine: @order

queue:
  concurrency: 5
  timeout: 60 seconds

machine: @report_generator

queue:
  concurrency: 1          // Serial processing
  timeout: 10 minutes     // Long-running reports

6.3 Configuration Inheritance

Machine-level config merges with and overrides system-level defaults:

flow
system: e-commerce
queue:
  concurrency: 10        // System default
  timeout: 30 seconds
  retry:
    max_attempts: 3

machine: @order
queue:
  concurrency: 5         // Overrides system (5, not 10)
  // timeout: inherited  // Gets 30 seconds from system
  // retry: inherited    // Gets max_attempts: 3 from system

machine: @report
queue:
  concurrency: 1
  timeout: 10 minutes    // Overrides system
  // retry: inherited    // Gets max_attempts: 3 from system

Inheritance Rules:

  • Machine config merges with system config
  • Machine values override system values for the same key
  • Unspecified machine values inherit from system
  • Nested blocks (like retry:) merge recursively
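
As a sketch of these merge semantics, PHP's array_replace_recursive behaves exactly as described; the arrays below mirror the @report example above:

php
$system = [
    'concurrency' => 10,
    'timeout'     => '30 seconds',
    'retry'       => ['max_attempts' => 3, 'backoff' => 'exponential'],
];

$machine = [
    'concurrency' => 1,              // overrides the system value
    'timeout'     => '10 minutes',   // overrides the system value
];

// Nested blocks merge key by key; unspecified keys are inherited.
$effective = array_replace_recursive($system, $machine);
// ['concurrency' => 1, 'timeout' => '10 minutes',
//  'retry' => ['max_attempts' => 3, 'backoff' => 'exponential']]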

6.4 Configuration Options

| Option                | Type                  | Default     | Description                    |
| --------------------- | --------------------- | ----------- | ------------------------------ |
| strategy              | fifo \| priority      | fifo        | Processing order strategy      |
| concurrency           | number                | 10          | Max parallel event processing  |
| timeout               | duration              | 30 seconds  | Per-event processing timeout   |
| retry.max_attempts    | number                | 3           | Maximum retry attempts         |
| retry.backoff         | linear \| exponential | exponential | Retry delay strategy           |
| retry.initial_delay   | duration              | 1 second    | First retry delay              |
| retry.max_delay       | duration              | 5 minutes   | Maximum retry delay            |
| dead_letter.enabled   | boolean               | true        | Preserve failed events         |
| dead_letter.retention | duration              | 7 days      | How long to keep failed events |

6.5 Duration Syntax

flow
timeout: 30 seconds
timeout: 5 minutes
timeout: 1 hour
timeout: 500 milliseconds

7. Event Priority

7.1 Assigning Priority

When using strategy: priority, events can be assigned priority levels using the (priority ...) modifier:

flow
on :checkout from @customer (api) (priority high)
  // Processed before normal/low priority events
  order moves to #processing

on :update_preferences from @customer (api) (priority low)
  // Processed when queue is less busy
  update user preferences

on :add_item from @customer (api)
  // priority: normal (default)
  $items adds $product

Multiple modifiers can be combined:

flow
on :generate_report from @admin (queued api) (priority low)
  // Queued + low priority
  reply 202 with:
    | job_id | $job_id |

7.2 Priority Levels

| Level    | Value | Use Case                             |
| -------- | ----- | ------------------------------------ |
| critical | 0     | System emergencies, circuit breakers |
| high     | 1     | User-facing actions, payments        |
| normal   | 2     | Standard operations (default)        |
| low      | 3     | Background tasks, analytics          |
| bulk     | 4     | Batch processing, reports            |

7.3 Priority Queue Behavior

Queue (priority strategy):
┌─────────────────────────────────────────┐
│ critical: [circuit_break]               │ ← Processed first
│ high:     [checkout_1, checkout_2]      │
│ normal:   [add_item, update_cart, ...]  │
│ low:      [log_analytics, ...]          │
│ bulk:     [generate_report, ...]        │ ← Processed last
└─────────────────────────────────────────┘
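
One possible translation into the generated Horizon config (an assumption, extending the auto-generated snippet from section 3.2) is a queue per priority level, listed in order so workers drain higher-priority queues first:

php
// Hypothetical extension of the auto-generated config for strategy: priority
'supervisors' => [
    'eventflow-order' => [
        'queue' => [
            'eventflow.order.critical',   // drained first
            'eventflow.order.high',
            'eventflow.order.normal',
            'eventflow.order.low',
            'eventflow.order.bulk',       // drained last
        ],
        'balance'   => false,  // keep strict listed-order priority
        'processes' => 5,
    ],
],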

8. Retry Mechanism

8.1 Automatic Retry

When event processing fails, the queue automatically retries:

flow
queue:
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1 second
    max_delay: 1 minute

8.2 Retry Timeline (Exponential Backoff)

Attempt 1: Immediate
  ↓ (fails)
  Wait 1 second
Attempt 2:
  ↓ (fails)
  Wait 2 seconds
Attempt 3:
  ↓ (fails)
  → Dead Letter Queue (max_attempts: 3 reached)

Each wait doubles (1s, 2s, 4s, ...) up to max_delay; with a higher max_attempts the pattern simply continues.

8.3 Retry Timeline (Linear Backoff)

Attempt 1: Immediate
  ↓ (fails)
  Wait 1 second
Attempt 2:
  ↓ (fails)
  Wait 1 second
Attempt 3:
  ↓ (fails)
  → Dead Letter Queue (max_attempts: 3 reached)
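
To make the two schedules concrete, here is a small hypothetical helper (not part of EventFlow) that derives the wait times from the retry block:

php
/** Delays (in seconds) between attempts, derived from the retry config. */
function backoffDelays(int $maxAttempts, int $initialDelay, int $maxDelay, string $backoff): array
{
    $delays = [];
    for ($attempt = 1; $attempt < $maxAttempts; $attempt++) {
        $delay = $backoff === 'exponential'
            ? $initialDelay * (2 ** ($attempt - 1))   // 1s, 2s, 4s, ...
            : $initialDelay;                          // 1s, 1s, 1s, ...
        $delays[] = min($delay, $maxDelay);           // capped by max_delay
    }
    return $delays;
}

// max_attempts: 3, initial_delay: 1 second, max_delay: 60 seconds
backoffDelays(3, 1, 60, 'exponential');   // [1, 2]
backoffDelays(3, 1, 60, 'linear');        // [1, 1]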

8.4 Non-Retryable Errors

Some errors should not be retried:

flow
on :checkout from @customer (api)
  ? cart is empty
    fail with "CART_EMPTY"    // Business logic error - no retry

  process payment
  // If payment gateway times out - will retry

Retryable: Timeouts, connection errors, rate limits.
Non-retryable: Validation errors, business logic failures, authentication errors.
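
Inside a queue job, that split could look like the following sketch (continuing the hypothetical ProcessEventJob from section 3.2; the exception classes are stand-ins for whatever your actions throw):

php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;

class ProcessEventJob implements ShouldQueue
{
    use InteractsWithQueue;   // provides $this->fail()

    // (constructor and middleware omitted - see the sketch in section 3.2)
    public function handle(): void
    {
        try {
            // run guards, actions, and transitions here ...
        } catch (\DomainException $e) {
            // Business/validation error: retrying cannot help - fail now (→ DLQ).
            $this->fail($e);
        } catch (\RuntimeException $e) {
            // Transient error (timeout, connection refused): rethrow so the
            // queue retries with backoff up to max_attempts.
            throw $e;
        }
    }
}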


9. Dead Letter Queue

9.1 What is the Dead Letter Queue?

When an event fails all retry attempts, it goes to the Dead Letter Queue (DLQ) instead of being lost.

The DLQ serves as:

  • Investigation tool: See why events failed and debug them
  • Recovery mechanism: Retry manually after fixing the underlying issue
  • Audit trail: Track all failed events

A DLQ entry contains everything needed to debug and retry the event.

9.2 Configuration

flow
queue:
  dead_letter:
    enabled: true
    retention: 7 days

9.3 DLQ Entry Structure

json
{
  "id": "dlq-abc123",
  "conversation_id": "conv-xyz789",
  "original_event": {
    "type": ":checkout",
    "from": "@customer",
    "aggregate_id": "order-xyz789",
    "data": { "cart_id": "cart-123" }
  },
  "machine": "@order",
  "attempts": 3,
  "first_attempt": "2024-12-15T10:00:00Z",
  "last_attempt": "2024-12-15T10:05:00Z",
  "errors": [
    { "attempt": 1, "error": "Connection timeout", "at": "..." },
    { "attempt": 2, "error": "Connection timeout", "at": "..." },
    { "attempt": 3, "error": "Connection refused", "at": "..." }
  ]
}

The conversation_id is critical for machine systems - it allows you to trace which conversation the failed event belonged to and investigate related events across all machines in that conversation.


10. CLI Commands

10.1 Queue Status

bash
eventflow queue status

Output:

Queue Status
────────────────────────────────────────────────────
System: e-commerce
  Strategy: fifo | Concurrency: 8/10 active

Machine Queues:
┌──────────────┬─────────┬────────────┬────────┬──────────┐
│ Machine      │ Pending │ Processing │ Failed │ Avg Time │
├──────────────┼─────────┼────────────┼────────┼──────────┤
│ @order       │ 15      │ 5          │ 1      │ 45ms     │
│ @payment     │ 2       │ 1          │ 0      │ 120ms    │
│ @inventory   │ 0       │ 1          │ 0      │ 8ms      │
│ @report      │ 8       │ 2          │ 0      │ 4.2s     │
└──────────────┴─────────┴────────────┴────────┴──────────┘

Dead Letter Queue: 3 events

10.2 Machine Queue Details

bash
eventflow queue status @order

Output:

@order Queue Details
────────────────────────────────────────────────────
Configuration:
  Strategy:      fifo
  Concurrency:   5
  Timeout:       30s
  Retry:         3 attempts, exponential backoff

Current State:
  Pending:       15 events
  Processing:    5 events
  Failed:        1 event
  Completed:     1,234 events (last hour)
  Throughput:    ~20 events/sec

Pending Events:
┌──────────────┬────────────┬──────────┬──────────────────┐
│ Aggregate    │ Event      │ Priority │ Queued At        │
├──────────────┼────────────┼──────────┼──────────────────┤
│ order-abc123 │ :checkout  │ high     │ 2 seconds ago    │
│ order-def456 │ :add_item  │ normal   │ 3 seconds ago    │
│ order-ghi789 │ :add_item  │ normal   │ 5 seconds ago    │
│ ...          │            │          │                  │
└──────────────┴────────────┴──────────┴──────────────────┘

10.3 Dead Letter Queue

bash
eventflow queue dlq

Output:

Dead Letter Queue
────────────────────────────────────────────────────
3 failed events

┌────────────┬──────────────┬────────────┬──────────┬─────────────────────┐
│ DLQ ID     │ Aggregate    │ Event      │ Attempts │ Last Error          │
├────────────┼──────────────┼────────────┼──────────┼─────────────────────┤
│ dlq-001    │ order-abc123 │ :checkout  │ 3        │ Connection refused  │
│ dlq-002    │ order-xyz789 │ :payment   │ 3        │ Gateway timeout     │
│ dlq-003    │ report-r1    │ :generate  │ 3        │ Out of memory       │
└────────────┴──────────────┴────────────┴──────────┴─────────────────────┘

Use 'eventflow queue dlq inspect <id>' for details
Use 'eventflow queue dlq retry <id>' to retry

10.4 Retry Commands

bash
# Retry single failed event
eventflow queue retry dlq-001

# Retry all failed events for a machine
eventflow queue retry @order --all

# Retry with delay
eventflow queue retry dlq-001 --delay 5s

10.5 Purge Commands

bash
# Purge pending events (dangerous!)
eventflow queue purge @order --pending --confirm

# Purge dead letter queue
eventflow queue purge @order --dlq --confirm

# Purge events older than duration
eventflow queue purge @order --older-than 24h --confirm

10.6 Pause/Resume

bash
# Pause processing (events still queue)
eventflow queue pause @order

# Resume processing
eventflow queue resume @order

# Check if paused
eventflow queue status @order
# Shows: Status: PAUSED (15 pending)

11. Monitoring & Observability

11.1 Metrics

The queue system should expose these metrics:

| Metric                | Type      | Description                         |
| --------------------- | --------- | ----------------------------------- |
| queue.pending         | Gauge     | Events waiting to be processed      |
| queue.processing      | Gauge     | Events currently being processed    |
| queue.completed       | Counter   | Total events successfully processed |
| queue.failed          | Counter   | Total events that failed            |
| queue.retried         | Counter   | Total retry attempts                |
| queue.dlq_size        | Gauge     | Events in dead letter queue         |
| queue.processing_time | Histogram | Event processing duration           |
| queue.wait_time       | Histogram | Time spent waiting in queue         |
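
For example, the queue.pending gauge for a machine maps directly onto Laravel's queue size (assuming the per-machine queue names shown earlier):

php
use Illuminate\Support\Facades\Queue;

// queue.pending for @order - events waiting on the per-machine queue
$pending = Queue::size('eventflow.order');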

11.2 Health Checks

bash
eventflow queue health

Output:

Queue Health Check
────────────────────────────────────────────────────
Overall: ⚠️  WARNING

Checks:
  ✅ @order queue healthy (15 pending, < 100 threshold)
  ✅ @payment queue healthy (2 pending, < 50 threshold)
  ⚠️  @report queue warning (45 pending, > 30 threshold)
  ✅ Dead letter queue healthy (3 events, < 10 threshold)
  ✅ No stuck events detected

Recommendations:
  - Consider increasing @report concurrency
  - Review 3 events in dead letter queue

11.3 Alerting Thresholds

flow
queue:
  alerts:
    pending_warning: 100
    pending_critical: 500
    dlq_warning: 10
    dlq_critical: 50
    processing_time_warning: 5 seconds
    processing_time_critical: 30 seconds

12. Complete Example

12.1 System with Queue Configuration

flow
system: e-commerce

queue:
  strategy: priority
  concurrency: 20
  timeout: 30 seconds
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1 second
    max_delay: 1 minute
  dead_letter:
    enabled: true
    retention: 7 days
  alerts:
    pending_warning: 100
    pending_critical: 500

uses:
  @order from "./order.flow"
  @payment from "./payment.flow"
  @inventory from "./inventory.flow"
  @report from "./report.flow"

12.2 Order Machine

flow
machine: @order

queue:
  concurrency: 10
  timeout: 60 seconds

scenario: order processing

  on :checkout from @customer (api) (priority high)
    ? cart is valid
      $order_id becomes uuid()
      order moves to #awaiting_payment
      $payment becomes emit :payment_request to @payment

      ? $payment.success
        order moves to #paid
      otherwise
        order moves to #payment_failed

  on :add_item from @customer (api)
    // priority: normal (default)
    $items adds $product
    $total increases by $product.price

  on :payment_success from @payment (priority high)
    order moves to #paid
    emit async :reserve to @inventory

  on :generate_invoice from @system (priority low)
    generate pdf
    send to customer

12.3 Report Machine (Serial Processing)

flow
machine: @report

queue:
  concurrency: 1        // One report at a time
  timeout: 10 minutes   // Reports can be slow

scenario: generate reports

  on :generate_sales_report from @admin (queued api) (priority bulk)
    query all orders
    aggregate statistics
    create charts
    generate pdf
    report moves to #ready

13. Keywords Reference

| Keyword        | Context        | Description                            |
| -------------- | -------------- | -------------------------------------- |
| emit           | action         | Sync emit - wait for result (default)  |
| emit async     | action         | Async emit - queue without waiting     |
| queued         | event modifier | Process event via queue (async)        |
| (priority ...) | event modifier | Event priority level                   |
| queue:         | system/machine | Queue configuration block              |
| strategy:      | queue          | fifo or priority                       |
| concurrency:   | queue          | Max parallel processing                |
| timeout:       | queue          | Per-event timeout                      |
| retry:         | queue          | Retry configuration block              |
| max_attempts:  | retry          | Maximum retry attempts                 |
| backoff:       | retry          | linear or exponential                  |
| initial_delay: | retry          | First retry delay                      |
| max_delay:     | retry          | Maximum retry delay                    |
| dead_letter:   | queue          | DLQ configuration block                |
| enabled:       | dead_letter    | Enable/disable DLQ                     |
| retention:     | dead_letter    | How long to keep failed events         |
| alerts:        | queue          | Alert thresholds block                 |

14. Implementation Notes

14.1 Queued Event Processing

When an event is marked with queued, the system splits processing into two phases:

┌─────────────────────────────────────────────────────────────────┐
│ HTTP Request                                                    │
│                                                                 │
│  1. Receive event                                               │
│  2. Run validation (sync) ──────────► 400 if invalid           │
│  3. Put event on queue                                          │
│  4. Return response (typically 202) ◄─── Request complete      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ Queue Worker (Background)                                       │
│                                                                 │
│  1. Pick event from queue                                       │
│  2. Load machine state                                          │
│  3. Run transitions (guards, actions, state changes)            │
│  4. Persist new state                                           │
│  5. Send callback if configured                                 │
│  6. Acknowledge event (remove from queue)                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Queued Transition Runner - A background worker that:

  • Polls the queue for pending events
  • Processes events in order (per-aggregate)
  • Handles retries on failure
  • Sends callbacks on completion

php
// Conceptual PHP implementation
class QueuedTransitionRunner
{
    public function process(QueuedEvent $event): void
    {
        $machine = $this->loadMachine($event->machine_id);
        $state = $this->loadState($event->aggregate_id);

        // Run the actual transitions
        $newState = $machine->transition($state, $event);

        $this->persistState($event->aggregate_id, $newState);

        // Send callback if configured
        if ($event->hasCallback()) {
            $this->sendCallback($event, $newState);
        }
    }
}

14.2 Storage Backend

Queues should be backed by persistent storage:

  • Development: SQLite or in-memory
  • Production: Redis, PostgreSQL, or dedicated message queue

14.3 Concurrency Control

Per-aggregate locking ensures events for the same aggregate don't process concurrently:

Lock: order-abc123
  → Process :checkout
  → Release lock
Lock: order-abc123
  → Process :add_item
  → ...
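
A minimal sketch of that sequence using Laravel's atomic locks (the key format and timeouts are assumptions):

php
use Illuminate\Support\Facades\Cache;

// Hold the lock for at most 60s; wait up to 30s to acquire it.
Cache::lock('eventflow:order:order-abc123', 60)->block(30, function () {
    // load state → process one event → persist; the lock is released afterwards
});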

14.4 Idempotency

Event handlers should be idempotent for safe retries:

flow
on :checkout from @customer (api)
  // Check if already processed
  ? order already exists with $idempotency_key
    return existing order

  // Process normally
  $order_id becomes uuid()
  order moves to #awaiting_payment

14.5 PHP Binding

The PHP binding should integrate with:

  • Laravel Queue (Redis, Database, SQS)
  • Symfony Messenger
  • Custom queue adapters

15. Summary

┌─────────────────────────────────────────────────────────────┐
│                                                             │
│  Queue Configuration                                        │
│  ───────────────────                                        │
│  - System-level defaults                                    │
│  - Machine-level overrides                                  │
│  - Per-event priority                                       │
│                                                             │
│  Ordering Guarantees                                        │
│  ───────────────────                                        │
│  - Per-aggregate: Sequential                                │
│  - Cross-aggregate: Parallel                                │
│                                                             │
│  Failure Handling                                           │
│  ─────────────────                                          │
│  - Automatic retry with backoff                             │
│  - Dead letter queue for failed events                      │
│  - Manual retry via CLI                                     │
│                                                             │
│  Observability                                              │
│  ─────────────                                              │
│  - CLI: eventflow queue status                              │
│  - Metrics for monitoring systems                           │
│  - Health checks and alerts                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Philosophy

Events flow through queues. Queues guarantee order.

Machines process. Queues orchestrate.

The queue system follows EventFlow's natural language philosophy while providing enterprise-grade reliability and observability.

