Tuesday, October 24, 2017

Event Sourced Aggregates Part 1: Outline of a typical implementation

This is the first post in a series of posts that takes its offset in a design problem I've encountered repeatedly with event sourced aggregates: They grow every time a feature is added. Nothing (almost) is removed from them, so over time they grow very big and gnarly. Why does this happen? Because typical implementations of event sourced aggregates violate the Open/Closed principle.
Through this series of post, I will show how event sourced aggregates violate Open/Closed and - as a consequence - tend to grow monotonically, and then show how we can address that by re-factoring away from the big aggregate towards a small and manageable one. Cliffhanger and spoiler: The aggregate will become anemic and I think that is a good thing.

The complete code from the series is on GitHub, where there is a branch with the code for the first two posts.

Event sourced aggregates

The context of what I am exploring in this series of posts is systems based on Domain Driven Design, where some or all of the aggregates are stored using event sourcing. Often these systems also use CQRS - very much inspired by Greg Youngs talks and writings.
Using event sourcing for storing the aggregates, means that the aggregate code does not change the state of the aggregate directly, instead it emits an event. The event is applied to the aggregate which is where changes to the state of the aggregate happens, but the event is also stored to a data store. Since aggregate state is only changed when an event is applied, the current aggregate state can be recreated by reading all the events for a given aggregate up form the data store and applying each one to the aggregate. The benefits of this approach are many (when applied to a suitable domain) and described elsewhere, so I wont go into them here.

A typical C# implementation

Typically implementations of all this follow a structure where requests coming in from the outside - be it though a client making a request to an HTTP endpoint, a message on a queue from some other service or something else - result in a chain that goes like this:
  1. A command is sent, asking the domain to perform a certain task 
  2. A command handler picks up that command, fetches the appropriate aggregate and triggers appropriate domain behavior. Fetching the aggregate involves replaying all the aggregates events (event sourcing!)
  3. A method on an aggregate is called, and that is where the actual domain logic is implemented
  4. Whenever the domain logic needs to change some state it emits an event (event sourcing, again) of a specific type
  5. A 'when' method for the specific event type on the aggregate is called and updates the state of the aggregate
  6. After the aggregate is done, all the events emitted during execution of the domain logic is dispatched to any number of event handlers that cause side effects like updating view models, or sending messages to other services.
To put this whole process into code, let's think of an example: Creating a user in some imaginary system. The first step is to send the create user command:


Next step is the event handler for the create user command. Note that in this example I use the MediatR library to connect the sending of a command to the handler for the command.


Note that most of what is going on here is the same as for other handlers for other commands: Pick the right aggregate and, after executing domain logic, save that aggregate and then dispatch whatever events were emitted.

On line 19 of the handler we call into the aggregate. The code in the aggregate looks like this:


At line 11 we call the Emit method. This is how most implementations I've seen work, and typically that Emit method is part of the Aggregate base class and looks something like this:


Notice how Emit calls Play which uses reflection to find a When method on the aggregate itself, and to call that When method. The When method is supposed to update the state of the aggregate and is also the method that gets called during event replay. More on that below. For now let's see the when method:


That's pretty much it, though there a few things I have skipped over a bit quickly: How the aggregate is fetched, how it is saved and how events are dispatched. I will not go into the event dispatching, since it is not relevant to the point I am making in this series, but the code is on Github, if you want to look. As for the other two bits - fetching and saving aggregates - lets start with how aggregates are saved:


As you can see saving the aggregate essentially means saving a list of events. The list should contain all the events that has ever been emitted by the aggregate. That is the essence of event sourcing. When it comes to fetching the aggregate, the list of events is read, and each one is replayed on a new clean aggregate object - that is the When methods for each event is called in turn. Since only the When methods update the state of the aggregate the result is an aggregate object in the right state. The Get method on the aggregate repository (which does the fetching) looks like this:


And the Replay method called in line 14 just runs through the list of events and plays each on them in turn, like this:


That pretty much outline the implementations of event sourced aggregates I seem to come across. 

That's it for now. The next posts will:

  1. Add a second feature and see how the aggregate starts to violate Open/Closed principle
  2. Make the command handlers smarter and clean up some repetitiveness
  3. Make the aggregate anemic in a naive way, leaving a stable aggregate, but introducing a new Open/Closed violation
  4. Make the aggregate anemic, events (a little) smart, and solve the Open/Closed violation

No comments:

Post a Comment