Joseph Choe

Event Sourcing, Part 3: Publish and Subscribe

Note: Be sure to check out my 11+ hour video tutorial on event sourcing!

Over the last two months, I’ve talked about how to build event sourced components. One of these components accepts user registrations, while the other handles email uniqueness.

My goal in this sequence of essays is to demonstrate how to build event sourced systems. But before we can get into things like how to paint a user interface screen, we need a way for one component to communicate with the other. Let’s discuss that today, with code examples to demonstrate.

Disclaimer

This essay is a tool for learning. I’ve intended the below code for that purpose and not to be used in production environments. Read every line of code and make sure you understand each and every implication. Exercise your best judgement when taking code found on the Internet and putting it into your own codebase.

It’s All about Communication

Event sourced components communicate through messages. These messages, whether commands or events, are recorded in a message store, which in Eventide’s case is a single immutable messages table in a PostgreSQL database.

CREATE TABLE messages (
  global_position bigserial NOT NULL,
  position bigint NOT NULL,
  id UUID NOT NULL DEFAULT gen_random_uuid(),
  time TIMESTAMP WITHOUT TIME ZONE DEFAULT (now() AT TIME ZONE 'utc') NOT NULL,
  stream_name text NOT NULL,
  type text NOT NULL,
  data jsonb,
  metadata jsonb
);

Because of this, any service can read from any stream in the messages table and react to each message accordingly, without any knowledge of the producer of that message, except perhaps two things: the name of the stream they’re reading from and the contract of the messages therein.

It’s a simple SELECT query. Indeed, the broadcasters of those messages can have no idea who exactly is reading their messages.

This is typically known as the publish-subscribe messaging pattern.
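To make the idea concrete, here is a minimal sketch in plain Ruby of an append-only store with independent readers. It is not Eventide’s API: the InMemoryMessageStore class, its write and read methods, and the from_position parameter are all hypothetical stand-ins for the messages table and the SELECT query described above.

```ruby
# Hypothetical in-memory stand-in for the messages table; not Eventide's API.
class InMemoryMessageStore
  Message = Struct.new(:global_position, :position, :stream_name, :type, :data, keyword_init: true)

  def initialize
    @messages = []
  end

  # Append-only write: the producer names a stream and knows nothing about readers.
  def write(stream_name, type, data)
    message = Message.new(
      global_position: @messages.size + 1,
      position: @messages.count { |m| m.stream_name == stream_name },
      stream_name: stream_name,
      type: type,
      data: data
    )
    @messages << message
    message
  end

  # Roughly equivalent to:
  #   SELECT * FROM messages WHERE stream_name = ? AND position >= ? ORDER BY position
  def read(stream_name, from_position: 0)
    @messages.select { |m| m.stream_name == stream_name && m.position >= from_position }
  end
end

store = InMemoryMessageStore.new
store.write("registration-123", "Initiated", { email_address: "jane@example.com" })
store.write("registration-456", "Initiated", { email_address: "joe@example.com" })
store.write("registration-123", "Registered", { email_address: "jane@example.com" })

# Two readers of the same stream, each tracking its own position.
# Neither one is known to the code that wrote the messages.
first_reader_types = store.read("registration-123").map(&:type)
second_reader_types = store.read("registration-123", from_position: 1).map(&:type)
```

Each reader only needs the stream name and the shape of the messages. Adding a third reader requires no change to the writer.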

Beyond that, services can also send messages to other specific services, provided they know the command stream name of the service in question. For example, our user registration component from Part 1 has a command stream where it reads Register command messages and writes Registered events.

But we don’t know where those messages are coming from. They could be coming from another service, or perhaps from a resource endpoint within a Rails controller. They could even be generated from a CSV file that’s uploaded to an FTP server.

All we know for certain is that our components read messages from one stream and react to them by writing messages to another.

With these tools in hand, we can have our components communicate with one another.

User Registration, Revisited

For our purposes, the email uniqueness component is complete. It needs no knowledge of anything outside of itself, so we won’t be making any changes to it.

The user registration component, however, needs a few changes before it can make use of email uniqueness.

class Registration
  include Schema::DataStructure

  attribute :id, String
  attribute :user_id, String
  attribute :email_address, String

  attribute :initiated_time, Time
  attribute :email_accepted_time, Time
  attribute :email_rejected_time, Time
  attribute :registered_time, Time
  attribute :cancelled_time, Time

  def initiated?
    !initiated_time.nil?
  end

  def email_accepted?
    !email_accepted_time.nil?
  end

  def email_rejected?
    !email_rejected_time.nil?
  end

  def registered?
    !registered_time.nil?
  end

  def cancelled?
    !cancelled_time.nil?
  end
end

The Registration entity has quite a bit more going on with it than the last time we saw it. The first three attributes are much the same. But next we have a series of Time attributes.

This is because we need a way to track the different states the registration can be in. We need to know when the registration process has begun, when the email address has been accepted or rejected, and when the registration has completed, whether successfully or not.

You can see this entity as a finite-state machine with two divergent paths. The first goes from Initiated to EmailAccepted, and finally to Registered. But we also need to take the failure mode into account, where the email address was already claimed by a previous registration. That leads us down the second path, from Initiated to EmailRejected, and then to Cancelled.
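As a rough illustration of that state machine, the sketch below derives a single status from the timestamps. It uses a plain Struct rather than Schema::DataStructure so it runs without Eventide, and the RegistrationState name and the status method are hypothetical additions, not part of the entity above.

```ruby
# Simplified stand-in for the Registration entity, reduced to its timestamps.
RegistrationState = Struct.new(
  :initiated_time,
  :email_accepted_time,
  :email_rejected_time,
  :registered_time,
  :cancelled_time,
  keyword_init: true
) do
  # Hypothetical helper: report the furthest state reached on either path.
  def status
    return :cancelled unless cancelled_time.nil?
    return :registered unless registered_time.nil?
    return :email_rejected unless email_rejected_time.nil?
    return :email_accepted unless email_accepted_time.nil?
    return :initiated unless initiated_time.nil?

    :not_started
  end
end

now = Time.now

not_started = RegistrationState.new
happy_path = RegistrationState.new(initiated_time: now, email_accepted_time: now, registered_time: now)
failure_path = RegistrationState.new(initiated_time: now, email_rejected_time: now)
```

Because each state is recorded as a timestamp rather than overwritten in a single status field, the entity keeps the full history of when each transition happened.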

Messages

I’m not going to show the messages in full. You’ve already seen many of them before, and they’re basically copies of one another with different class names, so there’s not much to discuss.

class Register
  ## ...
end

class Initiated
  ## ...
end

class EmailAccepted
  ## ...
end

class EmailRejected
  ## ...
end

class Registered
  ## ...
end

class Cancelled
  ## ...
end

Like I’ve said before, software development is simply about copying data from one place to another. Rinse and repeat.

Also, there are a lot of messages. Just imagine them all having the same exact attributes.

Projection

The projection here is fairly straightforward. We have a bunch of events that copy data into the Registration entity.

class Projection
  include EntityProjection
  include Messages::Events

  entity_name :registration

  apply Initiated do |initiated|
    registration.id = initiated.registration_id
    registration.user_id = initiated.user_id
    registration.email_address = initiated.email_address
    registration.initiated_time = Clock.parse(initiated.time)
  end

  apply EmailAccepted do |email_accepted|
    registration.email_accepted_time = Clock.parse(email_accepted.time)
  end

  apply EmailRejected do |email_rejected|
    registration.email_rejected_time = Clock.parse(email_rejected.time)
  end

  apply Registered do |registered|
    registration.registered_time = Clock.parse(registered.time)
  end

  apply Cancelled do |cancelled|
    registration.cancelled_time = Clock.parse(cancelled.time)
  end
end

Each event copies its time attribute into the matching timestamp on the entity, and those timestamps are exactly what the predicate methods check.
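If it helps to see the idea without the toolkit, a projection is essentially a fold over the events of a stream. The sketch below is plain Ruby under that assumption: the RegistrationSketch struct, the APPLY table, and the project method are hypothetical names standing in for the entity and the apply blocks above, and events are represented as simple hashes.

```ruby
require "time"

RegistrationSketch = Struct.new(
  :id, :user_id, :email_address,
  :initiated_time, :email_accepted_time, :registered_time
)

# Hypothetical dispatch table standing in for the `apply` blocks.
APPLY = {
  "Initiated" => ->(registration, data) {
    registration.id = data[:registration_id]
    registration.user_id = data[:user_id]
    registration.email_address = data[:email_address]
    registration.initiated_time = Time.parse(data[:time])
  },
  "EmailAccepted" => ->(registration, data) {
    registration.email_accepted_time = Time.parse(data[:time])
  },
  "Registered" => ->(registration, data) {
    registration.registered_time = Time.parse(data[:time])
  }
}.freeze

# Replay events in order onto a fresh entity; unknown event types are skipped.
def project(events)
  events.each_with_object(RegistrationSketch.new) do |event, registration|
    handler = APPLY[event[:type]]
    handler&.call(registration, event[:data])
  end
end

events = [
  { type: "Initiated", data: { registration_id: "123", user_id: "abc", email_address: "jane@example.com", time: "2020-01-01T00:00:00Z" } },
  { type: "EmailAccepted", data: { time: "2020-01-01T00:00:03Z" } },
  { type: "Registered", data: { time: "2020-01-01T00:00:05Z" } }
]

registration = project(events)
partially_projected = project(events.first(1))
```

Replaying only the first event yields an entity that is initiated but not yet registered, which is the same answer a handler would get if it fetched the entity at that point in the stream.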

Handlers

Here is where things wildly diverge from the original User Registration component. Instead of writing a Registered event in response to a Register command, we need to kick off our state machine by writing an Initiated event.

module Handlers
  class Commands
    ## ...

    handle Register do |register|
      registration_id = register.registration_id

      registration, version = store.fetch(registration_id, include: :version)

      if registration.initiated?
        logger.info(tag: :ignored) { "Command ignored (Command: #{register.message_type}, Registration ID: #{registration_id}, User ID: #{register.user_id})" }
        return
      end

      time = clock.iso8601

      stream_name = stream_name(registration_id)

      initiated = Initiated.follow(register)
      initiated.processed_time = time
      initiated.metadata.correlation_stream_name = stream_name

      write.(initiated, stream_name, expected_version: version)
    end
  end
end

Most of the write logic you’ll see in this essay is something you’ve already seen before, only applied in different ways. But note the correlation_stream_name written to the Initiated event’s metadata.

Within the Eventide toolkit, messages carry a lot of different metadata. For example, the causation stream name denotes which stream caused the message to be written. Put another way, it answers the question: which stream did the preceding message come from? The Initiated event’s causation stream name is the command stream name, registration:command-{registration_id}, while the causation stream name of any message following the Initiated event will be registration-{registration_id}. And so on and so forth.

The correlation stream name, however, is a bit different. Every single message in the long chain of messages we’ll be writing will have the same correlation stream name, because the correlation stream name is simply copied over whenever one message follows another. Put another way, the correlation stream name allows us to know the originating stream name of any message in the chain, even if the messages happen to span different components and different streams.
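The difference between the two is easier to see side by side. The following is a simplified sketch in plain Ruby, not Eventide’s metadata class: MetadataSketch, the follow method, and its written_to parameter are hypothetical, and the real metadata carries more attributes than the three shown here.

```ruby
MetadataSketch = Struct.new(
  :stream_name,
  :causation_stream_name,
  :correlation_stream_name,
  keyword_init: true
)

# Build metadata for a message that follows `preceding`.
# Causation points at the preceding message's stream and changes at every hop;
# correlation is copied unchanged.
def follow(preceding, written_to:)
  MetadataSketch.new(
    stream_name: written_to,
    causation_stream_name: preceding.stream_name,
    correlation_stream_name: preceding.correlation_stream_name
  )
end

register = MetadataSketch.new(stream_name: "registration:command-123")

initiated = follow(register, written_to: "registration-123")
# The Register handler sets the correlation explicitly, once, at the start of the chain.
initiated.correlation_stream_name = "registration-123"

claim = follow(initiated, written_to: "userEmailAddress:command-abc")
claimed = follow(claim, written_to: "userEmailAddress-abc")
```

By the time the chain reaches the Claimed event, its causation stream name refers to the email uniqueness component’s command stream, while its correlation stream name still refers to the registration stream where the chain began.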

Next, once we have written an Initiated event, we need to write a Claim command to the email uniqueness component, to let that component know we want to claim an email address.

module Handlers
  class Events
    ## ...

    handle Initiated do |initiated|
      registration_id = initiated.registration_id
      user_id = initiated.user_id
      email_address = initiated.email_address
      encoded_email_address = encode_email_address(email_address)

      claim = Claim.new
      claim.metadata.follow(initiated.metadata)

      claim.claim_id = registration_id
      claim.encoded_email_address = encoded_email_address
      claim.email_address = email_address
      claim.user_id = user_id

      stream_name = "userEmailAddress:command-#{encoded_email_address}"

      write.(claim, stream_name)
    end
  end
end

We write to the User Email Address command stream, which then starts that process. You may be wondering why there’s no idempotence protection within this event’s handler. If the Registration component is stopped for whatever reason and reads the Initiated event again, a second or third Claim message will be written.

That’s okay, because we expect that a command may be written more than once. The idempotence protection is already in place within the User Email Address component, so it’s unnecessary to have it here, too.

More importantly, there’s no way for the Registration component to know whether a message was already sent to the User Email Address component. It has no access to the User Email Address component’s entity stream or projections, so it cannot make decisions one way or another.
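To illustrate why duplicate commands are harmless when the receiver is idempotent, here is a small sketch in plain Ruby. It is not the User Email Address component from the last essay: the EmailClaimsSketch class and its handle_claim method are hypothetical, and tracking handled claim IDs in a set is just one assumed way of detecting a command that was already processed.

```ruby
require "set"

# Hypothetical receiver that tolerates the same Claim arriving several times.
class EmailClaimsSketch
  attr_reader :events

  def initialize
    @owners = {}                 # encoded email address => claim ID that owns it
    @handled_claim_ids = Set.new
    @events = []
  end

  def handle_claim(claim_id:, encoded_email_address:)
    # A claim that was already processed produces no new event.
    return :ignored if @handled_claim_ids.include?(claim_id)

    @handled_claim_ids << claim_id

    if @owners.key?(encoded_email_address)
      @events << [:claim_rejected, claim_id]
    else
      @owners[encoded_email_address] = claim_id
      @events << [:claimed, claim_id]
    end

    :handled
  end
end

claims = EmailClaimsSketch.new

# The sender restarts and writes the same Claim three times.
results = 3.times.map do
  claims.handle_claim(claim_id: "registration-1", encoded_email_address: "amFuZQ")
end

# A different registration then asks for the same address.
claims.handle_claim(claim_id: "registration-2", encoded_email_address: "amFuZQ")
```

The sender never has to ask whether its command was already delivered. It can write the command again whenever it is unsure, and the receiver decides what counts as a duplicate.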

In any case, from the last essay we know to expect one of two events to be written to the User Email Address entity stream: either a Claimed event or a ClaimRejected event.

module Handlers
  module UserEmailAddress
    class Events
      include Log::Dependency
      include Messaging::Handle
      include Messaging::StreamName
      include Messages::Events

      dependency :write, Messaging::Postgres::Write
      dependency :clock, Clock::UTC
      dependency :store, Store

      def configure
        Messaging::Postgres::Write.configure(self)
        Clock::UTC.configure(self)
        Store.configure(self)
      end

      category :registration

      handle Claimed do |claimed|
        correlation_stream_name = claimed.metadata.correlation_stream_name
        registration_id = Messaging::StreamName.get_id(correlation_stream_name)

        registration, version = store.fetch(registration_id, include: :version)

        if registration.email_accepted?
          logger.info(tag: :ignored) { "Event ignored (Event: #{claimed.class.name}, Registration ID: #{registration_id}, User ID: #{claimed.user_id})" }
          return
        end

        email_accepted = EmailAccepted.follow(claimed, exclude: [
          :encoded_email_address,
          :sequence
        ])
        email_accepted.registration_id = registration_id
        email_accepted.processed_time = clock.iso8601

        stream_name = stream_name(registration_id)

        write.(email_accepted, stream_name, expected_version: version)
      end

      handle ClaimRejected do |claim_rejected|
        correlation_stream_name = claim_rejected.metadata.correlation_stream_name
        registration_id = Messaging::StreamName.get_id(correlation_stream_name)

        registration, version = store.fetch(registration_id, include: :version)

        if registration.email_rejected?
          logger.info(tag: :ignored) { "Event ignored (Event: #{claim_rejected.class.name}, Registration ID: #{registration_id}, User ID: #{claim_rejected.user_id})" }
          return
        end

        email_rejected = EmailRejected.follow(claim_rejected, exclude: [
          :encoded_email_address,
          :sequence
        ])
        email_rejected.registration_id = registration_id
        email_rejected.processed_time = clock.iso8601

        stream_name = stream_name(registration_id)

        write.(email_rejected, stream_name, expected_version: version)
      end
    end
  end
end

What’s interesting here is the usage of the correlation_stream_name. Remember, when the correlation stream name was set in the Initiated event, any subsequent event written to any stream had the same correlation stream name copied over. We know that any Claimed or ClaimRejected event with a correlation stream name corresponding to the Registration stream originated within that component.

There may be any number of components and services interested in claiming email addresses, and we certainly don’t want to read those into our Registration component. So this is how we know for certain we’re only dealing with messages that originated within the Registration component.

The correlation stream name is also how we retrieve the registration_id. Remember that the email uniqueness component has no knowledge of user registrations. But because the correlation stream name within the messages’ metadata is copied over from message to message, we can retrieve it here.
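As a sketch of what that lookup involves, the helpers below pull the ID and the category out of a stream name of the form category-id or category:type-id. They are plain Ruby and hypothetical: stream_id, stream_category, and originated_in? are illustrative names, not Eventide methods, though stream_id is meant to mirror what the get_id call in the handlers above is used for.

```ruby
# Everything after the first dash is the ID. IDs such as UUIDs contain
# dashes themselves, so the split is limited to two parts.
def stream_id(stream_name)
  _category, id = stream_name.split("-", 2)
  id
end

# Everything before the first dash is the category, minus any ":type" suffix.
def stream_category(stream_name)
  stream_name.split("-", 2).first.split(":", 2).first
end

# True when a message's correlation stream belongs to the given category.
def originated_in?(correlation_stream_name, category)
  return false if correlation_stream_name.nil?

  stream_category(correlation_stream_name) == category
end

correlation_stream_name = "registration-7d3f6f1e-0c2b-4f0a-9a57-2f1d3c0b8e11"

registration_id = stream_id(correlation_stream_name)
ours = originated_in?(correlation_stream_name, "registration")
someone_elses = originated_in?("newsletter-42", "registration")
```

A Claimed event whose correlation stream is in some other category, or that has no correlation stream at all, was not started by a registration and can be skipped.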

In response to a Claimed or ClaimRejected event, we write an EmailAccepted or an EmailRejected event, respectively, to the registration stream. This allows us to preserve idempotence, because the Registration entity now records whether the Claimed or ClaimRejected event has already been handled.

Once we have either one of those events, we can write a Registered or Cancelled event, thus ending the registration process.

module Handlers
  class Events
    ## ...

    handle EmailAccepted do |email_accepted|
      registration_id = email_accepted.registration_id

      registration, version = store.fetch(registration_id, include: :version)

      if registration.registered?
        logger.info(tag: :ignored) { "Event ignored (Event: #{email_accepted.message_type}, Registration ID: #{registration_id}, User ID: #{email_accepted.user_id})" }
        return
      end

      time = clock.iso8601

      stream_name = stream_name(registration_id)

      registered = Registered.follow(email_accepted, exclude: [
        :claim_id,
        :time,
        :processed_time
      ])
      registered.time = time

      write.(registered, stream_name, expected_version: version)
    end

    handle EmailRejected do |email_rejected|
      registration_id = email_rejected.registration_id

      registration, version = store.fetch(registration_id, include: :version)

      if registration.cancelled?
        logger.info(tag: :ignored) { "Event ignored (Event: #{email_rejected.message_type}, Registration ID: #{registration_id}, User ID: #{email_rejected.user_id})" }
        return
      end

      time = clock.iso8601

      stream_name = stream_name(registration_id)

      cancelled = Cancelled.follow(email_rejected, exclude: [
        :claim_id,
        :time,
        :processed_time
      ])
      cancelled.time = time

      write.(cancelled, stream_name, expected_version: version)
    end
  end
end

With these in hand, we can inform the user that their registration was either successful or not. We may even be able to tell them the reason their registration failed was that their email address was already in use.

With the publish-subscribe pattern, you can imagine that we can take this even further. For example, a Welcome Email component could subscribe to any published Registered events and send onboarding emails to the user.

Conclusion

The two components are now functionally complete. We have a user registration component that takes email uniqueness into account by communicating with a component created specifically for that purpose.

But how do we use these components in a web or native application? How do you paint pixels on a screen with an event sourced architecture?

That’s what we’ll talk about next month.