
Eventing

The eventing capability turns a Cephalon app into an event-driven system — modules emit events into a broker, other modules (in this app or another service) consume them asynchronously. Eventing is what makes the “split into microservices later” promise real: the same module can publish events whether it’s composed into a monolith or a standalone service.

The current managed dispatch path is Wolverine; the abstractions in Cephalon.Eventing keep your modules transport-neutral so you can swap brokers without rewriting handlers.

| Package | Maturity | What it brings |
| --- | --- | --- |
| Cephalon.Eventing | M3 | Transport-neutral primitives: IMessagePublisher, IMessageHandler<T>, [Event] attribute, MessageContext, process-manager base. |
| Cephalon.Eventing.Wolverine | M3 | Managed dispatch via Wolverine. Outbox, scheduled delivery, DLQ replay, partition affinity, process-manager state, broker topology. |

appsettings.json
{
  "Engine": {
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": {
        "Transport": "RabbitMq",
        "ConnectionString": "amqps://user:pass@<host>/vhost",
        "ScheduledDelivery": { "Enabled": true },
        "DeadLetter": { "Enabled": true, "MaxAttempts": 5 }
      }
    }
  }
}

Program.cs
builder.Services
    .AddCephalonAspNetCore()
    .AddEventing(options => options.UseWolverine())
    .AddModulesFromAssemblies(/* ... */);

The eventing capability is opt-in. Modules that emit events declare Capability.Eventing in their descriptor, and the engine fails composition unless Engine:Messaging:Enabled is true.

Wolverine supports multiple brokers; pick based on operational maturity in your environment.

| Broker | Best for | Avoid if |
| --- | --- | --- |
| In-memory | Tests, single-process modular monolith | You need durability across restarts |
| RabbitMQ | Most production workloads, classic queueing semantics | You need replay (RMQ doesn’t natively replay) |
| Azure Service Bus | Azure-native deploys, no broker to operate | You’re not on Azure |
| Amazon SQS / SNS | AWS-native deploys | You need ordered delivery across many messages (SQS FIFO has per-group ordering only) |
| Kafka | High-throughput streaming, replayable event logs | You need point-to-point queues (Kafka is partition-keyed pub/sub) |
| NATS JetStream | Lightweight, K8s-friendly, persistent streams | Your operators aren’t comfortable with NATS yet |
| Postgres (LISTEN/NOTIFY) | Already running Postgres, want zero ops | Throughput > a few hundred msg/sec |

Switching brokers only changes Engine:Messaging:Wolverine:Transport and the connection settings (ConnectionString, or Bootstrap for Kafka). Module code ([Event], IMessagePublisher, IMessageHandler<T>) is identical.

RabbitMQ
{ "Wolverine": { "Transport": "RabbitMq", "ConnectionString": "amqps://…" } }
Azure Service Bus
{ "Wolverine": { "Transport": "AzureServiceBus", "ConnectionString": "Endpoint=sb://…" } }
Kafka
{ "Wolverine": { "Transport": "Kafka", "Bootstrap": "broker-1:9092,broker-2:9092" } }

Use the [Event] attribute so the eventing layer knows the routing key:

src/Acme.Store.Modules.Orders/Events/OrderPlaced.cs
using Cephalon.Eventing;

[Event(name: "acme.store.order-placed", version: "1.0")]
public sealed record OrderPlaced(
    string OrderId,
    string CustomerId,
    decimal Total,
    IReadOnlyList<OrderLine> Lines);

public sealed record OrderLine(string ProductId, int Quantity, decimal UnitPrice);

When the schema changes:

// v1 stays for backwards compatibility
[Event(name: "acme.store.order-placed", version: "1.0")]
public sealed record OrderPlacedV1(string OrderId, string CustomerId, decimal Total);

// v2 adds the lines collection
[Event(name: "acme.store.order-placed", version: "2.0")]
public sealed record OrderPlaced(
    string OrderId, string CustomerId, decimal Total, IReadOnlyList<OrderLine> Lines);

Consumers handle whichever version is on the wire. Wolverine routes by name + version.
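
One way for a consumer to tolerate both versions is to upcast v1 payloads to the v2 shape before the business logic runs, so only one type is handled downstream. The sketch below is self-contained and illustrative: OrderPlacedUpcaster and ToV2 are hypothetical names, not part of Cephalon.Eventing, and the [Event] attributes are omitted so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;

public sealed record OrderLine(string ProductId, int Quantity, decimal UnitPrice);

// v1 wire shape: no line items.
public sealed record OrderPlacedV1(string OrderId, string CustomerId, decimal Total);

// v2 wire shape: adds the lines collection.
public sealed record OrderPlaced(
    string OrderId, string CustomerId, decimal Total, IReadOnlyList<OrderLine> Lines);

// Hypothetical helper (an assumption for illustration): converts a v1 event
// to the v2 shape so downstream code deals with a single type.
public static class OrderPlacedUpcaster
{
    public static OrderPlaced ToV2(OrderPlacedV1 v1) =>
        new(v1.OrderId, v1.CustomerId, v1.Total, Array.Empty<OrderLine>());
}
```

An empty Lines collection is only one possible default; a consumer that needs the line items would have to treat upcast v1 events as incomplete.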

src/Acme.Store.Modules.Orders/Behaviors/PlaceOrderBehavior.cs
public sealed class PlaceOrderBehavior : IRestBehavior
{
    public RestRoute Route => RestRoute.Post("/orders");

    public async Task<IResult> Handle(
        PlaceOrderPayload p,
        IOrderRepository orders,
        IMessagePublisher publisher,
        CancellationToken ct)
    {
        var order = new Order { /* … */ };
        await orders.AddAsync(order, ct);

        await publisher.PublishAsync(
            new OrderPlaced(order.Id, order.CustomerId, order.Total, /* … */),
            ct);

        return Results.Created($"/orders/{order.Id}", order);
    }
}

  1. IMessagePublisher.PublishAsync writes the event into the outbox table in the same DbContext transaction as the order.
  2. The DbContext SaveChangesAsync() commits — order row + outbox row land atomically.
  3. A Wolverine background loop drains the outbox and dispatches to the broker.
  4. Consumers receive the event; if a consumer fails, Wolverine retries per the broker’s redelivery policy.

This is the transactional outbox pattern — at-least-once delivery without dual-writes.
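
The four steps above can be sketched with an in-memory stand-in for the database. This is a simplified illustration under stated assumptions, not the Wolverine or Cephalon implementation: FakeDatabase and OutboxDrainer are hypothetical types, and a real outbox relies on the database transaction rather than on two list operations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a database that holds an orders table
// and an outbox table.
public sealed class FakeDatabase
{
    public List<string> Orders { get; } = new();
    public Queue<string> Outbox { get; } = new();

    // Adds the order row and the outbox row together, standing in for a
    // single committed transaction.
    public void SaveOrderWithEvent(string orderId, string eventPayload)
    {
        Orders.Add(orderId);
        Outbox.Enqueue(eventPayload);
    }
}

public static class OutboxDrainer
{
    // Dispatches pending outbox rows to the broker. A row is removed only
    // after the send succeeds, so a crash between send and removal causes a
    // resend: this is why delivery is at-least-once.
    public static int Drain(FakeDatabase db, Action<string> sendToBroker)
    {
        var sent = 0;
        while (db.Outbox.Count > 0)
        {
            sendToBroker(db.Outbox.Peek()); // may throw; the row stays queued
            db.Outbox.Dequeue();
            sent++;
        }
        return sent;
    }
}
```

If sendToBroker throws, the row stays at the head of the queue and the next drain retries it, which is the behaviour that makes idempotent consumers necessary.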

src/Acme.Billing.Modules/InvoicingHandler.cs
public sealed class InvoicingHandler(
    IInvoiceService invoices,
    ILogger<InvoicingHandler> log) : IMessageHandler<OrderPlaced>
{
    public async Task HandleAsync(
        OrderPlaced message,
        MessageContext ctx,
        CancellationToken ct)
    {
        log.LogInformation("Issuing invoice for order {OrderId}", message.OrderId);
        await invoices.IssueAsync(message.OrderId, message.Total, ct);
    }
}

Register the handler in the module:

public override void RegisterServices(IServiceCollection services)
{
    services.AddMessageHandler<OrderPlaced, InvoicingHandler>();
}

Every handler receives a MessageContext with useful metadata:

| Property | Meaning |
| --- | --- |
| MessageId | Globally unique id of this delivery. Use for idempotency keys. |
| CorrelationId | Trace correlation id (carries OTLP trace context). |
| Attempt | Delivery attempt number (1-based). |
| OriginalTimestamp | When the producer published. |
| Headers | Dictionary of arbitrary headers (e.g. tenant id). |

Pattern 1: process manager (saga)

A process manager (saga) is a long-running flow that reacts to events and emits new ones.

OrderFulfillmentSaga.cs
[Saga(state: typeof(FulfillmentState))]
public sealed class OrderFulfillmentSaga
{
    // Starts a new saga when OrderPlaced arrives
    public static FulfillmentState Start(OrderPlaced @event) => new(@event.OrderId)
    {
        InvoiceIssued = false, PaymentReceived = false, Shipped = false,
    };

    // Continues an existing saga on InvoiceIssued
    public async Task Handle(InvoiceIssued @event, FulfillmentState state, IMessagePublisher pub)
    {
        state.InvoiceIssued = true;
        // Await the publish so a failure surfaces instead of being silently dropped
        await pub.PublishAsync(new ChargeCustomer(state.OrderId, state.Total));
    }

    public async Task Handle(PaymentReceived @event, FulfillmentState state, IMessagePublisher pub)
    {
        state.PaymentReceived = true;
        if (state is { InvoiceIssued: true, PaymentReceived: true })
            await pub.PublishAsync(new ShipOrder(state.OrderId));
    }

    public void Handle(OrderShipped @event, FulfillmentState state)
    {
        state.Shipped = true;
        // Saga complete → state can be archived
    }
}

State is persisted by Wolverine in your data store. The saga survives restarts.
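
Because the saga is essentially a state machine, its transitions can be exercised without a broker by replaying an event sequence against plain state. The following is a minimal, framework-free sketch: FulfillmentTransitions and the string-based event names are assumptions made for illustration, and the FulfillmentState here is a simplified local copy rather than the class used by the saga above.

```csharp
using System;
using System.Collections.Generic;

// Simplified local copy of the orchestration state (illustrative only).
public sealed class FulfillmentState
{
    public bool InvoiceIssued { get; set; }
    public bool PaymentReceived { get; set; }
    public bool Shipped { get; set; }
}

public static class FulfillmentTransitions
{
    // Applies one event (identified by name) to the state and returns the
    // names of the commands the saga would emit in response.
    public static IReadOnlyList<string> Apply(FulfillmentState state, string eventName)
    {
        var commands = new List<string>();
        switch (eventName)
        {
            case "InvoiceIssued":
                state.InvoiceIssued = true;
                commands.Add("ChargeCustomer");
                break;
            case "PaymentReceived":
                state.PaymentReceived = true;
                if (state.InvoiceIssued) commands.Add("ShipOrder");
                break;
            case "OrderShipped":
                state.Shipped = true;
                break;
            default:
                throw new ArgumentException($"Unknown event: {eventName}", nameof(eventName));
        }
        return commands;
    }
}
```

Replaying InvoiceIssued, PaymentReceived and OrderShipped in order should leave all three flags set and emit ChargeCustomer followed by ShipOrder; replaying them out of order is a useful second test case.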

Pattern 2: scheduled delivery (delay / retry-after)

await publisher.PublishAsync(
    new SendReminderEmail(userId),
    deliverAt: DateTimeOffset.UtcNow.AddHours(24),
    ct);

Wolverine persists the message and dispatches when the time arrives. Supported brokers:

| Broker | Scheduled-delivery mechanism |
| --- | --- |
| RabbitMQ | Wolverine’s own scheduler (DB-backed) |
| Azure Service Bus | Native ScheduledEnqueueTimeUtc |
| SQS | Native DelaySeconds (up to 15 min); Wolverine for longer |
| Kafka | Wolverine’s DB-backed scheduler (Kafka has no native delay) |
| In-memory | Wolverine’s in-process scheduler |
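
The SQS row implies a simple decision rule: delays of up to 15 minutes can use the broker's native DelaySeconds, and anything longer falls back to a database-backed scheduler. The sketch below only illustrates that rule; SqsDelayPolicy and DelayMechanism are hypothetical names, and how Wolverine makes this choice internally is not shown here.

```csharp
using System;

public enum DelayMechanism { NativeBrokerDelay, DatabaseScheduler }

// Hypothetical helper illustrating the 15-minute cut-off from the table above.
public static class SqsDelayPolicy
{
    // SQS DelaySeconds accepts at most 900 seconds (15 minutes).
    public static readonly TimeSpan MaxNativeDelay = TimeSpan.FromMinutes(15);

    public static DelayMechanism Choose(DateTimeOffset now, DateTimeOffset deliverAt)
    {
        var delay = deliverAt - now;
        return delay <= MaxNativeDelay
            ? DelayMechanism.NativeBrokerDelay
            : DelayMechanism.DatabaseScheduler;
    }
}
```

Passing the current time in as a parameter, instead of reading the clock inside the method, keeps the rule deterministic in tests.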

Pattern 3: dead-letter queue (DLQ) replay

When a message fails MaxAttempts times, it goes to the dead-letter queue (DLQ). Replay it via the admin behavior:

// Built-in admin endpoint: POST /engine/eventing/dlq/replay
// Or programmatically:
public sealed class DlqReplayCommand(IDeadLetterStore dlq, IMessagePublisher pub)
{
    public async Task ReplayAsync(string envelopeId, CancellationToken ct)
    {
        var envelope = await dlq.GetAsync(envelopeId, ct);
        await pub.RepublishAsync(envelope, ct);
        await dlq.RemoveAsync(envelopeId, ct);
    }
}

Pattern 4: idempotent handler with the inbox table

For at-least-once delivery (most brokers), handlers MUST be idempotent or use an inbox table.

IdempotentHandler.cs
public sealed class InvoicingHandler(
    IInvoiceService invoices,
    IInboxStore inbox) : IMessageHandler<OrderPlaced>
{
    public async Task HandleAsync(OrderPlaced msg, MessageContext ctx, CancellationToken ct)
    {
        // Already processed this message? Skip.
        if (await inbox.WasProcessedAsync(ctx.MessageId, ct)) return;

        await invoices.IssueAsync(msg.OrderId, msg.Total, ct);
        await inbox.MarkProcessedAsync(ctx.MessageId, ct);
        // Both writes commit in the same EF transaction
    }
}

IInboxStore is provided by Cephalon.Data.EntityFramework (table cephalon_eventing_inbox).
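
The deduplication idea behind the inbox can be shown with an in-memory set keyed by message id. This is a sketch under stated assumptions: InMemoryInbox and InvoiceCounter are hypothetical types, not the IInboxStore shipped in Cephalon.Data.EntityFramework, and a real store would need the mark and the side effect to commit in one transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for an inbox table keyed by message id.
public sealed class InMemoryInbox
{
    private readonly HashSet<string> _processed = new();

    // Returns true the first time an id is seen and false for redeliveries.
    public bool TryMarkProcessed(string messageId) => _processed.Add(messageId);
}

public sealed class InvoiceCounter
{
    private readonly InMemoryInbox _inbox = new();

    public int InvoicesIssued { get; private set; }

    // Simulates a handler that is invoked once per delivery attempt.
    public void Handle(string messageId)
    {
        if (!_inbox.TryMarkProcessed(messageId)) return; // duplicate delivery: skip
        InvoicesIssued++;
    }
}
```

Combining the check and the mark into one operation avoids the window between "was it processed?" and "mark it processed"; in a relational inbox the usual equivalent is a unique constraint on the message id column.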

Pattern 5: fan-out (one event, many handlers)

Just register multiple handlers for the same event:

services.AddMessageHandler<OrderPlaced, InvoicingHandler>(); // accounting team
services.AddMessageHandler<OrderPlaced, ShipmentSchedulerHandler>(); // fulfillment team
services.AddMessageHandler<OrderPlaced, AnalyticsIngestHandler>(); // data team

Each handler runs in its own scope. Failures are isolated.
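
Failure isolation in fan-out means that an exception in one handler does not stop the others from receiving the event. A minimal in-process sketch of that behaviour follows; FanOut.Dispatch is a hypothetical helper written for illustration and says nothing about how Wolverine schedules handlers internally.

```csharp
using System;
using System.Collections.Generic;

public static class FanOut
{
    // Invokes every handler for the message. An exception thrown by one
    // handler is recorded and does not prevent the remaining handlers from running.
    public static IReadOnlyList<Exception> Dispatch<T>(T message, IEnumerable<Action<T>> handlers)
    {
        var failures = new List<Exception>();
        foreach (var handler in handlers)
        {
            try
            {
                handler(message);
            }
            catch (Exception ex)
            {
                failures.Add(ex);
            }
        }
        return failures;
    }
}
```

In a brokered setup the recorded failure would translate into a retry for that handler only, which is why each handler also needs to be idempotent on its own.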

Pattern 6: partition affinity (ordered processing per key)

Some events must be processed in order per entity (e.g. all events for OrderId=42 must be processed sequentially).

[Event(name: "acme.store.order-status-changed", version: "1.0")]
[PartitionKey(nameof(OrderId))]
public sealed record OrderStatusChanged(string OrderId, string NewStatus);

Wolverine routes messages with the same OrderId to the same partition / consumer instance.
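
Key-based routing generally works by hashing the partition key and taking the result modulo the partition count, so equal keys always land on the same partition. The sketch below shows the idea with an FNV-1a hash; PartitionRouter is a hypothetical helper, and real brokers apply their own partitioners, so the partition numbers it yields will not match what Kafka or Service Bus assign.

```csharp
using System;
using System.Text;

public static class PartitionRouter
{
    // FNV-1a 32-bit hash. Used instead of string.GetHashCode(), which is
    // randomized per process on .NET Core and later and is therefore not
    // stable across consumer instances.
    public static uint Fnv1a(string key)
    {
        const uint offsetBasis = 2166136261;
        const uint prime = 16777619;
        var hash = offsetBasis;
        foreach (var b in Encoding.UTF8.GetBytes(key))
        {
            hash ^= b;
            hash = unchecked(hash * prime);
        }
        return hash;
    }

    // Maps a key to a partition index in the range [0, partitionCount).
    public static int PartitionFor(string key, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException(nameof(partitionCount));
        return (int)(Fnv1a(key) % (uint)partitionCount);
    }
}
```

Note that changing the partition count remaps most keys, so per-key ordering is only preserved while the count stays fixed.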

Pattern 7: CDC capture (change data capture)

When your domain commits to a database, also emit events from the database changelog. Useful for legacy systems where you can’t easily add event publishing.

public sealed class ProductsCdcContributor : ICdcCaptureContributor
{
    public CdcSource Source => new(
        name: "products-cdc",
        connection: "Products",
        slot: "products_logical_replication");

    public void Configure(ICdcCaptureBuilder b)
    {
        b.OnInsert<Product>(p => new ProductCreated(p.Id, p.Name, p.Sku));
        b.OnUpdate<Product>(p => new ProductChanged(p.Id, p.Name, p.Sku));
        b.OnDelete<Product>(p => new ProductRemoved(p.Id));
    }
}

The engine wires this to a Debezium / Postgres logical-replication consumer and dispatches via the same eventing pipeline. See Reference → Architecture → CDC capture.

Recipe: in-process events for a modular monolith

For a monolith, in-memory transport is fast and zero-ops. Module A emits, Module B consumes — no broker.

{
  "Engine": {
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": { "Transport": "Local" }
    }
  }
}

Recipe: RabbitMQ topic exchange

{
  "Engine": {
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": {
        "Transport": "RabbitMq",
        "ConnectionString": "amqps://user:pass@rabbitmq:5671/",
        "Exchanges": {
          "acme-events": { "Type": "topic", "Durable": true }
        }
      }
    }
  }
}

Recipe: Kafka topics

{
  "Engine": {
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": {
        "Transport": "Kafka",
        "Bootstrap": "broker-1:9092,broker-2:9092",
        "Topics": {
          "acme-events": { "Partitions": 32, "ReplicationFactor": 3 }
        }
      }
    }
  }
}

Recipe: Azure Service Bus topics + subscriptions

{
  "Engine": {
    "Messaging": {
      "Enabled": true,
      "Provider": "Wolverine",
      "Wolverine": {
        "Transport": "AzureServiceBus",
        "ConnectionString": "Endpoint=sb://acme.servicebus.windows.net/;…",
        "Topics": {
          "acme-events": {
            "Subscriptions": ["acme-billing", "acme-analytics"]
          }
        }
      }
    }
  }
}

Every published / consumed event auto-instruments with:

  • Trace span: event.publish / event.consume, with attributes for event name, version, attempt number.
  • Metric: cephalon.eventing.messages.published, cephalon.eventing.messages.handled, cephalon.eventing.messages.failed, cephalon.eventing.dlq.depth.
  • Log: structured logs with cephalon.event.name, cephalon.event.id, cephalon.event.attempt.

In Grafana / Application Insights, you can dashboard:

  • Throughput: published / handled / failed per minute.
  • Latency: p50/p95/p99 publish-to-handle time (using trace spans).
  • DLQ depth: grows when handlers can’t keep up.
  • Per-handler error rate: cephalon.eventing.messages.failed{handler="InvoicingHandler"}.
  • At-least-once delivery is the default. Handlers MUST be idempotent or use the inbox pattern. Exactly-once is not provided — it’s a hard distributed-systems problem.
  • Wolverine outbox requires EF Core. If you don’t use EF, you need a custom outbox impl. Plain Cephalon.Eventing works but you lose the transactional guarantee.
  • [PartitionKey] only works with brokers that support partitioning (Kafka, Azure Service Bus sessions). RabbitMQ / SQS don’t preserve per-key ordering across queues.
  • Scheduled delivery > 15 minutes uses Wolverine’s DB-backed scheduler. Don’t enable it on Postgres if you publish 10k+ scheduled msgs/sec — the polling becomes a bottleneck. Use Kafka or ASB at that volume.
  • DLQ replay is manual by default. Add an authentication scope (WithRequireScope("ops:dlq")) on the replay behavior so it’s not exposed publicly.
  • Saga state size — keep saga state small (< 64 KB). Large states slow down persistence; persist domain detail in your DbContext, keep only orchestration state in the saga.
  • Process managers don’t support compensation natively. If you need to “undo” steps in a long-running flow, model compensation as additional events (OrderPlaced → CompensateOrderFulfillment).

Eventing patterns that pay off in production.

  • Events are past-tense facts. OrderPlaced, InvoiceIssued. Never PlaceOrder (that’s a command, belongs in a request).
  • Include the minimum identifying data + denormalised context. Consumers shouldn’t have to call back to the publisher’s API for basic info. OrderPlaced(orderId, customerId, total, items) — not just OrderPlaced(orderId).
  • Don’t put domain entities directly in events. Use records that are stable wire types; entities evolve with persistence concerns.
  • Version events from day 1. Add version: "1.0" to every [Event] attribute, even when there’s only one version. Saves the conversation 6 months later.
  • Every handler must be idempotent. At-least-once delivery is the norm; exactly-once is a myth. Either:
    • The handler’s effect is naturally idempotent (SET status = 'paid' for the same id twice is OK).
    • The handler uses the inbox table (IInboxStore) for deduplication.
  • MessageContext.MessageId is your dedup key. Same id = same delivery = skip.
  • Co-locate publisher.PublishAsync with the DbContext write. That’s the whole point — atomic transaction.
  • Don’t publish from a using scope outside the request. The outbox needs an ambient DbContext; outside the request scope it falls back to direct broker publish (no atomic guarantee).
  • Purge the outbox table aggressively. Wolverine doesn’t ship a built-in cleanup job — write your own IHostedService that deletes processed_at < UTCNOW - 7 days.
  • Keep saga state small. < 1 KB ideally, < 64 KB hard ceiling. Persist domain entities in your DbContext; persist only orchestration state in the saga.
  • Sagas should react, not query. If you need to “look up the current state” to decide what to do next, the data should already be in the saga state (denormalised on the events that triggered the saga).
  • Compensation as events, not as code. Don’t try to “undo” inside a saga’s exception handler. Emit a CompensateX event and have a handler do the work — easier to test, easier to retry.

| Thing | Convention | Example |
| --- | --- | --- |
| Event class | <Past-tense fact> (record) | OrderPlaced, InvoiceIssued |
| Event name | <reverse-dns>.<context>.<event> (kebab) | "acme.store.order-placed" |
| Handler class | <Concern>Handler<EventType> or <Concern>On<Event> | InvoicingHandler, BillingOnOrderPlaced |
| Saga class | <Process>Saga | OrderFulfillmentSaga |
| Process-manager state class | <Process>State | FulfillmentState |
| DLQ envelope id format | UUID v7 / Sfid | Stable across replays |

Need replayable event log (e.g. event sourcing) → Kafka or NATS JetStream
Need point-to-point queues with strong durability → RabbitMQ or Azure Service Bus
On AWS, want minimum ops → SQS + SNS
On Azure, want minimum ops → Azure Service Bus
On Kubernetes, want lightweight → NATS JetStream
Postgres already in your stack, < 100 msg/sec → Postgres LISTEN/NOTIFY (acceptable)
For tests / monolith → In-memory (Wolverine "Local")
  • Batch processing: IMessageHandler<T> for individual messages, IBatchHandler<T> for grouped processing. ~5–10× throughput for I/O-bound handlers.
  • Partition by key for parallelism: with [PartitionKey], Wolverine processes the same partition serially but different partitions in parallel. Tune partition count to match consumer count.
  • Bounded channels for backpressure: if a handler is slow, Wolverine queues messages. Set MaxConcurrency per endpoint so a slow consumer doesn’t starve the rest.
  • OTLP trace context propagates automatically through the eventing layer — your distributed traces span publisher → broker → consumer without effort.
  • Alert on DLQ growth, not DLQ presence. Some failures are expected; sustained growth means the handler is broken.
  • Alert on cephalon.eventing.messages.failed rate > 1% per handler. Below that is usually transient.
  • Track outbox depth as a gauge metric (SELECT COUNT(*) FROM outbox WHERE processed_at IS NULL). Sudden spike means the drainer is stuck.
  • Trace correlation across services — set MessageContext.CorrelationId from the originating request. Otherwise events look like they appeared from nowhere in traces.
  • Use MessageSpy<T> for behavioural tests: assert that a handler publishes the expected event.
  • In-memory transport for tests: fast, deterministic. No need for Testcontainers brokers in unit tests.
  • Test the saga state-machine, not just one transition. Replay the event sequence end-to-end.
  • For integration tests, use Testcontainers RabbitMQ / Kafka to catch broker-specific gotchas (header size limits, message size limits, ordering quirks).
  • Don’t auto-replay DLQ messages. Always require human approval (a button in your admin UI, or a CLI command). Auto-replay can turn one bug into thousands of duplicate side-effects.
  • Replay in batches. If your DLQ has 10k messages, replay 100 at a time and monitor; don’t unleash all 10k at once.
  • Have a “stop the world” kill-switch. A way to pause the publisher (set Engine:Messaging:Enabled=false and restart, or runtime-config the message bus to drop publishes) so you can investigate without making the pile worse.
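
The "replay in batches" advice can be sketched as a loop that replays a fixed number of envelopes and then asks whether to continue. This is an illustrative sketch only: DlqBatchReplay and its delegate parameters are hypothetical, the replayOne callback stands in for whatever performs the actual replay, and Enumerable.Chunk requires .NET 6 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DlqBatchReplay
{
    // Replays envelope ids in fixed-size batches. After each batch the caller
    // decides whether to continue, for example after checking error-rate
    // dashboards or waiting for an operator to confirm.
    public static int Replay(
        IEnumerable<string> envelopeIds,
        int batchSize,
        Action<string> replayOne,
        Func<int, bool> continueAfterBatch)
    {
        var replayed = 0;
        foreach (var batch in envelopeIds.Chunk(batchSize))
        {
            foreach (var id in batch)
            {
                replayOne(id);
                replayed++;
            }

            // The callback receives the running total of replayed envelopes.
            if (!continueAfterBatch(replayed)) break;
        }
        return replayed;
    }
}
```

Returning false from continueAfterBatch after the first batch stops the replay at batchSize messages, which keeps the human-approval step from the first bullet in the loop.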

| Don’t | Do |
| --- | --- |
| Big “EnterpriseEvent” with every possible field | Small, focused events per fact |
| Synchronous request-reply over the broker | Use HTTP/gRPC for sync; events for async |
| Trying to chain N handlers for N steps of a process | Use a saga: state-machine semantics, not handler chains |
| Catching all exceptions in handlers + suppressing | Let it fail; Wolverine retries; the DLQ has a purpose |
| Event content matches DB row 1:1 | Events represent business facts, not DB state |
| Long-running work in a handler | Handler enqueues a job; a separate handler does the work |
| Shared event types across services living in a “Shared.Events” project | Version them; consumers should tolerate v1 + v2 simultaneously |