Event Upcasting
When an event schema evolves, older stored or inbound events may still be encoded with a previous version. Upcasting transforms those legacy payloads to the latest schema version before they are handled or republished.
The upcasting pipeline lives in Hermodr.Schema. The publisher integration is in Hermodr.Publisher.EventSchema via UseEventUpcasting().
How it works
- The middleware reads the event's current schema version from the
dataversionextension attribute, or from the last segment of thedataschemaURI. - It looks up the latest registered schema for the event type.
- If the event is behind, it runs the registered upcasting pipeline to migrate the payload through one or more
IEventUpcastersteps. - The migrated payload,
dataversion, and optionallydataschemaare rewritten on the CloudEvent before it continues down the publisher pipeline.
Writing an upcaster
Implement IEventUpcaster and register it in DI:
public sealed class OrderPlacedV1ToV2Upcaster : IEventUpcaster
{
public bool CanUpcast(string eventType, Version fromVersion, Version toVersion)
=> eventType == "order.placed"
&& fromVersion == new Version(1, 0)
&& toVersion == new Version(2, 0);
public Task<JsonNode> UpcastAsync(UpcastContext context, CancellationToken cancellationToken = default)
{
if (context.Data is JsonObject obj)
{
// V2 split the single "customer" string into "customerId" and "customerName"
if (obj["customer"] is { } oldValue)
{
var parts = oldValue.GetValue<string>().Split(':');
obj["customerId"] = parts[0];
obj["customerName"] = parts.Length > 1 ? parts[1] : "";
obj.Remove("customer");
}
}
return Task.FromResult(context.Data);
}
}
Register the upcaster and enable upcasting in the publisher pipeline:
services.AddEventPublisher()
.UseEventUpcasting()
.AddTestChannel(e => received.Add(e));
services.AddSingleton<IEventUpcaster, OrderPlacedV1ToV2Upcaster>();
Register
IEventUpcasterinstances through the normal DI container. They are discovered automatically byEventUpcasterRegistry. You can also register them manually viaIEventUpcasterRegistry.Register(upcaster).
Chaining upcasters
The pipeline discovers intermediate schema versions from the registered IEventSchemaRegistry. If you have schemas for 1.0, 2.0, and 3.0, and upcasters for 1.0 → 2.0 and 2.0 → 3.0, an event at version 1.0 is migrated through both steps automatically.
Direct jumps are also supported: if an upcaster declares 1.0 → 3.0, the pipeline uses it instead of chaining.
Missing upcaster behaviour
Configure EventUpcastingOptions to decide what happens when no upcaster chain can be built:
services.AddEventPublisher()
.UseEventUpcasting(options =>
{
options.MissingUpcasterBehavior = MissingUpcasterBehavior.Throw;
});
| Behaviour | Effect |
|---|---|
PassThrough (default) | The event is published unchanged. |
Throw | An EventUpcastingException is thrown. |
Content type adapters
The built-in JsonEventUpcastingDataAdapter handles application/json and CloudEvents JSON content types. Future adapters can implement IEventUpcastingDataAdapter for XML, Protobuf, or other formats without changing the core pipeline.
Order in the publisher pipeline
Register upcasting before schema validation so that migrated events validate against the latest schema:
services.AddEventPublisher()
.UseEventUpcasting()
.UseSchemaValidation()
.AddRabbitMqChannel(...);
Version sources
The middleware uses the first available source:
- The
dataversionCloudEvents extension attribute. - The last path segment of the
dataschemaURI.
If neither is present, the event is passed through unchanged.