Skip to content

Event Queues

EventFlow provides built-in queue support for reliable, ordered event processing. Queues ensure events are processed in order per instance while enabling parallel processing across different instances.

Core Philosophy

Events flow through queues. Queues guarantee order.

Machines process. Queues orchestrate.

Machine vs Instance

Understanding the difference between machines and instances is crucial:

  • Machine (Template): A .flow file definition (e.g., machine: @order)
  • Instance (Aggregate): A logical entity identified by unique ID (e.g., @order:abc123)

Instances are reconstructed on demand - they don't live in memory continuously:

Request 1: :checkout for @order:abc123
  └─> Load state → Process → Persist → Done

Request 2: :payment_ok for @order:abc123 (minutes later)
  └─> Load state → Process → Persist → Done

Sync vs Async Emit

Sync Emit (Default)

By default, emit is synchronous - it waits for the target machine to process:

eventflow
on :checkout from @customer (api)
  $payment becomes emit :payment_request to @payment

  ? $payment.success
    order moves to #paid
    reply 201 with:
      | order_id | $order_id            |
      | txn_id   | $payment.transaction_id |
  otherwise
    reply 400 with:
      | error | $payment.error |

Sync emit bypasses the queue - it executes inline within the same request.

Async Emit

Use emit async to queue the event without waiting:

eventflow
on :checkout from @customer (api)
  order moves to #awaiting_payment

  emit async :payment_request to @payment
  emit async :analytics_event to @analytics

  reply 202 with:
      | order_id | $order_id  |
      | status   | "processing" |

Async emit puts the event on the queue and returns immediately.

When to Use Each

PatternUse Case
emit :event (sync)Need result for reply, sequential flow
emit async :eventFire-and-forget, background tasks, fan-out

The queued Modifier

Use queued to make an API event process asynchronously:

eventflow
// Sync processing (default)
on :get_order from @customer (api)
  reply 200 with:
    | order | $order |

// Queued processing - returns 202 immediately
on :generate_report from @user (queued api)
  // Processes asynchronously
  generate report
  send to user

The queued modifier:

  1. Validates the event synchronously
  2. Puts validated event on the queue
  3. Returns response immediately (typically 202)
  4. Processes event asynchronously

Queue Configuration

System-Level Defaults

eventflow
system: e-commerce

queue:
  strategy: fifo
  concurrency: 10
  timeout: 30 seconds
  retry:
    max_attempts: 3
    backoff: exponential

Machine-Level Override

eventflow
machine: @order

queue:
  concurrency: 5
  timeout: 60 seconds

machine: @report

queue:
  concurrency: 1          // Serial processing
  timeout: 10 minutes     // Long-running reports

Machine config merges with and overrides system defaults.

Configuration Options

OptionTypeDefaultDescription
strategyfifo | priorityfifoProcessing order
concurrencynumber10Max parallel processing
timeoutduration30 secondsPer-event timeout
retry.max_attemptsnumber3Maximum retries
retry.backofflinear | exponentialexponentialRetry delay
retry.initial_delayduration1 secondFirst retry delay
dead_letter.enabledbooleantruePreserve failed events
dead_letter.retentionduration7 daysDLQ retention

What Concurrency Means

concurrency: 10 means 10 different instances can process events simultaneously.

Within each instance, events are always processed sequentially:

concurrency: 3

@order:abc │ :checkout │ :payment_ok │
@order:xyz │ :add_item │ :checkout   │  ← parallel with abc
@order:def │ :checkout │             │  ← parallel with abc, xyz
@order:ghi │ (waiting) │             │  ← waits for a slot

Event Priority

When using strategy: priority, assign priority levels using the (priority ...) modifier:

eventflow
on :checkout from @customer (api) (priority high)
  order moves to #processing

on :update_preferences from @customer (api) (priority low)
  update user preferences

on :add_item from @customer (api)
  // priority: normal (default)
  $items adds $product

Priority Levels

LevelUse Case
criticalSystem emergencies, circuit breakers
highUser-facing actions, payments
normalStandard operations (default)
lowBackground tasks, analytics
bulkBatch processing, reports

Retry Mechanism

Failed events are automatically retried with backoff:

eventflow
queue:
  retry:
    max_attempts: 3
    backoff: exponential
    initial_delay: 1 second
    max_delay: 1 minute

Exponential Backoff Timeline

Attempt 1: Immediate
  ↓ (fails)
  Wait 1 second
Attempt 2:
  ↓ (fails)
  Wait 2 seconds
Attempt 3:
  ↓ (fails)
  → Dead Letter Queue

Non-Retryable Errors

Some errors should not be retried:

  • Retryable: Timeouts, connection errors, rate limits
  • Non-retryable: Validation errors, business logic failures

Dead Letter Queue

When an event fails all retry attempts, it goes to the Dead Letter Queue (DLQ):

eventflow
queue:
  dead_letter:
    enabled: true
    retention: 7 days

DLQ entries preserve:

  • Original event data
  • All error messages
  • Attempt timestamps
  • Conversation context

Use CLI commands to inspect and retry DLQ events. See Queue Commands.

Broadcast Events

Broadcast events create one queue job per matching instance:

eventflow
emit :reminder to all @order in #pending

If 100 orders are in #pending, this creates 100 queue jobs processed independently.

Note: Sync broadcast is not supported - you cannot wait for 100+ instances.

Complete Example

eventflow
system: e-commerce

queue:
  strategy: priority
  concurrency: 20
  timeout: 30 seconds
  retry:
    max_attempts: 3
    backoff: exponential
  dead_letter:
    enabled: true
    retention: 7 days

uses:
  @order from "./order.flow"
  @payment from "./payment.flow"
eventflow
machine: @order

queue:
  concurrency: 10
  timeout: 60 seconds

scenario: order processing

  on :checkout from @customer (api) (priority high)
    ? cart is valid
      $order_id becomes uuid()
      order moves to #awaiting_payment
      $payment becomes emit :payment_request to @payment

      ? $payment.success
        order moves to #paid
      otherwise
        order moves to #payment_failed

  on :add_item from @customer (api)
    $items adds $product
    $total increases by $product.price

  on :payment_success from @payment (priority high)
    order moves to #paid
    emit async :reserve to @inventory

Released under the MIT License.