Event Queues
EventFlow provides built-in queue support for reliable, ordered event processing. Queues ensure events are processed in order per instance while enabling parallel processing across different instances.
Core Philosophy
Events flow through queues. Queues guarantee order.
Machines process. Queues orchestrate.
Machine vs Instance
Understanding the difference between machines and instances is crucial:
- Machine (Template): A
.flowfile definition (e.g.,machine: @order) - Instance (Aggregate): A logical entity identified by unique ID (e.g.,
@order:abc123)
Instances are reconstructed on demand - they don't live in memory continuously:
Request 1: :checkout for @order:abc123
└─> Load state → Process → Persist → Done
Request 2: :payment_ok for @order:abc123 (minutes later)
└─> Load state → Process → Persist → DoneSync vs Async Emit
Sync Emit (Default)
By default, emit is synchronous - it waits for the target machine to process:
on :checkout from @customer (api)
$payment becomes emit :payment_request to @payment
? $payment.success
order moves to #paid
reply 201 with:
| order_id | $order_id |
| txn_id | $payment.transaction_id |
otherwise
reply 400 with:
| error | $payment.error |Sync emit bypasses the queue - it executes inline within the same request.
Async Emit
Use emit async to queue the event without waiting:
on :checkout from @customer (api)
order moves to #awaiting_payment
emit async :payment_request to @payment
emit async :analytics_event to @analytics
reply 202 with:
| order_id | $order_id |
| status | "processing" |Async emit puts the event on the queue and returns immediately.
When to Use Each
| Pattern | Use Case |
|---|---|
emit :event (sync) | Need result for reply, sequential flow |
emit async :event | Fire-and-forget, background tasks, fan-out |
The queued Modifier
Use queued to make an API event process asynchronously:
// Sync processing (default)
on :get_order from @customer (api)
reply 200 with:
| order | $order |
// Queued processing - returns 202 immediately
on :generate_report from @user (queued api)
// Processes asynchronously
generate report
send to userThe queued modifier:
- Validates the event synchronously
- Puts validated event on the queue
- Returns response immediately (typically 202)
- Processes event asynchronously
Queue Configuration
System-Level Defaults
system: e-commerce
queue:
strategy: fifo
concurrency: 10
timeout: 30 seconds
retry:
max_attempts: 3
backoff: exponentialMachine-Level Override
machine: @order
queue:
concurrency: 5
timeout: 60 seconds
machine: @report
queue:
concurrency: 1 // Serial processing
timeout: 10 minutes // Long-running reportsMachine config merges with and overrides system defaults.
Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
strategy | fifo | priority | fifo | Processing order |
concurrency | number | 10 | Max parallel processing |
timeout | duration | 30 seconds | Per-event timeout |
retry.max_attempts | number | 3 | Maximum retries |
retry.backoff | linear | exponential | exponential | Retry delay |
retry.initial_delay | duration | 1 second | First retry delay |
dead_letter.enabled | boolean | true | Preserve failed events |
dead_letter.retention | duration | 7 days | DLQ retention |
What Concurrency Means
concurrency: 10 means 10 different instances can process events simultaneously.
Within each instance, events are always processed sequentially:
concurrency: 3
@order:abc │ :checkout │ :payment_ok │
@order:xyz │ :add_item │ :checkout │ ← parallel with abc
@order:def │ :checkout │ │ ← parallel with abc, xyz
@order:ghi │ (waiting) │ │ ← waits for a slotEvent Priority
When using strategy: priority, assign priority levels using the (priority ...) modifier:
on :checkout from @customer (api) (priority high)
order moves to #processing
on :update_preferences from @customer (api) (priority low)
update user preferences
on :add_item from @customer (api)
// priority: normal (default)
$items adds $productPriority Levels
| Level | Use Case |
|---|---|
critical | System emergencies, circuit breakers |
high | User-facing actions, payments |
normal | Standard operations (default) |
low | Background tasks, analytics |
bulk | Batch processing, reports |
Retry Mechanism
Failed events are automatically retried with backoff:
queue:
retry:
max_attempts: 3
backoff: exponential
initial_delay: 1 second
max_delay: 1 minuteExponential Backoff Timeline
Attempt 1: Immediate
↓ (fails)
Wait 1 second
Attempt 2:
↓ (fails)
Wait 2 seconds
Attempt 3:
↓ (fails)
→ Dead Letter QueueNon-Retryable Errors
Some errors should not be retried:
- Retryable: Timeouts, connection errors, rate limits
- Non-retryable: Validation errors, business logic failures
Dead Letter Queue
When an event fails all retry attempts, it goes to the Dead Letter Queue (DLQ):
queue:
dead_letter:
enabled: true
retention: 7 daysDLQ entries preserve:
- Original event data
- All error messages
- Attempt timestamps
- Conversation context
Use CLI commands to inspect and retry DLQ events. See Queue Commands.
Broadcast Events
Broadcast events create one queue job per matching instance:
emit :reminder to all @order in #pendingIf 100 orders are in #pending, this creates 100 queue jobs processed independently.
Note: Sync broadcast is not supported - you cannot wait for 100+ instances.
Complete Example
system: e-commerce
queue:
strategy: priority
concurrency: 20
timeout: 30 seconds
retry:
max_attempts: 3
backoff: exponential
dead_letter:
enabled: true
retention: 7 days
uses:
@order from "./order.flow"
@payment from "./payment.flow"machine: @order
queue:
concurrency: 10
timeout: 60 seconds
scenario: order processing
on :checkout from @customer (api) (priority high)
? cart is valid
$order_id becomes uuid()
order moves to #awaiting_payment
$payment becomes emit :payment_request to @payment
? $payment.success
order moves to #paid
otherwise
order moves to #payment_failed
on :add_item from @customer (api)
$items adds $product
$total increases by $product.price
on :payment_success from @payment (priority high)
order moves to #paid
emit async :reserve to @inventoryRelated
- Events Overview - Basic event handling
- Machine Responses - API responses
- Queue Commands - CLI reference
- Keywords - Events - Queue keywords