EventFlow Event Queue Proposal
Reliable, Ordered Event Processing for State Machines
Version: 1.0 Date: December 2024 Status: ✅ Implemented
This proposal has been implemented and integrated into the main documentation.
See the following documentation pages:
- Event Queues Guide - Complete guide
- Keywords Reference - Events - Queue keywords
- CLI Reference - Queue Commands - CLI commands
This proposal is kept as an archive for reference.
1. Executive Summary
EventFlow machines process events to transition between states. As systems scale, reliable event processing becomes critical. This proposal introduces Event Queues - a natural language approach to configuring ordered, reliable, and observable event processing.
Core Philosophy
Events flow through queues. Queues guarantee order.
Machines process. Queues orchestrate.
Key Features
- Per-aggregate ordering - Events for the same instance processed in order
- System & machine queues - Hierarchical configuration
- Priority support - Critical events processed first
- Retry mechanism - Automatic retry with backoff
- Dead letter queue - Failed events preserved for analysis
- CLI monitoring - eventflow queue commands for visibility
2. Motivation & Problem Statement
2.1 Current Situation
EventFlow machines receive and process events, but there's no explicit model for:
- How events are queued before processing
- What happens when processing fails
- How to prioritize critical events
- How to monitor queue health
2.2 Real-World Challenges
Problem 1: Race Conditions
─────────────────────────
Customer sends :add_item and :checkout nearly simultaneously.
Without ordering, :checkout might process before :add_item.
Problem 2: Transient Failures
─────────────────────────────
Payment gateway times out. Without retry, order is stuck.
Problem 3: Visibility
─────────────────────
100 orders are processing slowly. How do we know what's pending?
2.3 Goals
- Guarantee per-aggregate ordering - Same order instance, sequential processing
- Enable parallel processing - Different aggregates can process concurrently
- Handle failures gracefully - Retry with backoff, preserve failed events
- Provide observability - CLI tools for queue monitoring
3. Queue Architecture
3.1 Machine vs Instance
Before understanding queue architecture, it's important to distinguish:
- Machine (Template): A .flow file definition (e.g., machine: @order)
- Instance (Aggregate): A logical entity identified by a unique ID (e.g., @order:abc123)
Instance Lifecycle (PHP/Laravel):
Instances don't live in memory continuously. They are reconstructed on demand:
- Event arrives (HTTP request or queue job)
- Load instance state from database (event store or snapshot)
- Process event (run guards, actions, transitions)
- Persist new state to database
- Instance discarded from memory
Request 1: :checkout for @order:abc123
└─> Load state → Process → Persist → Done
Request 2: :payment_ok for @order:abc123 (minutes later)
└─> Load state → Process → Persist → Done
Multiple requests can process different instances in parallel:
Request A: @order:abc123 → processing
Request B: @order:xyz789 → processing (parallel)
Request C: @order:def456 → processing (parallel)
But the same instance is processed sequentially (via locking).
Each instance has its own state, context, and event history. Instances never share state - they only communicate through events.
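The load, process, persist cycle described in this section can be sketched as follows. This is an illustrative Python sketch, not EventFlow's PHP implementation: the `STORE` dict stands in for the database, and `TRANSITIONS`, `handle_event`, and the `#cart` starting state are hypothetical names chosen for the example.

```python
# Hypothetical stand-ins: a dict plays the database, a table plays the machine.
STORE = {}  # instance id -> persisted state name

# (current state, event) -> next state; illustrative only
TRANSITIONS = {
    ("#cart", ":checkout"): "#awaiting_payment",
    ("#awaiting_payment", ":payment_ok"): "#paid",
}

def handle_event(instance_id, event):
    """Load state, process one event, persist, and discard the instance."""
    state = STORE.get(instance_id, "#cart")              # load from storage
    new_state = TRANSITIONS.get((state, event), state)   # process (no-op if no match)
    STORE[instance_id] = new_state                       # persist
    return new_state                                     # nothing is kept in memory

handle_event("@order:abc123", ":checkout")
handle_event("@order:abc123", ":payment_ok")  # a later, separate request
```

Each call starts from persisted state only, which is why two requests minutes apart can safely continue the same instance.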
3.2 What Concurrency Means
concurrency: 10 means up to 10 different instances can process events simultaneously.
Within each instance: Events are always processed sequentially (one at a time).
concurrency: 3
@order:abc │ :checkout │ :payment_ok │
@order:xyz │ :add_item │ :checkout │ ← parallel with abc
@order:def │ :checkout │ │ ← parallel with abc, xyz
@order:ghi │ (waiting) │ │ ← waits for a slot
This is cross-instance parallelism, not parallel event processing within an instance.
Laravel Implementation
EventFlow automatically configures Laravel Horizon based on your .flow files:
// order.flow
machine: @order
queue:
concurrency: 10
timeout: 60 seconds
// payment.flow
machine: @payment
queue:
concurrency: 3
When you run eventflow serve or eventflow horizon:config, EventFlow:
- Scans all .flow files for queue: blocks
- Generates Horizon supervisor configuration
- Creates per-machine queues (eventflow.order, eventflow.payment)
Generated Config (internal):
// Auto-generated by EventFlow - do not edit manually
'supervisors' => [
'eventflow-order' => [
'queue' => ['eventflow.order'],
'processes' => 10, // from queue.concurrency
],
'eventflow-payment' => [
'queue' => ['eventflow.payment'],
'processes' => 3,
],
],
You write .flow files; EventFlow handles the infrastructure.
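The mapping from queue: blocks to supervisor entries can be illustrated with a short Python sketch. It only mirrors the naming pattern visible in the generated config above (eventflow-<machine>, eventflow.<machine>, processes from concurrency); `build_supervisors` is a hypothetical helper, not the actual generator.

```python
def build_supervisors(machines):
    """Map each machine's queue block to a Horizon-style supervisor entry."""
    supervisors = {}
    for name, queue in machines.items():
        short = name.lstrip("@")                  # "@order" -> "order"
        supervisors[f"eventflow-{short}"] = {
            "queue": [f"eventflow.{short}"],
            "processes": queue["concurrency"],    # from queue.concurrency
        }
    return supervisors

config = build_supervisors({
    "@order": {"concurrency": 10},
    "@payment": {"concurrency": 3},
})
```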
How it works at runtime:
- Async events (emit async) are dispatched to Laravel queues
- Horizon workers process queue jobs (one per process)
- Per-aggregate locking ensures the same instance is processed sequentially
Queue: eventflow.order
├── Worker 1 → @order:abc123 (locked)
├── Worker 2 → @order:xyz789
├── Worker 3 → @order:def456
└── Worker 4 → waiting for jobs...
@order:abc123 events:
:checkout (processing by Worker 1)
:add_item (waiting - same aggregate locked)
Sync events (emit default) don't go through the queue - they execute inline within the same request/process.
3.3 Conceptual Model
┌─────────────────────────────────────┐
│ System Queue │
│ (e-commerce, concurrency: 10) │
└─────────────────────────────────────┘
│
┌────────────────────────┼────────────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐
│ @order Queue │ │ @payment Queue │ │ @inventory Queue │
│ (concurrency: 5) │ │ (concurrency: 3) │ │ (concurrency: 1) │
└─────────────────────┘ └─────────────────────┘ └─────────────────────┘
│ │ │
┌──────┴──────┐ ┌──────┴──────┐ │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
order-123 order-456 payment-a payment-b inventory
(sequential) (parallel) (parallel) (parallel) (serial)
Note: @order Queue (concurrency: 5) means up to 5 different order instances can process events in parallel. Each instance still processes its own events sequentially.
3.4 Per-Aggregate Ordering
Within a machine, events for the same aggregate are always processed sequentially:
@order:abc123 events:
1. :add_item → processes first
2. :add_item → waits for #1
3. :checkout → waits for #2
4. :payment_ok → waits for #3
@order:xyz789 events (parallel with abc123):
1. :checkout → processes immediately
2. :payment_ok → waits for #1
This guarantees consistency: an order can't be checked out before items are added.
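A minimal Python sketch of this ordering rule, under simplifying assumptions: events are grouped by aggregate instead of taking a database lock, a semaphore plays the role of the concurrency limit, and a slot is held while one aggregate drains its events. `run_queue` and the sample events are hypothetical.

```python
import asyncio
from collections import defaultdict

async def run_queue(events, concurrency):
    """Process (aggregate_id, event) pairs: sequential per aggregate, parallel across."""
    slots = asyncio.Semaphore(concurrency)      # cross-aggregate limit
    per_aggregate = defaultdict(list)
    for aggregate_id, event in events:          # keep arrival order per aggregate
        per_aggregate[aggregate_id].append(event)

    processed = []

    async def drain(aggregate_id, queue):
        async with slots:                       # one slot per active aggregate
            for event in queue:                 # strictly one event at a time
                await asyncio.sleep(0)          # stand-in for real work
                processed.append((aggregate_id, event))

    await asyncio.gather(*(drain(a, q) for a, q in per_aggregate.items()))
    return processed

events = [
    ("order:abc123", ":add_item"),
    ("order:xyz789", ":checkout"),
    ("order:abc123", ":checkout"),
    ("order:xyz789", ":payment_ok"),
    ("order:abc123", ":payment_ok"),
]
processed = asyncio.run(run_queue(events, concurrency=2))
abc_order = [e for a, e in processed if a == "order:abc123"]
```

Whatever the interleaving between the two orders, `abc_order` keeps the arrival order of that aggregate's events.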
3.5 Cross-Aggregate Parallelism
Different aggregates can process in parallel:
Time →
───────────────────────────────────────────────────
@order:abc │ :add_item │ :checkout │ :payment_ok │
───────────────────────────────────────────────────
@order:xyz │ :checkout │ :ship │ │
───────────────────────────────────────────────────
@order:def │ :add_item │ :add_item │ :checkout │
───────────────────────────────────────────────────
3.6 Conversation Context in Machine Systems
In a machine system, multiple machine instances work together within a conversation:
Conversation: conv-abc123
├── @order:order-001 → #awaiting_payment
├── @payment:pay-001 → #processing
└── @inventory:inv-001 → #reserved
Event Routing:
When @order emits to @payment:
// In @order machine
emit :payment_request to @payment
EventFlow:
- Checks if a @payment instance exists in this conversation
- If yes → routes to the existing instance
- If no → creates a new @payment instance in the same conversation
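The routing rule above amounts to a lookup keyed by conversation and machine. The Python sketch below assumes an in-memory registry; `route`, `instances`, and the generated ID format are hypothetical and only illustrate the find-or-create behavior.

```python
import itertools

_ids = itertools.count(1)
instances = {}   # (conversation_id, machine) -> instance id

def route(conversation_id, machine):
    """Return the instance for this conversation and machine, creating it if absent."""
    key = (conversation_id, machine)
    if key not in instances:
        instances[key] = f"{machine.lstrip('@')}-{next(_ids):03d}"
    return instances[key]

first = route("conv-abc123", "@payment")    # no instance yet: created
second = route("conv-abc123", "@payment")   # same conversation: reused
other = route("conv-xyz789", "@payment")    # different conversation: new instance
```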
Queue Ordering:
Events are ordered by conversation + machine + instance:
Queue: eventflow.payment
├── conv-abc:pay-001 → [:payment_request, :cancel_payment] // sequential
├── conv-xyz:pay-002 → [:payment_request] // parallel
└── conv-def:pay-003 → [:refund_request] // parallel
Same conversation, same machine instance = sequential. Different conversations = parallel.
4. Sync and Async Emit
4.1 Default Behavior: Sync
By default, emit is synchronous - it waits for the target machine to process and returns the result:
on :checkout from @customer (api)
$payment becomes emit :payment_request to @payment
? $payment.success
order moves to #paid
reply 201 with:
| order_id | $order_id |
| txn_id | $payment.transaction_id |
otherwise
reply 400 with:
| error | $payment.error |
The sync emit:
- Sends event to target machine
- Waits for target to process
- Returns the target's reply as a result
- Allows using result in guards and responses
Note: Sync emit bypasses the queue entirely - it executes inline within the same request/process. The target machine's queue: configuration only applies to async events (emit async).
4.2 Async Emit
Use emit async to queue the event without waiting:
on :checkout from @customer (api)
order moves to #awaiting_payment
emit async :payment_request to @payment
emit async :analytics_event to @analytics
reply 202 with:
| order_id | $order_id |
| status | "processing" |
The async emit:
- Puts event on the queue
- Does not wait for processing
- Returns immediately
- Target processes asynchronously
4.3 Capturing Sync Results
The sync emit returns the target handler's reply:
// Payment machine
on :payment_request from @order
charge card
reply 200 with:
| success | true |
| transaction_id | $txn_id |
// Order machine - captures the reply
$result becomes emit :payment_request to @payment
// $result.success = true
// $result.transaction_id = "txn-123"
4.4 Recursive Chains
Sync emit chains are allowed - each reply bubbles up to its caller:
// Order machine (API entry point)
on :checkout from @customer (api)
$payment becomes emit :payment_request to @payment
reply 201 with: // ← THIS goes to API caller
| order_id | $order_id |
| txn_id | $payment.txn_id |
| fraud_score | $payment.fraud_score |
// Payment machine
on :payment_request from @order
$fraud becomes emit :check_fraud to @fraud
reply 200 with: // ← Returns to Order (not API)
| txn_id | $txn_id |
| fraud_score | $fraud.score |
// Fraud machine
on :check_fraud from @payment
reply 200 with: // ← Returns to Payment (not API)
| score | 0.1 |
| status | "clean" |
Execution Timeline:
API Request: POST /checkout
│
▼
┌─ Order Handler ─────────────────────────────────────────┐
│ 1. $payment becomes emit :payment_request to @payment │
│ │ │
│ ▼ │
│ ┌─ Payment Handler ─────────────────────────────┐ │
│ │ 2. $fraud becomes emit :check_fraud to @fraud │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌─ Fraud Handler ─────────────────────┐ │ │
│ │ │ 3. reply 200 { score: 0.1 } │ │ │
│ │ └────────────────────────────────────-┘ │ │
│ │ │ │ │
│ │ ▼ ($fraud = { score: 0.1 }) │ │
│ │ 4. reply 200 { txn_id, fraud_score } │ │
│ └────────────────────────────────────────────────┘ │
│ │ │
│ ▼ ($payment = { txn_id, fraud_score }) │
│ 5. reply 201 { order_id, txn_id, fraud_score } │
└─────────────────────────────────────────────────────────┘
│
▼
API Response: 201 { order_id, txn_id, fraud_score }
Key Points:
- Only the outermost handler's reply goes to the API caller
- Inner replies become return values captured by $var becomes emit
- Each handler controls what data to expose to its caller
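In ordinary programming terms, a sync emit chain behaves like nested function calls. The Python sketch below models that analogy only; `emit`, `HANDLERS`, and the hard-coded values are hypothetical and do not reflect EventFlow's runtime.

```python
HANDLERS = {}

def emit(event, target, payload=None):
    """Synchronous emit: run the target handler inline and return its reply."""
    return HANDLERS[(target, event)](payload or {})

def check_fraud(payload):
    return {"score": 0.1, "status": "clean"}                      # returns to Payment

def payment_request(payload):
    fraud = emit(":check_fraud", "@fraud")
    return {"txn_id": "txn-123", "fraud_score": fraud["score"]}   # returns to Order

def checkout(payload):
    payment = emit(":payment_request", "@payment")
    # Only this outermost reply would reach the API caller.
    return {"order_id": payload["order_id"], **payment}

HANDLERS[("@fraud", ":check_fraud")] = check_fraud
HANDLERS[("@payment", ":payment_request")] = payment_request
HANDLERS[("@order", ":checkout")] = checkout

response = emit(":checkout", "@order", {"order_id": "order-001"})
```

The fraud handler's `status` field never reaches the final response because the payment handler chose not to forward it.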
4.5 When to Use Each
| Pattern | Use Case |
|---|---|
| emit :event (sync) | Need result for reply, sequential flow |
| emit async :event | Fire-and-forget, background tasks, fan-out |
4.6 Broadcast Events and Queues
Broadcast events (emit :event to all @machine in #state) create one queue job per matching instance:
emit :reminder to all @order in #pending
If 100 orders are in #pending, this creates 100 queue jobs:
Queue: eventflow.order
├── order-001 → :reminder
├── order-002 → :reminder
├── order-003 → :reminder
...
└── order-100 → :reminder
Each broadcast job is processed independently, respecting the target machine's concurrency settings.
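The fan-out can be pictured as a filter over instances followed by one job per match. This Python sketch assumes instance states are available as a simple mapping; `broadcast` and the job shape are hypothetical.

```python
def broadcast(event, instances, state):
    """Create one queue job per instance currently in `state`."""
    return [
        {"aggregate_id": instance_id, "event": event}
        for instance_id, current in instances.items()
        if current == state
    ]

orders = {f"order-{n:03d}": "#pending" for n in range(1, 101)}
orders["order-101"] = "#paid"                 # not in #pending, so it is skipped
jobs = broadcast(":reminder", orders, "#pending")
```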
Priority:
Broadcast events inherit the handler's priority:
on :send_reminders from @scheduler (priority low)
emit :reminder to all @order in #pending
// All 100 :reminder events get priority: low
Async Broadcast:
By default, broadcast is async (fire-and-forget). Use explicit async for clarity:
emit async :reminder to all @order in #pending
Note: Sync broadcast is not supported - you cannot wait for 100+ instances to process and aggregate their replies.
5. Event-Level Queue Flag
5.1 The queued Modifier
By default, API events are processed synchronously. Use the queued modifier to send an event directly to the queue:
// Sync processing (default)
on :get_order from @customer (api)
reply 200 with:
| order | $order |
// Queued processing - returns 202 immediately, processes async
on :generate_report from @user (queued api)
reply 202 async:
callback: webhook
url: $callback_url
with:
| job_id | $job_id |
The queued modifier:
- Validates the event synchronously (see Data Validation Proposal)
- Puts validated event on the queue
- Returns response immediately (typically 202)
- Processes event asynchronously from queue
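The four steps above split into a request phase and a worker phase. The Python sketch below assumes a trivial required-fields check in place of real validation and an in-memory deque in place of a durable queue; `receive_queued`, `work_one`, and the example URL are hypothetical.

```python
from collections import deque

queue = deque()

def receive_queued(event, required_fields):
    """Request phase: validate synchronously, enqueue, answer immediately."""
    missing = [f for f in required_fields if f not in event["data"]]
    if missing:
        return 400, {"error": f"missing fields: {missing}"}
    queue.append(event)
    return 202, {"status": "queued"}

def work_one(handler):
    """Worker phase: take the oldest event and process it."""
    event = queue.popleft()
    return handler(event)

ok = receive_queued(
    {"type": ":generate_report", "data": {"callback_url": "https://example.test/hook"}},
    ["callback_url"],
)
bad = receive_queued({"type": ":generate_report", "data": {}}, ["callback_url"])
result = work_one(lambda e: f"processed {e['type']}")
```

Invalid events are rejected before they ever reach the queue, so workers only see validated input.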
5.2 Syntax
on :event from @actor (queued api)
The modifiers appear in parentheses, space-separated:
- api - Exposed as API endpoint
- queued - Processed via queue (async)
- queued api - API endpoint that queues for async processing
6. Queue Configuration Syntax
6.1 System-Level Queue (Defaults)
Configure at system level for default behavior:
system: e-commerce
queue:
strategy: fifo
concurrency: 10
timeout: 30 seconds
retry:
max_attempts: 3
backoff: exponential
6.2 Machine-Level Queue (Override)
Override defaults for specific machines:
machine: @order
queue:
concurrency: 5
timeout: 60 seconds
machine: @report_generator
queue:
concurrency: 1 // Serial processing
timeout: 10 minutes // Long-running reports
6.3 Configuration Inheritance
Machine-level config merges with and overrides system-level defaults:
system: e-commerce
queue:
concurrency: 10 // System default
timeout: 30 seconds
retry:
max_attempts: 3
machine: @order
queue:
concurrency: 5 // Overrides system (5, not 10)
// timeout: inherited // Gets 30 seconds from system
// retry: inherited // Gets max_attempts: 3 from system
machine: @report
queue:
concurrency: 1
timeout: 10 minutes // Overrides system
// retry: inherited // Gets max_attempts: 3 from system
Inheritance Rules:
- Machine config merges with system config
- Machine values override system values for the same key
- Unspecified machine values inherit from system
- Nested blocks (like retry:) merge recursively
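These rules describe a recursive dictionary merge. A minimal Python sketch, assuming the parsed queue blocks are plain nested dicts (`merge_config` is a hypothetical helper, and the `backoff: linear` override is added only to show nested merging):

```python
def merge_config(system, machine):
    """Recursively merge machine-level queue config over system defaults."""
    merged = dict(system)
    for key, value in machine.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)   # nested block: merge
        else:
            merged[key] = value                              # plain value: override
    return merged

system_queue = {"concurrency": 10, "timeout": "30 seconds", "retry": {"max_attempts": 3}}
order_queue = merge_config(system_queue, {"concurrency": 5})
report_queue = merge_config(
    system_queue,
    {"concurrency": 1, "timeout": "10 minutes", "retry": {"backoff": "linear"}},
)
```

Here `order_queue` keeps the system timeout and retry settings, while `report_queue` ends up with both `max_attempts` from the system and its own `backoff`.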
6.4 Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| strategy | fifo or priority | fifo | Processing order strategy |
| concurrency | number | 10 | Max parallel event processing |
| timeout | duration | 30 seconds | Per-event processing timeout |
| retry.max_attempts | number | 3 | Maximum retry attempts |
| retry.backoff | linear or exponential | exponential | Retry delay strategy |
| retry.initial_delay | duration | 1 second | First retry delay |
| retry.max_delay | duration | 5 minutes | Maximum retry delay |
| dead_letter.enabled | boolean | true | Preserve failed events |
| dead_letter.retention | duration | 7 days | How long to keep failed events |
6.5 Duration Syntax
timeout: 30 seconds
timeout: 5 minutes
timeout: 1 hour
timeout: 500 milliseconds
7. Event Priority
7.1 Priority Levels
When using strategy: priority, events can be assigned priority levels using the (priority ...) modifier:
on :checkout from @customer (api) (priority high)
// Processed before normal/low priority events
order moves to #processing
on :update_preferences from @customer (api) (priority low)
// Processed when queue is less busy
update user preferences
on :add_item from @customer (api)
// priority: normal (default)
$items adds $product
Multiple modifiers can be combined:
on :generate_report from @admin (queued api) (priority low)
// Queued + low priority
reply 202 with:
| job_id | $job_id |
7.2 Priority Levels
| Level | Value | Use Case |
|---|---|---|
| critical | 0 | System emergencies, circuit breakers |
| high | 1 | User-facing actions, payments |
| normal | 2 | Standard operations (default) |
| low | 3 | Background tasks, analytics |
| bulk | 4 | Batch processing, reports |
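The level values in the table can drive a standard heap-based priority queue. The Python sketch below is one possible reading: lower values are served first, and a sequence counter keeps arrival order within a level (that tie-breaking rule is an assumption, not something the proposal specifies). `PriorityEventQueue` is a hypothetical class.

```python
import heapq
import itertools

PRIORITY = {"critical": 0, "high": 1, "normal": 2, "low": 3, "bulk": 4}

class PriorityEventQueue:
    def __init__(self):
        self._heap = []
        self._seq = itertools.count()   # tie-breaker: FIFO within a level (assumed)

    def push(self, event, priority="normal"):
        heapq.heappush(self._heap, (PRIORITY[priority], next(self._seq), event))

    def pop(self):
        return heapq.heappop(self._heap)[2]

q = PriorityEventQueue()
q.push(":generate_report", "bulk")
q.push(":add_item")                      # defaults to normal
q.push(":checkout_1", "high")
q.push(":circuit_break", "critical")
q.push(":checkout_2", "high")
order = [q.pop() for _ in range(5)]
```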
7.3 Priority Queue Behavior
Queue (priority strategy):
┌─────────────────────────────────────────┐
│ critical: [circuit_break] │ ← Processed first
│ high: [checkout_1, checkout_2] │
│ normal: [add_item, update_cart, ...] │
│ low: [log_analytics, ...] │
│ bulk: [generate_report, ...] │ ← Processed last
└─────────────────────────────────────────┘
8. Retry Mechanism
8.1 Automatic Retry
When event processing fails, the queue automatically retries:
queue:
retry:
max_attempts: 3
backoff: exponential
initial_delay: 1 second
max_delay: 1 minute
8.2 Retry Timeline (Exponential Backoff)
Attempt 1: Immediate
↓ (fails)
Wait 1 second
Attempt 2:
↓ (fails)
Wait 2 seconds
Attempt 3:
↓ (fails)
Wait 4 seconds
Attempt 4 (if max_attempts > 3):
↓ (fails)
→ Dead Letter Queue
8.3 Retry Timeline (Linear Backoff)
Attempt 1: Immediate
↓ (fails)
Wait 1 second
Attempt 2:
↓ (fails)
Wait 1 second
Attempt 3:
↓ (fails)
Wait 1 second
→ Dead Letter Queue
8.4 Non-Retryable Errors
Some errors should not be retried:
on :checkout from @customer (api)
? cart is empty
fail with "CART_EMPTY" // Business logic error - no retry
process payment
// If payment gateway times out - will retry
Retryable: Timeouts, connection errors, rate limits
Non-retryable: Validation errors, business logic failures, authentication errors
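The retry timelines and the retryable/non-retryable split in this section can be expressed as two small functions. This Python sketch follows the timelines as drawn (exponential doubles from initial_delay, linear waits initial_delay each time) and caps at max_delay; the error kind names and both function names are hypothetical.

```python
def retry_delay(retry_number, backoff="exponential", initial_delay=1.0, max_delay=60.0):
    """Seconds to wait before retry number `retry_number` (1 = first retry)."""
    if backoff == "exponential":
        delay = initial_delay * 2 ** (retry_number - 1)
    else:  # linear, drawn in the timeline as a constant wait
        delay = initial_delay
    return min(delay, max_delay)

RETRYABLE = {"timeout", "connection_error", "rate_limit"}   # hypothetical labels

def should_retry(error_kind, attempts_made, max_attempts=3):
    """Retry only transient errors, and only while attempts remain."""
    return error_kind in RETRYABLE and attempts_made < max_attempts

exponential = [retry_delay(n) for n in (1, 2, 3)]
linear = [retry_delay(n, backoff="linear") for n in (1, 2, 3)]
capped = retry_delay(10)
```

An event for which `should_retry` returns false would go to the dead letter queue described in the next section.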
9. Dead Letter Queue
9.1 What is Dead Letter Queue?
When an event fails all retry attempts, it goes to the Dead Letter Queue (DLQ) instead of being lost.
The DLQ serves as:
- Investigation tool: See why events failed
- Recovery mechanism: Retry after fixing issues
- Audit trail: Track all failed events
A DLQ entry contains everything needed to debug and retry the event.
9.2 Purpose
Events that fail all retry attempts go to the Dead Letter Queue (DLQ) for:
- Investigation and debugging
- Manual retry after fixing issues
- Audit trail
9.3 Configuration
queue:
dead_letter:
enabled: true
retention: 7 days
9.4 DLQ Entry Structure
{
"id": "dlq-abc123",
"conversation_id": "conv-xyz789",
"original_event": {
"type": ":checkout",
"from": "@customer",
"aggregate_id": "order-xyz789",
"data": { "cart_id": "cart-123" }
},
"machine": "@order",
"attempts": 3,
"first_attempt": "2024-12-15T10:00:00Z",
"last_attempt": "2024-12-15T10:05:00Z",
"errors": [
{ "attempt": 1, "error": "Connection timeout", "at": "..." },
{ "attempt": 2, "error": "Connection timeout", "at": "..." },
{ "attempt": 3, "error": "Connection refused", "at": "..." }
]
}
The conversation_id is critical for machine systems - it allows you to trace which conversation the failed event belonged to and investigate related events across all machines in that conversation.
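Tracing by conversation could look like the following Python sketch, which filters entries shaped like the structure above. `failures_in_conversation` is a hypothetical helper and the sample entries are made-up data containing only the fields the function reads.

```python
def failures_in_conversation(dlq_entries, conversation_id):
    """Return (machine, event type, last error) for each failed event in a conversation."""
    return [
        (e["machine"], e["original_event"]["type"], e["errors"][-1]["error"])
        for e in dlq_entries
        if e["conversation_id"] == conversation_id
    ]

dlq = [
    {"conversation_id": "conv-xyz789", "machine": "@order",
     "original_event": {"type": ":checkout"},
     "errors": [{"attempt": 1, "error": "Connection timeout"},
                {"attempt": 2, "error": "Connection refused"}]},
    {"conversation_id": "conv-other", "machine": "@payment",
     "original_event": {"type": ":payment_request"},
     "errors": [{"attempt": 1, "error": "Gateway timeout"}]},
]
related = failures_in_conversation(dlq, "conv-xyz789")
```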
10. CLI Commands
10.1 Queue Status
eventflow queue status
Output:
Queue Status
────────────────────────────────────────────────────
System: e-commerce
Strategy: fifo | Concurrency: 8/10 active
Machine Queues:
┌──────────────┬─────────┬────────────┬────────┬──────────┐
│ Machine │ Pending │ Processing │ Failed │ Avg Time │
├──────────────┼─────────┼────────────┼────────┼──────────┤
│ @order │ 15 │ 5 │ 1 │ 45ms │
│ @payment │ 2 │ 1 │ 0 │ 120ms │
│ @inventory │ 0 │ 1 │ 0 │ 8ms │
│ @report │ 8 │ 2 │ 0 │ 4.2s │
└──────────────┴─────────┴────────────┴────────┴──────────┘
Dead Letter Queue: 3 events
10.2 Machine Queue Details
eventflow queue status @order
Output:
@order Queue Details
────────────────────────────────────────────────────
Configuration:
Strategy: fifo
Concurrency: 5
Timeout: 30s
Retry: 3 attempts, exponential backoff
Current State:
Pending: 15 events
Processing: 5 events
Failed: 1 event
Completed: 1,234 events (last hour)
Throughput: ~20 events/sec
Pending Events:
┌──────────────┬────────────┬──────────┬──────────────────┐
│ Aggregate │ Event │ Priority │ Queued At │
├──────────────┼────────────┼──────────┼──────────────────┤
│ order-abc123 │ :checkout │ high │ 2 seconds ago │
│ order-def456 │ :add_item │ normal │ 3 seconds ago │
│ order-ghi789 │ :add_item │ normal │ 5 seconds ago │
│ ... │ │ │ │
└──────────────┴────────────┴──────────┴──────────────────┘
10.3 Dead Letter Queue
eventflow queue dlq
Output:
Dead Letter Queue
────────────────────────────────────────────────────
3 failed events
┌────────────┬──────────────┬────────────┬──────────┬─────────────────────┐
│ DLQ ID │ Aggregate │ Event │ Attempts │ Last Error │
├────────────┼──────────────┼────────────┼──────────┼─────────────────────┤
│ dlq-001 │ order-abc123 │ :checkout │ 3 │ Connection refused │
│ dlq-002 │ order-xyz789 │ :payment │ 3 │ Gateway timeout │
│ dlq-003 │ report-r1 │ :generate │ 3 │ Out of memory │
└────────────┴──────────────┴────────────┴──────────┴─────────────────────┘
Use 'eventflow queue dlq inspect <id>' for details
Use 'eventflow queue dlq retry <id>' to retry
10.4 Retry Commands
# Retry single failed event
eventflow queue retry dlq-001
# Retry all failed events for a machine
eventflow queue retry @order --all
# Retry with delay
eventflow queue retry dlq-001 --delay 5s
10.5 Purge Commands
# Purge pending events (dangerous!)
eventflow queue purge @order --pending --confirm
# Purge dead letter queue
eventflow queue purge @order --dlq --confirm
# Purge events older than duration
eventflow queue purge @order --older-than 24h --confirm
10.6 Pause/Resume
# Pause processing (events still queue)
eventflow queue pause @order
# Resume processing
eventflow queue resume @order
# Check if paused
eventflow queue status @order
# Shows: Status: PAUSED (15 pending)
11. Monitoring & Observability
11.1 Metrics
The queue system should expose these metrics:
| Metric | Type | Description |
|---|---|---|
| queue.pending | Gauge | Events waiting to be processed |
| queue.processing | Gauge | Events currently being processed |
| queue.completed | Counter | Total events successfully processed |
| queue.failed | Counter | Total events that failed |
| queue.retried | Counter | Total retry attempts |
| queue.dlq_size | Gauge | Events in dead letter queue |
| queue.processing_time | Histogram | Event processing duration |
| queue.wait_time | Histogram | Time spent waiting in queue |
11.2 Health Checks
eventflow queue health
Output:
Queue Health Check
────────────────────────────────────────────────────
Overall: ⚠️ WARNING
Checks:
✅ @order queue healthy (15 pending, < 100 threshold)
✅ @payment queue healthy (2 pending, < 50 threshold)
⚠️ @report queue warning (45 pending, > 30 threshold)
✅ Dead letter queue healthy (3 events, < 10 threshold)
✅ No stuck events detected
Recommendations:
- Consider increasing @report concurrency
- Review 3 events in dead letter queue
11.3 Alerting Thresholds
queue:
alerts:
pending_warning: 100
pending_critical: 500
dlq_warning: 10
dlq_critical: 50
processing_time_warning: 5 seconds
processing_time_critical: 30 seconds
12. Complete Example
12.1 System with Queue Configuration
system: e-commerce
queue:
strategy: priority
concurrency: 20
timeout: 30 seconds
retry:
max_attempts: 3
backoff: exponential
initial_delay: 1 second
max_delay: 1 minute
dead_letter:
enabled: true
retention: 7 days
alerts:
pending_warning: 100
pending_critical: 500
uses:
@order from "./order.flow"
@payment from "./payment.flow"
@inventory from "./inventory.flow"
@report from "./report.flow"
12.2 Order Machine
machine: @order
queue:
concurrency: 10
timeout: 60 seconds
scenario: order processing
on :checkout from @customer (api) (priority high)
? cart is valid
$order_id becomes uuid()
order moves to #awaiting_payment
$payment becomes emit :payment_request to @payment
? $payment.success
order moves to #paid
otherwise
order moves to #payment_failed
on :add_item from @customer (api)
// priority: normal (default)
$items adds $product
$total increases by $product.price
on :payment_success from @payment (priority high)
order moves to #paid
emit async :reserve to @inventory
on :generate_invoice from @system (priority low)
generate pdf
send to customer
12.3 Report Machine (Serial Processing)
machine: @report
queue:
concurrency: 1 // One report at a time
timeout: 10 minutes // Reports can be slow
scenario: generate reports
on :generate_sales_report from @admin (queued api) (priority bulk)
query all orders
aggregate statistics
create charts
generate pdf
report moves to #ready
13. Keywords Reference
| Keyword | Context | Description |
|---|---|---|
| emit | action | Sync emit - wait for result (default) |
| emit async | action | Async emit - queue without waiting |
| queued | event modifier | Process event via queue (async) |
| (priority ...) | event modifier | Event priority level |
| queue: | system/machine | Queue configuration block |
| strategy: | queue | fifo or priority |
| concurrency: | queue | Max parallel processing |
| timeout: | queue | Per-event timeout |
| retry: | queue | Retry configuration block |
| max_attempts: | retry | Maximum retry attempts |
| backoff: | retry | linear or exponential |
| initial_delay: | retry | First retry delay |
| max_delay: | retry | Maximum retry delay |
| dead_letter: | queue | DLQ configuration block |
| enabled: | dead_letter | Enable/disable DLQ |
| retention: | dead_letter | How long to keep failed events |
| alerts: | queue | Alert thresholds block |
14. Implementation Notes
14.1 Queued Event Processing
When an event is marked with queued, the system splits processing into two phases:
┌─────────────────────────────────────────────────────────────────┐
│ HTTP Request │
│ │
│ 1. Receive event │
│ 2. Run validation (sync) ──────────► 400 if invalid │
│ 3. Put event on queue │
│ 4. Return response (typically 202) ◄─── Request complete │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ Queue Worker (Background) │
│ │
│ 1. Pick event from queue │
│ 2. Load machine state │
│ 3. Run transitions (guards, actions, state changes) │
│ 4. Persist new state │
│ 5. Send callback if configured │
│ 6. Acknowledge event (remove from queue) │
│ │
└─────────────────────────────────────────────────────────────────┘
Queued Transition Runner - A background worker that:
- Polls the queue for pending events
- Processes events in order (per-aggregate)
- Handles retries on failure
- Sends callbacks on completion
// Conceptual PHP implementation
class QueuedTransitionRunner
{
public function process(QueuedEvent $event): void
{
$machine = $this->loadMachine($event->machine_id);
$state = $this->loadState($event->aggregate_id);
// Run the actual transitions
$newState = $machine->transition($state, $event);
$this->persistState($event->aggregate_id, $newState);
// Send callback if configured
if ($event->hasCallback()) {
$this->sendCallback($event, $newState);
}
}
}
14.2 Storage Backend
Queues should be backed by persistent storage:
- Development: SQLite or in-memory
- Production: Redis, PostgreSQL, or dedicated message queue
14.3 Concurrency Control
Per-aggregate locking ensures events for the same aggregate don't process concurrently:
Lock: order-abc123
→ Process :checkout
→ Release lock
Lock: order-abc123
→ Process :add_item
→ ...
14.4 Idempotency
Event handlers should be idempotent for safe retries:
on :checkout from @customer (api)
// Check if already processed
? order already exists with $idempotency_key
return existing order
// Process normally
$order_id becomes uuid()
order moves to #awaiting_payment
14.5 PHP Binding
The PHP binding should integrate with:
- Laravel Queue (Redis, Database, SQS)
- Symfony Messenger
- Custom queue adapters
15. Summary
┌─────────────────────────────────────────────────────────────┐
│ │
│ Queue Configuration │
│ ─────────────────── │
│ - System-level defaults │
│ - Machine-level overrides │
│ - Per-event priority │
│ │
│ Ordering Guarantees │
│ ─────────────────── │
│ - Per-aggregate: Sequential │
│ - Cross-aggregate: Parallel │
│ │
│ Failure Handling │
│ ───────────────── │
│ - Automatic retry with backoff │
│ - Dead letter queue for failed events │
│ - Manual retry via CLI │
│ │
│ Observability │
│ ───────────── │
│ - CLI: eventflow queue status │
│ - Metrics for monitoring systems │
│ - Health checks and alerts │
│ │
└─────────────────────────────────────────────────────────────┘
Philosophy
Events flow through queues. Queues guarantee order.
Machines process. Queues orchestrate.
The queue system follows EventFlow's natural language philosophy while providing enterprise-grade reliability and observability.
16. Related Proposals
- Machine Response Proposal - Structured responses from API events
- Data Validation Proposal - Event validation (runs before queue)
- Test Scenarios Proposal - Delta-based testing