Event-Driven Architecture Tooling: CQRS, Event Sourcing, and Saga Patterns
Event-Driven Architecture Tooling: CQRS, Event Sourcing, and Saga Patterns
Event-driven architecture (EDA) replaces direct service-to-service calls with events -- immutable facts about things that happened. Instead of OrderService calling InventoryService calling PaymentService in a chain, OrderService publishes an OrderPlaced event, and the other services react independently. This decoupling makes systems more resilient, scalable, and easier to evolve.
But EDA introduces complexity. You need event buses, serialization standards, idempotent consumers, saga coordination, and often CQRS or event sourcing to manage state. This guide covers the tools and patterns that make event-driven systems practical, with real TypeScript and Go examples.
Core Patterns
Event Bus / Message Broker
The event bus is the backbone. Events are published to topics, and consumers subscribe. The three most common options are:
| Broker | Ordering | Retention | Consumer Groups | Best For |
|---|---|---|---|---|
| Apache Kafka | Per-partition | Configurable (days to forever) | Yes | High-throughput, event log |
| NATS JetStream | Per-stream | Configurable | Yes | Low-latency, simple operations |
| RabbitMQ | Per-queue | Until consumed (or TTL) | No (use competing consumers) | Task distribution, RPC |
Publishing Events (TypeScript + Kafka)
import { Kafka, Partitioners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka-1:9092', 'kafka-2:9092'],
});
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner,
idempotent: true, // Exactly-once semantics
});
interface OrderPlacedEvent {
type: 'order.placed';
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
total: number;
occurredAt: string;
}
async function publishOrderPlaced(order: Order): Promise<void> {
const event: OrderPlacedEvent = {
type: 'order.placed',
orderId: order.id,
customerId: order.customerId,
items: order.items,
total: order.total,
occurredAt: new Date().toISOString(),
};
await producer.send({
topic: 'orders',
messages: [{
key: order.id, // Partition by order ID for ordering
value: JSON.stringify(event),
headers: {
'event-type': 'order.placed',
'correlation-id': order.correlationId,
},
}],
});
}
Consuming Events (Go + Kafka)
package main
import (
"context"
"encoding/json"
"log"
"github.com/segmentio/kafka-go"
)
type OrderPlacedEvent struct {
Type string `json:"type"`
OrderID string `json:"orderId"`
CustomerID string `json:"customerId"`
Items []OrderItem `json:"items"`
Total float64 `json:"total"`
OccurredAt string `json:"occurredAt"`
}
type OrderItem struct {
ProductID string `json:"productId"`
Quantity int `json:"quantity"`
Price float64 `json:"price"`
}
func main() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"kafka-1:9092", "kafka-2:9092"},
GroupID: "inventory-service",
Topic: "orders",
MinBytes: 1,
MaxBytes: 10e6,
})
defer reader.Close()
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Fatal("read error:", err)
}
var event OrderPlacedEvent
if err := json.Unmarshal(msg.Value, &event); err != nil {
log.Printf("unmarshal error: %v", err)
continue
}
if err := handleOrderPlaced(event); err != nil {
log.Printf("handler error for order %s: %v", event.OrderID, err)
// In production: dead-letter queue, retry with backoff
}
}
}
func handleOrderPlaced(event OrderPlacedEvent) error {
// Reserve inventory for each item
for _, item := range event.Items {
if err := reserveStock(item.ProductID, item.Quantity); err != nil {
return err
}
}
return nil
}
CQRS: Command Query Responsibility Segregation
CQRS separates the write model (commands that change state) from the read model (queries that return data). This lets you optimize each side independently -- a normalized write model for consistency, and denormalized read models tailored to specific UI needs.
TypeScript CQRS Implementation
// Domain events
interface DomainEvent {
aggregateId: string;
type: string;
data: unknown;
metadata: {
correlationId: string;
occurredAt: string;
version: number;
};
}
// Command side: handle commands, emit events
interface Command {
type: string;
payload: unknown;
}
class OrderCommandHandler {
constructor(
private readonly repository: OrderRepository,
private readonly eventBus: EventBus,
) {}
async handle(command: PlaceOrderCommand): Promise<string> {
// Validate
if (command.items.length === 0) {
throw new Error('Order must have at least one item');
}
// Create aggregate
const order = Order.create({
customerId: command.customerId,
items: command.items,
});
// Persist
await this.repository.save(order);
// Publish events
for (const event of order.uncommittedEvents) {
await this.eventBus.publish(event);
}
return order.id;
}
}
// Query side: maintain denormalized read models
class OrderProjection {
constructor(private readonly readDb: ReadDatabase) {}
async onOrderPlaced(event: OrderPlacedEvent): Promise<void> {
await this.readDb.upsert('order_summaries', {
orderId: event.aggregateId,
customerId: event.data.customerId,
itemCount: event.data.items.length,
total: event.data.total,
status: 'placed',
placedAt: event.metadata.occurredAt,
});
}
async onOrderShipped(event: OrderShippedEvent): Promise<void> {
await this.readDb.update('order_summaries', event.aggregateId, {
status: 'shipped',
trackingNumber: event.data.trackingNumber,
shippedAt: event.metadata.occurredAt,
});
}
}
// Query handler: read from optimized read model
class OrderQueryHandler {
constructor(private readonly readDb: ReadDatabase) {}
async getCustomerOrders(customerId: string): Promise<OrderSummary[]> {
return this.readDb.query('order_summaries', {
customerId,
orderBy: 'placedAt DESC',
limit: 50,
});
}
}
Event Sourcing
Event sourcing stores the full history of state changes as a sequence of events, rather than storing only the current state. The current state is derived by replaying events. This gives you a complete audit trail and the ability to reconstruct state at any point in time.
Event Store Implementation
// Event store interface
interface EventStore {
append(streamId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
read(streamId: string, fromVersion?: number): Promise<DomainEvent[]>;
subscribe(eventTypes: string[], handler: (event: DomainEvent) => Promise<void>): void;
}
// PostgreSQL-backed event store
class PostgresEventStore implements EventStore {
constructor(private readonly pool: Pool) {}
async append(
streamId: string,
events: DomainEvent[],
expectedVersion: number,
): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
// Optimistic concurrency check
const result = await client.query(
'SELECT MAX(version) as current_version FROM events WHERE stream_id = $1',
[streamId],
);
const currentVersion = result.rows[0]?.current_version ?? 0;
if (currentVersion !== expectedVersion) {
throw new ConcurrencyError(
`Expected version ${expectedVersion}, got ${currentVersion}`,
);
}
// Append events
let version = expectedVersion;
for (const event of events) {
version++;
await client.query(
`INSERT INTO events (stream_id, version, event_type, data, metadata, occurred_at)
VALUES ($1, $2, $3, $4, $5, $6)`,
[streamId, version, event.type, JSON.stringify(event.data),
JSON.stringify(event.metadata), event.metadata.occurredAt],
);
}
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
async read(streamId: string, fromVersion = 0): Promise<DomainEvent[]> {
const result = await this.pool.query(
`SELECT * FROM events WHERE stream_id = $1 AND version > $2 ORDER BY version`,
[streamId, fromVersion],
);
return result.rows.map(row => ({
aggregateId: row.stream_id,
type: row.event_type,
data: row.data,
metadata: { ...row.metadata, version: row.version },
}));
}
}
// Aggregate that rebuilds from events
class Order {
private state: OrderState = { status: 'draft', items: [], total: 0 };
private version = 0;
public uncommittedEvents: DomainEvent[] = [];
static async load(eventStore: EventStore, orderId: string): Promise<Order> {
const order = new Order();
const events = await eventStore.read(orderId);
for (const event of events) {
order.apply(event);
order.version = event.metadata.version;
}
return order;
}
private apply(event: DomainEvent): void {
switch (event.type) {
case 'order.placed':
this.state.status = 'placed';
this.state.items = event.data.items;
this.state.total = event.data.total;
break;
case 'order.shipped':
this.state.status = 'shipped';
break;
case 'order.cancelled':
this.state.status = 'cancelled';
break;
}
}
}
Event Store Schema
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL,
version INTEGER NOT NULL,
event_type TEXT NOT NULL,
data JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}',
occurred_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (stream_id, version)
);
CREATE INDEX idx_events_stream ON events (stream_id, version);
CREATE INDEX idx_events_type ON events (event_type);
CREATE INDEX idx_events_occurred ON events (occurred_at);
Saga Pattern: Coordinating Multi-Service Transactions
When a business process spans multiple services, you cannot use a database transaction. The saga pattern coordinates multi-step processes using events, with compensation actions (rollbacks) for each step.
Orchestrated Saga (TypeScript)
interface SagaStep<T> {
name: string;
execute: (context: T) => Promise<void>;
compensate: (context: T) => Promise<void>;
}
class SagaOrchestrator<T> {
private steps: SagaStep<T>[] = [];
private completedSteps: SagaStep<T>[] = [];
addStep(step: SagaStep<T>): this {
this.steps.push(step);
return this;
}
async execute(context: T): Promise<void> {
for (const step of this.steps) {
try {
await step.execute(context);
this.completedSteps.push(step);
} catch (error) {
console.error(`Saga step "${step.name}" failed:`, error);
await this.compensate(context);
throw new SagaFailedError(step.name, error);
}
}
}
private async compensate(context: T): Promise<void> {
// Compensate in reverse order
for (const step of [...this.completedSteps].reverse()) {
try {
await step.compensate(context);
} catch (error) {
console.error(`Compensation for "${step.name}" failed:`, error);
// Log for manual intervention -- compensation failures are serious
}
}
}
}
// Usage: order fulfillment saga
interface OrderContext {
orderId: string;
customerId: string;
items: OrderItem[];
total: number;
paymentId?: string;
shipmentId?: string;
}
const orderSaga = new SagaOrchestrator<OrderContext>()
.addStep({
name: 'reserve-inventory',
execute: async (ctx) => {
await inventoryService.reserve(ctx.orderId, ctx.items);
},
compensate: async (ctx) => {
await inventoryService.release(ctx.orderId, ctx.items);
},
})
.addStep({
name: 'process-payment',
execute: async (ctx) => {
const payment = await paymentService.charge(ctx.customerId, ctx.total);
ctx.paymentId = payment.id;
},
compensate: async (ctx) => {
if (ctx.paymentId) {
await paymentService.refund(ctx.paymentId);
}
},
})
.addStep({
name: 'create-shipment',
execute: async (ctx) => {
const shipment = await shippingService.create(ctx.orderId, ctx.items);
ctx.shipmentId = shipment.id;
},
compensate: async (ctx) => {
if (ctx.shipmentId) {
await shippingService.cancel(ctx.shipmentId);
}
},
});
// Execute the saga
await orderSaga.execute({
orderId: 'order-123',
customerId: 'cust-456',
items: [{ productId: 'prod-1', quantity: 2, price: 29.99 }],
total: 59.98,
});
Choreography-Based Saga (Go)
In choreography, there is no orchestrator. Each service listens for events and publishes its own events. The process emerges from the interactions.
// inventory-service/handler.go
func (h *InventoryHandler) HandleOrderPlaced(ctx context.Context, event OrderPlacedEvent) error {
err := h.repo.ReserveStock(ctx, event.OrderID, event.Items)
if err != nil {
// Publish failure event -- payment service will not proceed
return h.publisher.Publish(ctx, "inventory", InventoryReservationFailed{
OrderID: event.OrderID,
Reason: err.Error(),
})
}
return h.publisher.Publish(ctx, "inventory", InventoryReserved{
OrderID: event.OrderID,
Items: event.Items,
})
}
// payment-service/handler.go
func (h *PaymentHandler) HandleInventoryReserved(ctx context.Context, event InventoryReserved) error {
order, err := h.orderClient.GetOrder(ctx, event.OrderID)
if err != nil {
return err
}
payment, err := h.processor.Charge(ctx, order.CustomerID, order.Total)
if err != nil {
// Publish failure -- inventory service will compensate
return h.publisher.Publish(ctx, "payments", PaymentFailed{
OrderID: event.OrderID,
Reason: err.Error(),
})
}
return h.publisher.Publish(ctx, "payments", PaymentProcessed{
OrderID: event.OrderID,
PaymentID: payment.ID,
})
}
// inventory-service/handler.go -- compensation
func (h *InventoryHandler) HandlePaymentFailed(ctx context.Context, event PaymentFailed) error {
return h.repo.ReleaseStock(ctx, event.OrderID)
}
Frameworks and Libraries
| Tool | Language | Pattern | Notes |
|---|---|---|---|
| EventStoreDB | Any (gRPC) | Event Sourcing | Purpose-built event store with projections |
| Axon Framework | Java/Kotlin | CQRS + Event Sourcing + Sagas | Full framework, opinionated |
| Temporal | Any (Go SDK, TS SDK) | Durable workflows / Sagas | Not EDA-specific but excellent for sagas |
| Watermill | Go | Event-driven messaging | Clean abstractions over Kafka, NATS, etc. |
| NestJS CQRS | TypeScript | CQRS + Event Sourcing | Built into NestJS framework |
| MassTransit | C# | Messaging + Sagas | Mature .NET library |
When to Use (and When Not To)
Use event-driven architecture when:
- Services need to react to changes without tight coupling
- You need an audit trail of everything that happened
- Different read models are needed for different consumers
- The system must be resilient to individual service failures
Do not use event-driven architecture when:
- You have a simple CRUD application with one database
- Immediate consistency is a hard requirement (EDA is eventually consistent)
- Your team is small and the added complexity is not justified
- You are building a prototype (start simple, add events later)
The biggest mistake teams make with EDA is adopting it too early. Start with a monolith. Extract services when you feel the pain. Add events when synchronous calls between services become a bottleneck. The patterns in this guide are powerful, but they are solutions to specific problems -- not starting points for every project.