Simple event sourcing - conflict resolution (part 4)

After our deep dive into a Redis event store implementation
we’re now getting back to actually adding functionality to the blogging application. As in
the Getting Started with Rails guide, we’ll add the capability to comment on blog
posts. Adding this functionality is straightforward, but it requires us to look into resolving conflicts when multiple people modify
the same blog post or comment concurrently.

To add the new functionality to our application we will first define
the new events and supporting data
types. Notice that by focusing on the events we are actually thinking about the behaviour of the application first, instead of just the data
model. In an application as simple as this blogging application the distinction is rather subtle, but when your application is more complicated,
events can help you to get a good understanding of what your application is supposed to do. Here are the event definitions for adding and deleting
comments:

{% highlight scala linenos %}
case class CommentId(value: Int)
object CommentId {
  implicit val CommentIdFormat: Format[CommentId] = valueFormat(apply)(unapply)
  implicit val CommentIdOrdering: Ordering[CommentId] = Ordering.by(_.value)
}

case class CommentContent(commenter: String, body: String)

// [...]

sealed trait PostCommentEvent extends PostEvent {
  def commentId: CommentId
}
case class CommentAdded(postId: PostId, commentId: CommentId, content: CommentContent) extends PostCommentEvent
case class CommentDeleted(postId: PostId, commentId: CommentId) extends PostCommentEvent

object PostEvent {
  // [...]

  implicit val CommentContentFormat: Format[CommentContent] = objectFormat("commenter", "body")(CommentContent.apply)(CommentContent.unapply)

  implicit val PostEventFormat: Format[PostEvent] = typeChoiceFormat(
    "PostAdded" -> objectFormat("postId", "content")(PostAdded.apply)(PostAdded.unapply),
    "PostEdited" -> objectFormat("postId", "content")(PostEdited.apply)(PostEdited.unapply),
    "PostDeleted" -> objectFormat("postId")(PostDeleted.apply)(PostDeleted.unapply),
    "CommentAdded" -> objectFormat("postId", "commentId", "content")(CommentAdded.apply)(CommentAdded.unapply),
    "CommentDeleted" -> objectFormat("postId", "commentId")(CommentDeleted.apply)(CommentDeleted.unapply))
}
{% endhighlight %}

Since comments are always part of a blog post, we can use a simple sequential comment identifier. The first comment of a post will have id
CommentId(1), the second one CommentId(2), etc. Obviously, we could also generate UUIDs for comments (and it would actually simplify the code),
but by working with a sequential identifier we’ll slowly introduce some "domain" logic into our example application. We’ll also use CommentId as a
key in a sorted map, so we need to define the Ordering (line 4), which is
based on the underlying numeric value.
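
To see what the Ordering buys us, here is a minimal, self-contained sketch. It uses a stripped-down copy of CommentId (without the Format instance), and the SortedCommentsExample object and its sample comment texts are made up for illustration only.

```scala
import scala.collection.immutable.SortedMap

// Stripped-down copy of the article's CommentId (the Format instance is omitted).
final case class CommentId(value: Int)
object CommentId {
  // Compare comment ids by their underlying numeric value.
  implicit val CommentIdOrdering: Ordering[CommentId] =
    Ordering.by[CommentId, Int](_.value)
}

// Hypothetical example object, not part of the blogging application.
object SortedCommentsExample {
  // Entries are inserted out of order on purpose.
  val comments: SortedMap[CommentId, String] = SortedMap(
    CommentId(2) -> "second",
    CommentId(1) -> "first",
    CommentId(3) -> "third")

  def main(args: Array[String]): Unit =
    // Iteration follows the key ordering, not the insertion order.
    println(comments.values.mkString(", ")) // prints: first, second, third
}
```

Because the implicit Ordering lives in the companion object, it is found automatically wherever a SortedMap keyed by CommentId is created.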

CommentContent is a simple container for the name of the commenter and the comment text.

The CommentAdded and CommentDeleted events are both subtypes of the PostCommentEvent trait, which in turn extends PostEvent. This makes it
easier to distinguish comment-related events, which will be useful for resolving conflicts automatically.

To store the events in the event store, we also define and extend the
necessary Format instances.


Now that we have defined the new events, we can adjust our view models to keep track of comments as part of
the Post class:

{% highlight scala %}
case class Post(
  id: PostId,
  revision: StreamRevision,
  content: PostContent,
  nextCommentId: CommentId = CommentId(1),
  comments: SortedMap[CommentId, CommentContent] = SortedMap.empty)

case class Posts(byId: Map[PostId, Post] = Map.empty, orderedByTimeAdded: Seq[PostId] = Vector.empty) {
  // [...]

  def update(event: PostEvent, revision: StreamRevision): Posts = event match {
    // [...]
    case CommentAdded(id, commentId, content) =>
      modify(id) { post =>
        post.copy(
          revision = revision,
          nextCommentId = CommentId(commentId.value + 1),
          comments = post.comments.updated(commentId, content))
      }
    case CommentDeleted(id, commentId) =>
      modify(id) { post =>
        post.copy(
          revision = revision,
          comments = post.comments - commentId)
      }
  }

  private[this] def modify(id: PostId)(f: Post => Post) =
    this.copy(byId = byId.updated(id, f(byId(id))))
}
{% endhighlight %}

The Post class is modified to keep track of the next available CommentId (starting at 1) and also tracks the comments, using a SortedMap [1]
from CommentId to CommentContent. Remember that the representation of the view model is not tied to any kind of database schema, so we can easily
change it whenever we need to. The contents of the memory image will be rebuilt automatically whenever we restart the application!

The update method also has to be changed to match against the new events and update the Post class accordingly. Updating nested, immutable data
structures is a bit more involved than the mutable equivalent, so we use a simple helper method to take care of the first two levels of nesting. There
are more general solutions (PDF) to this problem, but for now the modify method
will do.
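
The idea behind the modify helper can be tried out in isolation. The following sketch uses deliberately simplified stand-ins for Post and Posts (plain Int ids, a title, and comments as a Vector of strings); these simplified types and the ModifyExample object are assumptions made for the example, not the application's real classes.

```scala
// Simplified stand-ins for the article's view model classes.
final case class Post(id: Int, title: String, comments: Vector[String] = Vector.empty)

final case class Posts(byId: Map[Int, Post] = Map.empty) {
  // Applies `f` to the post stored under `id` and returns a new Posts value.
  // Like the helper in the article, this throws NoSuchElementException
  // when no post with that id exists.
  def modify(id: Int)(f: Post => Post): Posts =
    copy(byId = byId.updated(id, f(byId(id))))
}

// Hypothetical example object.
object ModifyExample {
  val before: Posts = Posts(Map(1 -> Post(1, "Hello")))

  // Only the innermost change is spelled out; modify rebuilds the outer levels.
  val after: Posts =
    before.modify(1)(post => post.copy(comments = post.comments :+ "Nice post!"))

  def main(args: Array[String]): Unit = {
    println(before.byId(1).comments) // prints: Vector()
    println(after.byId(1).comments)  // prints: Vector(Nice post!)
  }
}
```

Note that the original value is left untouched, which is what allows the memory image to hand out a consistent snapshot while a new one is being built.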

Views and controller

Next we add the required routes to conf/routes:

POST /posts/:postId/comments ⏎
  controllers.PostsController.comments.add(postId: PostId, r: StreamRevision)
POST /posts/:postId/comments/:commentId/delete ⏎
  controllers.PostsController.comments.delete(postId: PostId, r: StreamRevision, commentId: CommentId)

Listing, deleting, and adding comments is all part of
the show.scala.html template.

{% highlight html %}
@if(post.comments.nonEmpty) {
  <h2>Comments</h2>

  @for((commentId, commentContent) <- post.comments) {
    <div style="display: inline-block;">
      <form class="form-inline" style="display: inline-block;" action="@routes.PostsController.comments.delete(post.id, post.revision, commentId)" method="POST">
        @* [...] *@
      </form>
      @commentContent.body &mdash; <em>by @commentContent.commenter</em>
    </div>
  }
}

<h2>Add a comment:</h2>

@helper.form(action = routes.PostsController.comments.add(post.id, post.revision)) {
  <fieldset>
    @helper.inputText(form("commenter"), '_label -> "Commenter", 'required -> "required")
    @helper.textarea(form("body"), '_label -> "Body", 'cols -> 80, 'rows -> 10)
  </fieldset>
  <fieldset><button class="btn btn-primary">Submit</button></fieldset>
}
{% endhighlight %}

As you can see, the template is quite straightforward. The only thing that might be unfamiliar is the invocation of the conflictsMessagePanel
template, which we’ll get back to later.

Finally, we need to implement two new controller methods, which can be found in
the comments singleton object
inside of PostsController:

{% highlight scala linenos %}
// [...]

private[this] def withPost(postId: PostId)(found: Post => Result)(implicit request: Request[_]) = {
  // [...]
}

object comments {
  val commentContentForm = Form(mapping(
    "commenter" -> trimmedText.verifying(minLength(3)),
    "body" -> trimmedText.verifying(minLength(3)))(CommentContent.apply)(CommentContent.unapply))

  def add(postId: PostId, expected: StreamRevision) = Action { implicit request =>
    withPost(postId) { post =>
      commentContentForm.bindFromRequest.fold(
        formWithErrors => BadRequest(/* [...] */(post, formWithErrors)),
        commentContent =>
          commit(expected, CommentAdded(postId, post.nextCommentId, commentContent))(
            onCommit = Redirect(/* [...] */).flashing("info" -> "Comment added."),
            onConflict = (actual, conflicts) => Conflict(/* [...] */(post, commentContentForm.fill(commentContent), conflicts))))
    }
  }

  def delete(postId: PostId, expected: StreamRevision, commentId: CommentId) = Action { implicit request =>
    withPost(postId) { post =>
      def deletedResult = Redirect(/* [...] */).flashing("info" -> "Comment deleted.")
      post.comments.get(commentId) match {
        case None => deletedResult
        case Some(comment) =>
          commit(expected, CommentDeleted(postId, commentId))(
            onCommit = deletedResult,
            onConflict = (actual, conflicts) => Conflict(/* [...] */(post, commentContentForm, conflicts)))
      }
    }
  }
}
{% endhighlight %}

The code is a bit more complicated than the basic post actions, since adding and deleting comments requires the presence of a blog post instance. The
withPost helper method is used to read the required post from the memory image, or render a 404 Not Found result if the post is not present.

In the case of the add method we then validate the submitted form (line 14). If incorrect, we rerender the page with the error messages (line
15). Otherwise we commit a new CommentAdded event using the provided content and the next available comment id (line 17). If the commit succeeds
without conflict, we redirect the user to the blog post with the added comment (line 18). In case there is a conflict, we rerender the form but add
some information on the conflicts that occurred (line 19). The delete action is very similar.

This basically completes the addition of some new functionality. The effort required is comparable to a traditional database backed application, which
is good to know. But there is one fly in the ointment we need to fix before we can call it a day...

Conflict resolution

Now that we can add comments to posts, you’ll soon discover that multiple concurrent users will quickly run into a
conflict. In part 2 we added conflict detection and rendered a placeholder
page whenever a conflict occurred. Now that conflicts are more likely to occur, we need to be a bit smarter about resolving these conflicts.

The basic idea is that comment events usually do not conflict, even when they apply to the same post and therefore the same event stream. We can
capture this knowledge in a method like this:

{% highlight scala linenos %}
def conflictsWith(committed: PostEvent, attempted: PostEvent) =
  (committed, attempted) match {
    case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
    case (_: PostCommentEvent, _) => false
    case _ => true
  }
{% endhighlight %}

This method states that any two PostCommentEvents only conflict when they affect the same comment (line 3). Furthermore, any new blog post related
event does not conflict with an already committed PostCommentEvent (line 4). This allows the author to edit the blog post without getting conflicts
on added or removed comments. Any other event combination is considered to conflict (line 5). So if you add a comment while someone edited the post,
the system will give you a warning and ask you to resubmit your comment, if it is still applicable.

Notice that the main goal of this method is to decide whether we should ask the user for confirmation when a conflict occurs, or whether we should
just proceed with the commit even though changes were made while the user was working on their request. This is rather subjective and the details will
vary depending on your domain, your users, etc.
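
To make these rules concrete, the sketch below applies the same pattern match to a handful of event pairs. The event classes here are simplified stand-ins (String post ids, Int comment ids, no content), and the ConflictRules object is a name chosen for this example.

```scala
// Simplified stand-ins for the article's event hierarchy.
sealed trait PostEvent
final case class PostEdited(postId: String) extends PostEvent
sealed trait PostCommentEvent extends PostEvent { def commentId: Int }
final case class CommentAdded(postId: String, commentId: Int) extends PostCommentEvent
final case class CommentDeleted(postId: String, commentId: Int) extends PostCommentEvent

// Hypothetical holder for the conflict rules.
object ConflictRules {
  def conflictsWith(committed: PostEvent, attempted: PostEvent): Boolean =
    (committed, attempted) match {
      // Two comment events only clash when they touch the same comment.
      case (a: PostCommentEvent, b: PostCommentEvent) => a.commentId == b.commentId
      // A post-level change never clashes with an already committed comment event.
      case (_: PostCommentEvent, _) => false
      // Everything else is treated as a real conflict.
      case _ => true
    }

  def main(args: Array[String]): Unit = {
    println(conflictsWith(CommentAdded("p", 1), CommentAdded("p", 2)))   // false
    println(conflictsWith(CommentAdded("p", 1), CommentDeleted("p", 1))) // true
    println(conflictsWith(CommentAdded("p", 1), PostEdited("p")))        // false
    println(conflictsWith(PostEdited("p"), CommentAdded("p", 2)))        // true
  }
}
```

The last two lines show that the rule is not symmetric: which event was committed first matters.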

So now that we have a way to decide whether two events conflict, we need to modify our commit method to take this into account. This method is
defined in the PostsController:

{% highlight scala linenos %}
/**
 * Commits an event and applies it to the current state. If successful the
 * provided onCommit callback is invoked and its result returned. Otherwise
 * the onConflict callback is invoked and its result returned.
 */
private[this] def commit
  (expected: StreamRevision, event: PostEvent)
  (onCommit: => Result,
   onConflict: (StreamRevision, Seq[PostEvent]) => Result): Result = {
  def resolveConflict(committed: Seq[PostEvent], attempted: PostEvent) = {
    val conflicting = committed.filter(PostEvent.conflictsWith(_, attempted))
    if (conflicting.isEmpty) Right(attempted)
    else Left(conflicting)
  }

  @tailrec def run(expected: StreamRevision, event: PostEvent): Result =
    memoryImage.tryCommit(event.postId.toString, expected, event) match {
      case Right(commit) =>
        onCommit
      case Left(conflict) =>
        resolveConflict(conflict.conflicting.flatMap(/* [...] */), event) match {
          case Right(event) => run(conflict.actual, event)
          case Left(conflicting) => onConflict(conflict.actual, conflicting)
        }
    }

  run(expected, event)
}
{% endhighlight %}

The new commit method implementation first defines a resolveConflict helper method (line 10), which takes a list of already committed, potentially
conflicting events and uses the PostEvent.conflictsWith method to see if there are any real conflicts. If there are none, the attempted event is
returned. Otherwise the conflicting events are returned.

The run method (line 16) runs in a (tail-recursive) loop. It tries to commit against the expected revision of the event stream. If there is no
conflict, it returns the result of the provided onCommit callback. Otherwise it tries to resolve the conflicts. If this succeeds, it invokes
itself again [2], but now with the latest known event stream revision as the expected revision. If the conflict cannot be resolved, the result of the
onConflict callback is returned instead.
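
As the footnote on retries suggests, it is wise to bound this loop. The following is a rough, self-contained sketch of how that could look. Everything in it is an assumption made for illustration: the InMemoryStream class stands in for the real event store, events are plain strings, the toy conflictsWith rule treats equal events as conflicting, and maxRetries is a made-up parameter. When the retries run out, this sketch simply reports all intervening events as conflicts.

```scala
import scala.annotation.tailrec

// Hypothetical example object; not the application's real commit method.
object BoundedRetryExample {
  type Event = String

  // Stand-in for the event store: a single in-memory stream.
  final class InMemoryStream {
    private var events: Vector[Event] = Vector.empty

    // Right(newRevision) on success, or
    // Left((actualRevision, eventsCommittedSinceExpected)) on a revision mismatch.
    def tryCommit(expected: Int, event: Event): Either[(Int, Seq[Event]), Int] =
      if (expected == events.size) {
        events = events :+ event
        Right(events.size)
      } else Left((events.size, events.drop(expected)))
  }

  // Toy rule: two events conflict only when they are identical.
  def conflictsWith(committed: Event, attempted: Event): Boolean =
    committed == attempted

  def commit(stream: InMemoryStream, expected: Int, event: Event,
             maxRetries: Int = 3): Either[Seq[Event], Int] = {
    @tailrec def run(expected: Int, retriesLeft: Int): Either[Seq[Event], Int] =
      stream.tryCommit(expected, event) match {
        case Right(revision) => Right(revision)
        case Left((actual, committed)) =>
          val conflicting = committed.filter(conflictsWith(_, event))
          if (conflicting.nonEmpty) Left(conflicting)  // real conflict: ask the user
          else if (retriesLeft <= 0) Left(committed)   // give up instead of looping forever
          else run(actual, retriesLeft - 1)            // retry against the latest revision
      }
    run(expected, maxRetries)
  }

  def main(args: Array[String]): Unit = {
    val stream = new InMemoryStream
    stream.tryCommit(0, "PostAdded")
    stream.tryCommit(1, "CommentAdded(1)")
    // A client that last saw revision 1 commits a non-conflicting event.
    println(commit(stream, 1, "CommentAdded(2)")) // prints: Right(3)
    // The same stale client repeats an event that was already committed.
    println(commit(stream, 1, "CommentAdded(1)")) // prints: Left(Vector(CommentAdded(1)))
  }
}
```

With a single writer the loop retries at most once, so the limit only matters when other writers keep appending to the stream between attempts, or when a bug keeps the commit from ever succeeding.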

Finally, line 27 simply kicks off the entire process using the provided revision and event.

With this in place actual conflicts should be quite rare. But we can still do better than just showing a generic "there was a conflict" error
page. This is the job of the
conflictsMessagePanel.scala.html template. This
template shows a human-readable version of the conflicts that occurred:

[Image: Conflict alert panel]


Besides adding support for conflict resolution, the implementation of the blog post comment functionality was quite straightforward. In a blogging
application conflicts are probably quite rare, so it may not make sense to build in extensive UI support for this, but having this as an example
hopefully gives you some idea of what is possible. In more collaborative applications this kind of functionality is much more interesting and you may
prefer to immediately push updates directly to the client, instead of waiting for the client to submit a form. Examples of this
are Pivotal Tracker and Apache Wave.

If your application has extreme availability requirements, similar conflict resolution can also help you recover from network
partitions. The same goes for applications that need to be able to run in disconnected mode. In these cases you will need to write a function that
merges divergent event streams. Version control systems are examples of this, and can be a source of inspiration, although they usually don’t have
intention-revealing events to work with.

But the main point is that the level of conflict resolution you need depends on your users and your application. Event sourcing gives you a great tool
to make conflicts easier to resolve, without necessarily complicating your application if you do not need this kind of functionality.

In the next part we’ll take a look at another kind of concurrency conflict that can occur in the current application when two or more users commit
events to the same event stream nearly simultaneously.


  1. The immutable SortedMap implementation is rather broken before
    Scala 2.10, but it’ll probably do for this example. 

  2. In practice you should put some limit on the number of retries, to avoid bugs causing infinite loops.