Recently I had the opportunity to pick the brain of someone who has more experience and exposure to large scale, event-sourced systems than I do. We talked about event sourcing, specifically command processing, the subject of this blog post. It was an enlightening conversation that reminded me that insight is information tempered with experience. No amount of book reading is a substitute for learning from watching things go horribly wrong in production 😃.
As a quick recap of event sourcing, the fundamental building blocks of an event-sourced application are:
- Aggregates - handle commands, emit events, and update their state by applying events
- Projectors - project events into read-model views called projections
- Process (saga) managers - the inverse of aggregates: ingest events and emit commands
- Events - immutable facts, representations of something that occurred in the past
- Commands - requests for work to be performed
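To make those roles concrete, here's a minimal sketch in Rust of the bank account example used throughout this post. The `AccountCommand`, `AccountEvent`, and `BankAccountAggregate` names are purely illustrative; this is not Concordance's actual API.

```rust
// Commands are requests for work; they can be rejected.
#[derive(Debug, Clone)]
enum AccountCommand {
    Deposit { amount: u64 },
    Withdraw { amount: u64 },
}

// Events are immutable facts; once emitted, they happened.
#[derive(Debug, Clone)]
enum AccountEvent {
    Deposited { amount: u64 },
    Withdrawn { amount: u64 },
}

// The aggregate validates commands against its current state and
// rebuilds that state by applying events.
#[derive(Debug, Default)]
struct BankAccountAggregate {
    balance: i64, // signed so we can represent an overdrawn account
}

impl BankAccountAggregate {
    // Command handler: may reject, never mutates state directly.
    fn handle_command(&self, cmd: &AccountCommand) -> Result<Vec<AccountEvent>, String> {
        match cmd {
            AccountCommand::Deposit { amount } => {
                Ok(vec![AccountEvent::Deposited { amount: *amount }])
            }
            AccountCommand::Withdraw { amount } if (*amount as i64) > self.balance => {
                Err(format!("insufficient funds: balance is {}", self.balance))
            }
            AccountCommand::Withdraw { amount } => {
                Ok(vec![AccountEvent::Withdrawn { amount: *amount }])
            }
        }
    }

    // Event applier: a pure state transition with no validation.
    fn apply_event(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Deposited { amount } => self.balance += *amount as i64,
            AccountEvent::Withdrawn { amount } => self.balance -= *amount as i64,
        }
    }
}
```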
In the current version of Concordance (which I'll be changing), commands are stored in a durable stream; command handlers pull them off the stream, perform validation, and emit events. On the surface, it seemed like a good idea to use a durable stream for commands. After all, you don't want to lose your commands, do you?
Narrator: Yes you do.
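For context, the enqueue-style flow looks roughly like the sketch below. The `CommandStream` trait is hypothetical, standing in for whatever durable stream or broker you'd actually use; the point is that the producer fires and forgets.

```rust
// A hypothetical durable stream interface; the broker behind it is
// irrelevant to the argument.
trait CommandStream {
    fn publish(&self, subject: &str, payload: Vec<u8>) -> Result<(), String>;
}

// Producer side: fire-and-forget. The caller has no idea when, or against
// what state, the command will eventually be validated.
fn request_withdrawal(stream: &dyn CommandStream, amount: u64) -> Result<(), String> {
    let payload = format!("withdraw:{}", amount).into_bytes(); // serialization elided
    stream.publish("bankaccount.commands", payload)
}
```

The only answer the caller ever gets back is "the command was enqueued", never "the command was accepted".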
The problems with publishing commands to a queue for asynchronous processing are numerous and subtle. Consider a simple timeline: at T1, with a balance of $300, we submit a `Withdraw(200)` command to the queue. At T2 the event `Withdrawn(200)` occurs, and thereafter we see the new balance of $100. This all looks good and feels very "event sourcey".
The conceptual problem with this approach is that the decision that drove the creation of the `Withdraw(200)` command was based on the state of the world at time T1. It doesn't matter whether a human or a computer made that decision: the command is only meaningful as of a given snapshot of state.
Now let's look at how this can go wrong when commands are queued up in a stream.
- At time T1, the balance is 300, so a component enqueues a `Withdraw(200)` command.
- Another component, also at time T1, sees balance 300 and enqueues a `Withdraw(250)` command.
- Command 1 is valid as of `State(T1)`; `Withdrawn(200)` is emitted.
- Command 2 is valid as of `State(T1)`; `Withdrawn(250)` is emitted.
- The aggregate processes `Withdrawn(200)`.
- The aggregate processes `Withdrawn(250)`.
- ⚠️ Events are indisputable. If the event exists, it happened.
- The aggregate emits `Overdrawn`.
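Here's the same race expressed in code, reusing the illustrative aggregate from the first sketch. Both commands are validated against the same stale snapshot, so both pass.

```rust
fn main() {
    // The state of the world as of T1: balance is 300.
    let snapshot = BankAccountAggregate { balance: 300 };

    // Two producers each decide to withdraw based on the T1 snapshot.
    let cmd1 = AccountCommand::Withdraw { amount: 200 };
    let cmd2 = AccountCommand::Withdraw { amount: 250 };

    // Both commands are checked against the *same* stale state, so both
    // pass validation and both events are emitted.
    let events1 = snapshot.handle_command(&cmd1).unwrap(); // Withdrawn(200)
    let events2 = snapshot.handle_command(&cmd2).unwrap(); // Withdrawn(250)

    // Events are indisputable, so the aggregate must apply both of them.
    let mut account = BankAccountAggregate { balance: 300 };
    for event in events1.iter().chain(events2.iter()) {
        account.apply_event(event);
    }

    println!("final balance: {}", account.balance); // -150: overdrawn
}
```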
You might be thinking that this is a bit contrived, but it's actually very easy to build a system that works like this. So, in the interest of defensive programming, you decide that after every command is processed, all of the outbound events are applied to the aggregate, updating its state immediately.
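A rough sketch of that defensive tweak, again reusing the illustrative types from earlier rather than Concordance's actual API: the handler folds its own emitted events back into the aggregate before touching the next command in the queue.

```rust
// Drain queued commands one at a time, applying emitted events to the
// aggregate's state before handling the next command.
fn process_queued_commands(
    account: &mut BankAccountAggregate,
    queue: Vec<AccountCommand>,
) -> Vec<AccountEvent> {
    let mut log = Vec::new();
    for cmd in queue {
        match account.handle_command(&cmd) {
            Ok(events) => {
                for event in &events {
                    // State is now current before the next command is validated.
                    account.apply_event(event);
                }
                log.extend(events);
            }
            Err(reason) => {
                // The command is rejected... but who gets told, and how?
                eprintln!("rejected {:?}: {}", cmd, reason);
            }
        }
    }
    log
}
```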
So now, instead of ending up overdrawn at T4, the `Withdraw(250)` command will be rejected. But how does our application actually know that it was rejected, or worse yet, why it was rejected? If the commands are all queued up asynchronously into a stream, then the only way we can see them being rejected is either through logging/tracing or by creating new events to publish when a command is rejected.
Remembering that events never die: if we emit `WithdrawFailed` events whenever a command fails validation, we're stuck with those events for all eternity, and we still don't have a system that can deal with commands in a way that actually makes sense for the application. Our signal-to-noise ratio will invert, and our event log will be full of command rejection noise.
What I've been leading up to is a fundamental concept of event sourcing that is easily overlooked: commands are inextricably linked to the state of the world at the time the command was created. A command to withdraw $200 at T6 is only meaningful given the state at T6.
Even more plainly: never enqueue commands. The instant a command is enqueued, it becomes irrelevant because the state of the world has moved on. While this is an opinion that is certainly open for debate, I am now firmly on Team Ephemeral Commands.[^1]
Imagine the scenario above, but now instead of having just one or two potential sources of commands, you've got hundreds or even thousands. With commands being the only way work gets done in an event-sourced system, such a large-scale system would grind to a halt because 99% of the work being done would be rejecting commands produced from stale state.
In another common pattern, people tag a command with the state version/revision/generation number at the time it is enqueued. So now instead of `Withdraw(250)` you have `Withdraw(250, T6)`. Instead of having to spend time on complex validation, the aggregate can simply reject any command issued against a revision older than its current version. While this sounds useful, all it really does is make it easier for your system to determine which commands to reject, allowing you to spam your event log with rejection events even faster than before.
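A sketch of that version-tagging variant, with a hypothetical `expected_version` field carried alongside the command:

```rust
// A command stamped with the aggregate revision its producer saw.
struct VersionedCommand {
    command: AccountCommand,
    expected_version: u64,
}

// Cheap rejection: any command issued against an older revision is
// discarded without even inspecting its contents.
fn accept_or_reject(
    current_version: u64,
    cmd: &VersionedCommand,
) -> Result<&AccountCommand, String> {
    if cmd.expected_version < current_version {
        Err(format!(
            "stale command: issued at revision {}, aggregate is at revision {}",
            cmd.expected_version, current_version
        ))
    } else {
        Ok(&cmd.command)
    }
}
```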
Instead of being enqueued, commands should be processed via request/reply. A live service should handle the command request, validate it as of the current state, and either reject it accordingly or return a list of events. Not only does this give our application the chance to get more robust error messages as to why a command was rejected, it also ensures that bad commands can't produce events. There will never exist an event produced from a stale command.
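Concretely, the shape I'm advocating looks something like the sketch below: a synchronous handler that validates against the state of the world right now and hands the caller either the resulting events or the reason for rejection. Again, the function is illustrative rather than Concordance's actual API.

```rust
// Request/reply: the caller gets an immediate, meaningful answer, either
// the events that now exist or the reason the command was refused.
fn handle_withdraw_request(
    account: &mut BankAccountAggregate,
    amount: u64,
) -> Result<Vec<AccountEvent>, String> {
    let cmd = AccountCommand::Withdraw { amount };

    // Validate against the current state; a rejection goes straight back
    // to the caller instead of into the event log.
    let events = account.handle_command(&cmd)?;

    // Apply before acknowledging so the next request sees current state.
    // In a real system the events would also be appended to the durable
    // event stream here.
    for event in &events {
        account.apply_event(event);
    }
    Ok(events)
}
```

Only the resulting events ever touch durable storage; the command itself lives and dies inside the request.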
Underneath this pattern of maintaining a durable event stream but never persisting commands is the broader notion of being open to learning from those with different perspectives and backgrounds. We all make decisions based on the best information we have at the time, but seeking out those with more experience, different backgrounds, and different perspectives means we can continually re-evaluate our decisions for the benefit of our frameworks and products.
Case in point: I've created an issue in Concordance to no longer use a durable inbound stream for commands, and instead utilize a request/reply pattern.
[^1]: That's right, a technologist changed their mind. It can happen to the best of us.