A Map of Akka

The amazing Akka project was started by Jonas Bonér in 2009 with the aim to bring the actor model, which has proven to deliver an availability of six nines (99.9999%) and even more, to the JVM. Akka, which is open source and available under the Apache 2 license, offers APIs for both Java and Scala. If you are interested in Akka’s history, take a look at the Akka 5 years anniversary blog post.

… Read the whole post on the codecentric blog.

A Map of Akka

Attention: Seq is not immutable!

Update: Thanks to gerferra (see comment below) I have added a paragraph explaining that you should use chained package clauses in combination with retargeting Seq.

One of Scala’s guiding principles is its bias towards immutability: While we can use vars and mutable objects, we are encouraged to use vals and immutable objects. This manifests itself very clearly in the collection library which contains both immutable and mutable collections. Actually there are three main packages:

  1. scala.collection
  2. scala.collection.immutable
  3. scala.collection.mutable

scala.collection contains basic objects which are extended by the immutable collections in scala.collection.immutable and the mutable ones in scala.collection.mutable.

It is important to understand that the basic collections are neither immutable or mutable, they just provide common functionality. If an API makes use of a basic collection, e.g. for the type of a method parameter, we can either use an immutable or mutable one for the argument.

Let’s look at a simple example:

def basicSetSize[A](as: scala.collection.Set[A]): Int =
as.size
scala> basicSetSize(scala.collection.immutable.Set(1, 2, 3))
res0: Int = 3

scala> basicSetSize(scala.collection.mutable.Set(1, 2, 3))
res1: Int = 3

As you can see, we can use an immutable or a mutable Set for a basic Set. So far so good.

Now back to Scala’s guiding principle of immutability. In order to give preference to the immutable collections, the Predef singleton object and the scala package object contain a number of aliases that bring the immutable collections into scope without any imports. That means that we can use many immutable collections even if we don’t import anything from either scala.collection.immutable or scala.collection.mutable.

Let’s rewrite the above example:

def basicSetSize[A](as: Set[A]): Int =
as.size
scala> basicSetSize(scala.collection.immutable.Set(1, 2, 3))
res0: Int = 3

scala> basicSetSize(scala.collection.mutable.Set(1, 2, 3))
:9: error: type mismatch;
found : scala.collection.mutable.Set[Int]
required: scala.collection.immutable.Set[?]

scala> basicSetSize(scala.collection.Set(1, 2, 3))
:9: error: type mismatch;
found : scala.collection.Set[Int]
required: scala.collection.immutable.Set[?]

As you can see, Set without imports or qualifying package essentially means scala.collection.immutable.Set and we can’t use a mutable or basic Set.

Now this is expected, because Scala is encouraging us to use immutable objects. Many think that this principle holds for all major collections types, but this is not true! Guess where the aliases for Seq are pointing to: To scala.collection.immutable.Seq like for Set and other collections? No!

type Seq[+A] = scala.collection.Seq[A]
val Seq = scala.collection.Seq

These aliases are defined in the scala package object. The reason for this exception is, that one should be able to use arrays, which are mutable, for the “default” sequence. Predef contains implicit conversions from Array to WrappedArray which mixes in various mutable collection traits.

While this makes it comfortable to work with arrays, I consider this very dangerous, because it is too easy to forget importing scala.collection.immutable.Seq. If we forget this import and hence use the basic sequence in our API, users can provide mutable sequences which most certainly will lead to trouble in any concurrent program. Just imagine objects used as messages in an Akka based system …

If you want to be sure your code is using immutable sequences, I recommend adding the following lines to the top-level package object in your projects:

type Seq[+A] = scala.collection.immutable.Seq[A]

val Seq = scala.collection.immutable.Seq

If you use chained package clauses (see below) in all your Scala source file, this will automatically “retarget” Seq to the immutable sequence and all should be good.

package name.heikoseeberger.toplevel
package subpackage
Attention: Seq is not immutable!

Name based extractors in Scala 2.11

Update 2: Thanks to @SeanTAllen I have fixed a typo in one of the code examples.

Update: Thanks to @xuwei_k and @eed3si9n I have learned that value classes which mix in a universal trait incur the cost of allocation. Therefore I had to change the second example (NameOpt), which – by the way – lead to significant LOC reduction ;-)

The recently released milestone M5 for Scala 2.11.0 contains a nice little gem: extractors which don’t need any allocations. This should improve performance significantly and make extractors ready for prime time.

So far an extractor had to be an object with an unapply method taking an arbitrary argument and returning an Option, potentially destructing the given argument into a value of a different type:

def unapply(any: A): Option[B]

There are variations, e.g. to extract several values or sequences of such, but the basic shape remains the same. Here is a simple example:

object PositiveInt {
  def unapply(n: Int): Option[Int] =
    if (n > 0) Some(n) else None
}

Extractors are quite useful to write concise and expressive code. Yet unapply returning an Option might have a negative impact on runtime performance, because an instance of Some needs to be created for each successful extraction.

Scala 2.11 introduces name based extractors which no longer require unapply to return an Option. Instead any object which defines the two methods isEmpty and get will do the job:

isEmpty: Boolean
get: A

Clearly this could be an Option, but we can also make use of value classes which have been introduced in Scala 2.10. Value classes, which extend from AnyVal, don’t get allocated, because all operations are inlined by the compiler. Let’s rewrite the above simple example:

class PositiveIntOpt(val n: Int) extends AnyVal {
  def isEmpty: Boolean =
    n <= 0
  def get: Int =
    n
}

object PositiveInt {
  def unapply(n: Int): PositiveIntOpt =
    new PositiveIntOpt(n)
}

Let’s give it a spin in the REPL:

scala> val PositiveInt(n) = 1
n: Int = 1

scala> val PositiveInt(n) = 0
scala.MatchError: 0 (of class java.lang.Integer)

Woot, that works! Here is another example, this time extracting multiple values. We are making use of value classes, universal traits (extending Any) and the null object pattern to minimize allocations:

class NameOpt(val parts: (String, String)) extends AnyVal {
  def isEmpty: Boolean =
    parts == null
  def get: (String, String) =
    parts
}

object Name {
  def unapply(name: String): NameOpt = {
    val parts = name split " "
    if (parts.length == 2)
      new NameOpt((parts(0), parts(1)))
    else
      new NameOpt(null)
  }
}

Of course, this adds quite a bit boilerplate compared to simply returning an Option. But the performance gain might be worth it.

If we control the class we want to destruct, we can even add the extractor logic to the class itself, thereby avoiding any allocations at all. In this case, in order to extract multiple values, we have to define the according product selectors _1, _2, etc.:

object Name {
  def unapply(name: Name) =
    name
}

class Name(first: String, last: String) {
  def isEmpty: Boolean =
    false
  def get: Name =
    this
  def _1: String =
    first
  def _2: String =
    last
}

All right, that’s it. Before you start using name based extractors, please make sure to check the next milestones and finally the 2.11 release for any changes to this new feature.

Name based extractors in Scala 2.11

Gabbler, a reactive chat app – part 3

In the last blog post I have shown how we can use Akka I/O and spray-can to implemented a simple solution for server-side push via long polling. Today we are going to improve it with regard to maintainability and robustness.

Changing behavior instead of mutable state

Currently our Gabbler contains the two mutable fields messages and storedCompleter. There is nothing fundamentally wrong with actors having mutable state. On the contrary, it’s one of the strengthes of the actor model to encapsulate mutable state in a thread-safe way.

Yet in our case the behavior of the actor depends on the two mutable fields in an already slightly entangled fashion: when we receive a completer, we either apply it to complete the request or store it for later, depending on the value of the messages field. And when we receive a message, we have to check if we have a stored completer.

As we will see later, we will have to add another “dimension” to the behavior. Using another mutable field for that would result in a pretty entangled mess which would be too hard to understand and therefore lead to poor maintainability.

How can we solve this issue? Maybe we should think in terms of logical states instead of fields which are implementation details. Or put another way, we should think about a state machine for a Gabbler. Why not give it a try?

All right, when a Gabbler actor gets created, it is waiting for a message or a completer. Let’s call that state waiting. When it gets a GET request in this waiting state, it receives a completer, but in order to complete the request, it needs to wait for a message. Therefore let’s call that state waiting for message. When it gets a POST request in the waiting state, it receives a message and needs to wait for a completer, so let’s call that state waiting for completer. Here’s a state diagram showing these states and possible transitions:

Gabbler states

When a Gabbler is in waiting for message, receiving another completer doesen’t cause a state transition. We would just drop the old completer and use the new one. But when a message is received, the request is completed and the Gabbler transitions back to waiting again.

Similarly, when a Gabbler is in waiting for completer, receiving another message doesen’t cause a state transition. In this case we would add the new message to the one(s) already received. When a completer is received, the request is completed and the Gabbler gets waiting again.

As you can see, the behavior of a Gabbler depends on the state it is in. In other words, we have to change the behavior to build this state machine. Luckily that’s easy, we just have to use the ActorContext.become method which takes the new behavior as a Reveive parameter:

class Gabbler extends Actor {

  import GabblerService._

  def receive: Receive =
    waiting

  def waiting: Receive = {
    case completer: Completer => context become waitingForMessage(completer)
    case message: Message     => context become waitingForCompleter(message :: Nil)
  }

  def waitingForMessage(completer: Completer): Receive = {
    case completer: Completer => context become waitingForMessage(completer)
    case message: Message     => completeAndWait(completer, message :: Nil)
  }

  def waitingForCompleter(messages: List[Message]): Receive = {
    case completer: Completer => completeAndWait(completer, messages)
    case message: Message     => context become waitingForCompleter(message :: messages)
  }

  def completeAndWait(completer: Completer, messages: List[Message]): Unit = {
    completer(messages)
    context become waiting
  }
}

As you can see, instead of the mutable fields we have introduced three methods, each returning a behavior specific for one of the three states. While we have effectively added four lines of code as compared to the old implementation, I strongly believe that this implementation is superior, because the notion of a state machine makes the implementation easier to understand. In addition we could get rid of the mutable fields and the logic within each state is extremely simple and in particular doesn’t contain nested conditions.

When we use our AngularJS frontend which uses long polling, i.e. sends GET requests asking for messages, our Gabbler actors will be in the waiting for message state most of the time, because every time we complete a request and a Gabbler transitions to waiting, the AngularJS frontend will send a new GET request, i.e. a new completer, within a very short timeframe. Depending on the network latency this period, in which the Gabbler actors are in waiting or eventually in waiting for completer, will typically be in the range between 100 milliseconds and one second.

Robustness

So far we haven’t spent any thoughts on robustness. One reason for that is that Akka offers fault tolerance out of the box. But of course we have to spend some extra efforts to build a really robust system.

Let’s go back to the usage example from the last blog post: first start the application, then ask for messages sending a GET request with curl and then … just wait until the request times out.

~$ curl -u Heiko:Heiko http://localhost:8080/api/messages
The server was not able to produce a timely response to your request.~$

After waiting a little while – 20 seconds in my case with OS X 10.8.4 – curl gets impatient and cancels the request. This is a pretty reasonable behavior which we should expect from any HTTP client. Therefore we should make sure that we don’t get any issues on the server.

Let’s see what happens if we now send a message using a POST request:

$curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages
$

We don’t see anything on the client side, but the server log shows the following:

09:32:25 ERROR [akka://gabbler-service-system/user/gabbler-service/Heiko] - null
java.lang.NullPointerException: null
    at spray.can.server.ResponseReceiverRef.unhandledMessage(ResponseReceiverRef.scala:80) ~[spray-can-1.2-20130628.jar:1.2-20
…
09:32:25 DEBUG [akka://gabbler-service-system/user/gabbler-service/Heiko] - restarting
09:32:25 DEBUG [akka://gabbler-service-system/user/gabbler-service/Heiko] - restarted

Obviously the Gabbler actor which was created on behalf of the initial GET request was still alive even though the client connection used by its completer has been terminated. This isn’t a big issue, because Akka is fault tolerant. Yet the default behavior of restarting a Gabbler actor in such a situation doesn’t make sense. Therefore the first think we change is the supervisorStrategy of the GabblerService:

override def supervisorStrategy: SupervisorStrategy =
  OneForOneStrategy() { case _ => SupervisorStrategy.Stop }

As you can see, we simply stop a Gabbler actor in any case, because there are no cases where it makes sense to restart or resume it.

But the essence of the issue still hasn’t been resolved: We should prevent the client connection from timing out during the long polling cycle. Instead we need to have a server side timeout which leads to completing the current request and the client sending another one.

A simple approach would just make use of the receiveTimeout of an actor and stop a Gabbler after a reasonable time. This would work in many cases, but what if there arrives a message after the Gabbler has been stopped, but before the client has reconnected and a new Gabbler has been created? This message would not be delivered to that particular gabbler.

Therefore we have to schedule a timeout ourselves every time a Gabbler actor enters a particular state. If this state is waiting for message, i.e. if the Gabbler has a completer, we’ll complete the request after the timeout with an empty list and transition into waiting. This gives the long polling client the chance to send a new GET request with a new completer.

Gabbler states

In both other states, i.e. in waiting and waiting for completer, the Gabbler is waiting for a completer. If this doesn’t arrive within the timeout period, we can safely assume the user has shut the browser window or some other severe reason prevents the client to send another long polling request, hence we can close the Gabbler actor.

Here’s the accordingly changed Gabbler code:

object Gabbler {

  private case class Timeout(id: Int)

  def props(timeout: FiniteDuration): Props =
    Props(new Gabbler(timeout))
}

final class Gabbler(timeoutDuration: FiniteDuration) extends Actor {

  import Gabbler._
  import GabblerService._
  import context.dispatcher

  def receive: Receive =
    waiting(scheduleTimeout(Timeout(0)))

  def waiting(timeout: Timeout): Receive = {
    case completer: Completer => context become waitingForMessage(completer, newTimeout(timeout))
    case message: Message     => context become waitingForCompleter(message :: Nil, timeout)
    case `timeout`            => context.stop(self)
  }

  def waitingForMessage(completer: Completer, timeout: Timeout): Receive = {
    case completer: Completer => context become waitingForMessage(completer, newTimeout(timeout))
    case message: Message     => completeAndWait(completer, message :: Nil, timeout)
    case `timeout`            => completeAndWait(completer, Nil, timeout)
  }

  def waitingForCompleter(messages: List[Message], timeout: Timeout): Receive = {
    case completer: Completer => completeAndWait(completer, messages, timeout)
    case message: Message     => context become waitingForCompleter(message :: messages, timeout)
    case `timeout`            => context.stop(self)
  }

  private def newTimeout(timeout: Timeout): Timeout =
    scheduleTimeout(timeout.copy(timeout.id + 1))

  private def scheduleTimeout(timeout: Timeout): Timeout = {
    context.system.scheduler.scheduleOnce(timeoutDuration, self, timeout)
    timeout
  }

  private def completeAndWait(completer: Completer, messages: List[Message], timeout: Timeout): Unit = {
    completer(messages)
    context become waiting(newTimeout(timeout))
  }
}

As you can see we have introduced a parameterized Timeout message. This is important to schedule new timeouts and ignore old ones. Of course we don’t hard code the timeout duration, but instead use a value from the configuration. In order to make it work with curl we need a value less than 20 seconds. For the sake of demonstration, let’s simply go for 5 seconds, because then we don’t have to wait too long if we want to watch what’s going on.

If we rerun the above usage example, curl doesn’t time out, but we receive an empty list after 5 seconds. Then we can safely send a message without getting any errors on the server side:

~$ curl -u Heiko:Heiko http://localhost:8080/api/messages
[]~$
~$ curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages
~$ curl -u Heiko:Heiko http://localhost:8080/api/messages[{
  "username": "Roland",
  "text": "Akka rocks!"
}]~$

If we send the message and then again ask for messages quickly enough, all within 5 seconds, we’ll even receive the message that has been sent. This is because after timing out in waiting for message, the Gabbler actor transitions into waiting. There it receives the message and transitions into waiting for completer. In this state timing out would mean stopping, therefore we have to send the completer quickly. In orther words: Even if the long polling connection hasn’t yet been reopened, no messages will get lost.

Conclusion

That’s it, we’re done! We have built Gabbler, a simple push-enabled chat application. While simple, Gabbler is robust and shows how easily we can build such applications with Scala, Akka, spray and AngularJS.

The complete source code is available on GitHub.

Gabbler, a reactive chat app – part 3

Gabbler, a reactive chat app – part 2

In my previous blog post we have built the client portion of Gabbler – a modern and reactive push-enabled chat application – as well as its server skeleton. For those of you who didn’t like the client stuff, here comes some good news: The client portion is already done and we’ll focus on the server in this and the upcoming blog posts. Today we will fill the missing pieces of the server to create a first fully functional, yet still simplistic, solution.

Gabblers are actors

It should be obvious, that we have to represent the connected users, which I will call gabblers from now on, in some way at runtime. I won’t use the term session here, because it might cause frowns or even worse, but at the end of the day we need a concept to keep track of these gabblers.

As we are already using Akka – what a lucky coincident, eh? – we will use actors to represent these gabblers. Actors fit quite naturally, because they have a lifecycle, i.e. can get created when a gabbler appears and get stopped somewhat later when the gabbler leaves. In addition, we can easily create millions of actors on commodity hardware, so chances are that we can sell our application to some social media startup.

All right, here comes the Gabbler actor:

class Gabbler extends Actor { … }

Long polling

Before we can define the Gabbler‘s behavior, we have to spend some time thinking about its responsibilities.

We want to provide server-side push via long polling. What does that mean? Well, if a gabbler sends a GET request (“Give me some messages, dude!”), we don’t necessarily send a response right away, but only if there are messages available. So in terms of spray-routing we won’t complete a GET request immediately, but essentially keep the respective RequestContext around for later. By the way, in this first step we are not concerned about timeouts and such, but we are looking for a simple solution.

If a client sends a POST request (“Folks, here is a message for you!”), we have to send it to all gabblers which then can use the stored RequestContexts to complete the outstanding long polling GET requests with this message.

So basically we could implement the Gabbler‘s behavior like this:

def receive: Receive = {
  case context: RequestContext =>
    if (messages.isEmpty)
      storedContext = Some(context)
    else
      context.complete(messages)
  case message: Message =>
    messages +:= message
    for (context <- storedContext) {
      context.complete(messages)
      messages = Nil
      storedCompleter = None
    }
}

As you can see we could send the RequestContext for a GET request to the respective Gabbler instance. It would either be stored for later or be used to complete the request immediately, if messages were already available.

A POST request would lead to a Message instance being sent to all Gabbler instances. In this behavior, such a message would first be stored and then all stored messags would be used to complete a former GET request, if a respective RequestContext was available.

Completing the abstraction

While this approach would work, it is really ugly, because the HTTP API leaks into the “business logic” via the RequestContext. Therefore spray-routing offers the produce directive, which produces – hence the name – a completion function A =&gt; Unit, where A is the type we want to get marshalled, and passes it into its inner route. We can then further pass along this harmless function and use it later to complete the request by simply applying it. We’ll look at produce when we turn our attention to GabblerService.

In our example we want the completion function to be a List[Message] =&gt; Unit. Therefore our Gabbler looks like this:

class Gabbler extends Actor {

  import GabblerService._

  var messages: List[Message] = Nil

  var storedCompleter: Option[Completer] = None

  def receive: Receive = {
    case completer: Completer =>
      if (messages.isEmpty)
        storedCompleter = Some(completer)
      else
        completer(messages)
    case message: Message =>
      messages +:= message
      for (completer <- storedCompleter) {
        completer(messages)
        messages = Nil
        storedCompleter = None
      }
  }
}

Notice that we are using the Completer type alias for List[Message] =&gt; Unit to make its purpose more expressive. When we receive such a completer, we either apply it to complete the request or store it for later. All the rest is like in the “ugly” approach above.

Producing completion functions

All that’s left to do is filling the missing pieces in GabblerService:

def apiRoute: Route =
// format: OFF
  authenticate(BasicAuth(UsernameEqualsPasswordAuthenticator, "Gabbler"))(user =>
    path("api" / "messages")(
      get(
        produce(instanceOf[List[Message]]){ completer => _ =>
          log.debug("User '{}' is asking for messages ...", user.username)
          gabblerFor(user.username) ! completer
        }
      ) ~
      post(
        entity(as[Message]) { message =>
          complete {
            log.debug("User '{}' has posted '{}'", user.username, message.text)
            val m = message.copy(username = user.username)
            context.children foreach (_ ! m)
            StatusCodes.NoContent
          }
        }
      )
    )
  )
// format: ON

def gabblerFor(username: String): ActorRef =
  context.child(username) getOrElse context.actorOf(Gabbler.props, username)

As you can see, we now use the produce directive when handling a GET request to create a completion function List[Message] =&gt; Unit, which we simply send to the Gabbler actor for the authenticated user. This is looked up amongst the children or, if not yet existing, created with the name of the authenticated user. Like for the entity directive, marshalling is a breeze and already working for us, because of the integration with spray-json and our Message companion object.

Of course we also have to fill the missing piece in the portion of the route handling POST request: We add the authenticated user to the unmarshalled message and send it to all Gabblers. That’s all.

Gabble away

We could use our beautiful HTML 5 UI and I encourage you to give it a try yourself, but for this blog post we will simply use good old curl to see our solution in action.

After running our GabblerServiceApp, we enter into one terminal session:

curl -u Heiko:Heiko http://localhost:8080/api/messages

We will see … nothing, which is expected, because we are long polling for messages and nobody is sending one. Let’s change that! In another terminal session we enter:

curl -u Roland:Roland -d '{ "text": "Akka rocks!", "username": "" }' -H "Content-Type: application/json" http://localhost:8080/api/messages

Which will immediately return and also make the following appear in the first terminal session:

[{
  "username": "Roland",
  "text": "Akka rocks!"
}]

No big surprise, but nice to see that it is working.

Conclusion

We have completed the server skeleton and implemented a first and simple solution for server-side push via long polling. In the next blog posts we will improve this solution, in particular make it easier to understand and also more robust.

The complete source code is available on GitHub, the current state under the tag step-02.

Gabbler, a reactive chat app – part 2

Gabbler, a reactive chat app – part 1

In my previous blog post I introduced the new Akka I/O and already kind of demonstrated that you can use it – in combination with spray – to build HTTP servers with Akka. This post is the first in a series which build on these core concepts and dive into the domain of modern and reactive web applications: We’ll create Gabbler, a simple push-enabled chat application using the latest and greatest tools like Scala, Akka, spray and AngularJS.

Setting up the project

As usual, we are going to use the amazing sbt build tool. I have heard rumors that some don’t like it, but anyway, here’s some interesting stuff from our build.sbt:

scalaVersion := "2.10.2"

// TODO Remove as soon as spray-json is on Maven Cental
resolvers += "spray repo" at "http://repo.spray.io"

// TODO Remove as soon as spray is final and on Maven Cental
resolvers += "spray nightlies repo" at "http://nightlies.spray.io"

libraryDependencies ++= Dependencies.gabbler

And as I try to stick to the rules Josh laid down in his Effective sbt talk at Scala Days 2013 here is our project/Dependencies.scala (rule #2: track dependencies in one place):

object Library {

  // Versions
  val akkaVersion = "2.2.0-RC2"
  val logbackVersion = "1.0.13"
  val sprayVersion = "1.2-20130628" // Compatible with Akka 2.2.0-RC2
  val sprayJsonVersion = "1.2.5"

  // Libraries
  val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
  val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
  val logbackClassic = "ch.qos.logback" % "logback-classic" % logbackVersion
  val sprayCan = "io.spray" % "spray-can" % sprayVersion
  val sprayRouting = "io.spray" % "spray-routing" % sprayVersion
  val sprayJson = "io.spray" %% "spray-json" % sprayJsonVersion
}

object Dependencies {

  import Library._

  val gabbler = List(
    akkaActor,
    akkaSlf4j,
    logbackClassic,
    sprayCan,
    sprayRouting,
    sprayJson
  )
}

As you can see, we are still on the latest released version 2.10.2 of Scala, although I’d love to go for Scala 2.11 milestone whatsoever. But we all know that it can be a little tricky to find our library dependencies published against these latest development versions. As in my previous post we are using Akka 2.2 and spray 1.2.

The user interface

Don’t expect anything fancy or stylish here! I’m not a web designer and never will be. Luckily there’s Twitter Bootstrap which prevents me from creating a complete mess. Enough talk, here’s the Gabbler UI:

Gabbler

As you can see, we are using the Bootstrap starter template. We add two columns: the left contains a textarea for new messages and a button to send them off, the right hosts the list of received messages.

We have to logon first. For sake of simplicity we can use any username-password combination with equal username and password. Then we can send messages which are pushed to all connected clients.

The AngularJS client

To build the client, we are using AngularJS, which seems to be one of the most promising JavaScript frameworks for reactive web applications out there. We won’t cover the details here, but just point out some of the most interesting aspects.

Let’s first take a look at the view portion of the client, which is a single HTML 5 page:

<!doctype html>
<html data-ng-app="gabbler">

…

<body data-ng-controller="GabblerCtrl">

…

<div class="container">
  <div class="row">

    <!--### The form for a new message ###-->
    <div class="span6">
      <h3>Gabble away</h3>
      <form name="form" novalidate>
        <fieldset>
          <div>
            <textarea id="text-input" rows="3" placeholder="Enter your message ..."
                      data-ng-model="message.text" required></textarea>
          </div>
          <div>
            <button type="submit" class="btn btn-primary" data-ng-click="sendMessage()"
                    data-ng-disabled="form.$invalid">Gabble away</button>
          </div>
        </fieldset>
      </form>
    </div>

    <!--### The list of messages ###-->
    <div class="span6 messages">
      <h3 data-ng-init="getMessages()">Gibble gabble</h3>
      <div data-ng-repeat="msg in messages">
        <div class="username">{{ msg.username }}</div>
        <div class="text">{{ msg.text }}</div>
      </div>
    </div>

  </div>
</div>

…

</body>
</html>

As you can see, this is a completely logic-free HTML 5 page which can be opened in any browser. All interactivity comes through AngularJS data-ng- directives, which usually show up as attributes of HTML elements, and AngularJS expressions, embedded in {{ }}, which are evaluated kind of like JavaScript expressions.

The controller part of the client, which gets connected to the view through the data-ng-controller directive of the body element, is a JavaScript object:

function GabblerCtrl($scope, Message) {

  $scope.messages = [];

  $scope.getMessages = function() {
    var messages = Message.query(function() {
      $scope.messages = messages.concat($scope.messages);
      $scope.getMessages();
    });
  }

  $scope.message = new Message({ "username": "" });

  $scope.sendMessage = function() {
    $scope.message.$save();
    $scope.message = new Message({ "username": "" });
  };
}

Our GabblerCtrl defines the $scope.getMessages function that queries the server for messages asynchronously and adds any new messages sent from the server to the messages array. Notice that the callback for the query calls $scope.getMessages again, hence the client is polling the server. We don’t use WebSocket here by purpose, because it will be illustrative to use Akka actors to implement long polling in a very elegant fashion on the server side. The controller also defines the $scope.sendMessage function which sends a new message to the server and reinitializes – i.e. empties – the local message which is stored in the $scope.message variable.

The model part of the client is made up from the $scope.messages array and the $scope.message object, which are synchronized with the view through AngularJS’s two-way data binding. Therefore the view gets updated anytime new messages are sent from the server, resulting in new entries in the list of messages in the right column. Also, when the user clicks the “Gabble away” button, reinitializing the $scope.message variable leads to blanking out the textarea for new messages in the left column.

The server skeleton

With the client in place, let’s start looking at the RESTful HTTP server. In this post we just cover a server skeleton with incomplete implementations for accetping GET and POST requests.

In my previous post I have shown how to build a HTTP server with Akka I/O and spray-can. This time we add another spray module – spray-routing – which provides a high-level, very flexible routing DSL for elegantly defining RESTful web services.

First we have to create an application:

object GabblerServiceApp extends App {

  val system = ActorSystem("gabbler-service-system")
  val interface = GabblerSettings(system).interface
  val port = GabblerSettings(system).port
  system.actorOf(GabblerService.props(interface, port), "gabbler-service")

  readLine(s"Hit ENTER to exit ...$newLine")
  system.shutdown()
}

As usual we create an ActorSystem and shut it down at the end, after hitting “ENTER”, which is a feature we probably don’t want to add to a mission critical high-available production system. We also get some configuration settings from the GabblerSettings Akka extension which we use to create the GabblerService top-level actor:

class GabblerService(interface: String, port: Int) extends HttpServiceActor with ActorLogging {

  import GabblerService._
  import SprayJsonSupport._
  import context.dispatcher

  IO(Http)(context.system) ! Http.Bind(self, interface, port)

  …

As you can see, GabblerService extends HttpServiceActor from spray-routing which makes the routing DSL available. When the GabblerService actor is created, we send a Http.Bind message to the I/O manager for HTTP to register itself as listener for HTTP connections.

Other than in my previous post we don’t handle the various HTTP messages natively, but instead use the routing DSL to define the actor’s behavior. The core concepts of the routing DSL are routes and directives.

Routes, which are simply functions of type RequestContext =&gt; Unit and can be nested or composed, are used to either complete a request or recejct or ignore it. Here is a very simple route that completes a request:

context => context.complete("Hello, world!")

Directives are small building blocks for arbitrarily complex routes which can have inner routes, transform or filter the incoming RequestContext or extract values form it. spray-routing already provides many useful directives, e.g. for completing a request, matching against the HTTP verb or extracting path elements:

path("order" / IntNumber)(id =>
  get(
    complete(
      "Received GET request for order " + id
    )
  ) ~
  put(
    complete(
      "Received PUT request for order " + id
    )
  )
)

Using this routing DSL we define the behavior of the GabblerService actor:

  …

  override def receive: Receive =
    runRoute(apiRoute ~ staticRoute)

  def apiRoute: Route =
  // format: OFF
    authenticate(BasicAuth(UsernameEqualsPasswordAuthenticator, "Gabbler"))(user =>
      path("api" / "messages")(
        get(context =>
          log.debug("User '{}' is asking for messages ...", user.username)
          // TODO Complete this user's request later, when messages are available!
        ) ~
        post(
          entity(as[Message]) { message =>
            complete {
              log.debug("User '{}' has posted '{}'", user.username, message.text)
              // TODO Dispatch message to all users!
              StatusCodes.NoContent
            }
          }
        )
      )
    )
  // format: ON

  def staticRoute: Route =
    path("")(getFromResource("web/index.html")) ~ getFromResourceDirectory("web")
}

As you can see we define the receive method in terms of the runRoute method which gets a route composed of apiRoute and staticRoute.

Let’s first look at the pretty straightforward staticRoute, which is a composite of two simple routes. The first is made up from the path direcitve matching the empty path and the getFromResource directive which completes a GET request with the given resource, which is web/index.html in our case. The second route is constructed with the getFromResourceDirectory direcitve and completes all GET requests with resources from the web directory. Essentially, staticRoute is used to serve Gabbler‘s one and only HTML file as well as all the CSS, JavaScript, image, etc. resources.

Now let’s take a look at apiRoute. It extracts the authenticated user and then matches requests with path api/messages. Within that, GET and POST requests are accepted.

We use basic authentication with a simple scheme: We accept any username-password combination with equal username and password:

object UsernameEqualsPasswordAuthenticator extends UserPassAuthenticator[BasicUserContext] {

  override def apply(userPass: Option[UserPass]): Future[Option[BasicUserContext]] = {
    val basicUserContext =
      userPass flatMap {
        case UserPass(username, password) if username == password => Some(BasicUserContext(username))
        case _ => None
      }
    Promise.successful(basicUserContext).future
  }
}

On receiving a GET request we simply log that the authenticated user is asking for messages. Notice that we dont’ complete the request here in order to support long polling, which will be introduced in the next post.

On receiving a POST request we extract the body into an instance of the Message case class by using the entity directive. This is supported by spray-json integration with the following Message companions:

object Message extends DefaultJsonProtocol {
  implicit val format = jsonFormat2(apply)
}

case class Message(username: String, text: String)

As you can see, marshalling and unmarshalling of case classes with spray is a breeze.

Within the entity directive we complete the request with status code 204 “No Content” after logging that the authenticated user has posted a message. Again, we save an essential aspect – sending the message to all connected users – for the next post.

Conclusion

We have introduced Gabbler, a modern and reactive push-enabled chat application. Actually we have only shown Gabbler‘s client portion, built with AngularJS and a server skeleton built with Scala, Akka I/O and spray. In the next posts we will complete the server step by step towards a fully functional solution.

The complete source code is available on GitHub, the current state under the tag step-01.

Gabbler, a reactive chat app – part 1

Introduction to Akka I/O

The recently released Akka 2.2.0-RC1 introduces the new akka.io package in the akka-actor module. According to the documentation, “the guiding design goal for this I/O implementation was to reach extreme scalability, make no compromises in providing an API correctly matching the underlying transport mechanism and to be fully event-driven, non-blocking and asynchronous.”

For some of you this completely actor-based I/O implementation might look familiar, because it has been developed as a joint effort by the spray and Akka teams, based on the “old” spray.io module.

In this blog post we will introduce the new Akka I/O by some very simple networking examples, namely a TCP and a HTTP server. In follow-up posts I will then show how Akka I/O and spray can be used to build a modern and reactive single-page web application.

Setting up the project

All right, to get stated we need an sbt project with the following dependencies:

  • Scala 2.10.x
  • akka-actor 2.2.0-RC2: provides Akka I/O
  • spray-can 1.2-20130628: only needed for HTTP

For convenience we also add some logging support, hence we need the following sbt settings in build.sbt:

scalaVersion := "2.10.2"

// TODO Remove as soon as spray is final and on Maven Cental
resolvers += "spray repo" at "http://nightlies.spray.io"

libraryDependencies ++= Dependencies.demoAkka

And this nice little class in project/Dependendies.scala:

object Library {

  // Versions
  val akkaVersion = "2.2.0-RC2"
  val logbackVersion = "1.0.13"
  val sprayVersion = "1.2-20130628" // Compatible with Akka 2.2.0-RC2

  // Libraries
  val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
  val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
  val logbackClassic = "ch.qos.logback" % "logback-classic" % logbackVersion
  val sprayCan = "io.spray" % "spray-can" % sprayVersion
}

object Dependencies {

  import Library._

  val demoAkka = List(akkaActor, akkaSlf4j, logbackClassic, sprayCan)
}

TCP server

As we want to write a server, we need something we can run, i.e. an App:

object EchoServiceApp extends App {

  val system = ActorSystem("echo-service-system")
  val endpoint = new InetSocketAddress("localhost", 11111)
  system.actorOf(EchoService.props(endpoint), "echo-service")

  readLine(s"Hit ENTER to exit ...${System.getProperty("line.separator")}")
  system.shutdown()
}

We call it EchoServiceApp, because all this very simple server will be able to do is echoing back whatever we tell. As you can see, we create an actor system and an endpoint on localhost and port 11111.

Then we create the yet to be defined EchoService actor. Please notice, that ~~passing constructor arguments to an actor has changed in Akka 2.2; while the old style is still supported, it is now deprecated in order to avoid closing over non-serializable state~~ we are following a recommended implementation pattern and are using a props factory method for Props in the also to be defined companion object to avoid closing over non-serializable state: If you pass a closure to Props from within an actor, this gets included, which refers to an Actor, which is not serializable.

Finally we make sure we can shutdown our server via the ENTER key.

I/O manager and listener

The new Akka I/O implementation abstracts over transport protocols via so called “drivers” and introduces the concept of an “I/O manager” as the API for a particular driver. Let’s create the EchoService companion object defining the Props factory as well as the EchoService actor and create an I/O manager for TCP in its constructor:

object EchoService {
  def props(endpoint: InetSocketAddress): Props =
    Props(new EchoService(endpoint))
}

class EchoService(endpoint: InetSocketAddress) extends Actor with ActorLogging {
  IO(Tcp) ! Tcp.Bind(self, endpoint)
  …
}

We send a Tcp.Bind command to the I/O manager for TCP in order to register self – i.e. this very EchoService actor instance – as so called “listener” for TCP connections. Of course we need to listen for TCP connections now:

override def receive: Receive = {
  case Tcp.Connected(remote, _) =>
    log.debug("Remote address {} connected", remote)
    sender ! Tcp.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
}

Listening to TCP connections means handling messages of type Tcp.Connected. All we do here is log a nice message at DEBUG level and then register a new instance of the yet to be defined EchoConnectionHandler actor as so called “handler” for the connection.

Hence we create a new handler actor for each connection. As actors are really cheap, this shouldn’t be an issue at all. But if you prefer, you can register the listener itself as handler.

As you can see, we pass the sender, i.e. the connection, to the EchoConnectionHandler. We’ll explain the reason soon …

Connection handler

Now let’s take a look at the final missing piece:

object EchoConnectionHandler {
  def props(remote: InetSocketAddress, connection: ActorRef): Props =
    Props(new EchoConnectionHandler(remote, connection))
}

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case Tcp.Received(data) =>
      val text = data.utf8String.trim
      log.debug("Received '{}' from remote address {}", text, remote)
      text match {
        case "close" => context.stop(self)
        case _       => sender ! Tcp.Write(data)
      }
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the connection handler receives messages of type Tcp.Received which contain the payload as a ByteString which – according to the documentation – is an “efficient, immutable alternative to the traditional byte containers used for I/O on the JVM, such as Array[Byte] and ByteBuffer“.

Here we simply reply with a Tcp.Write containing the same payload or – in case the payload equals “close” – stop the connection handler.

In case a Tcp.ConnectionClosed message is received, we also stop the connection handler. And finally, we listen to Terminated messages for the underlying connection and stop ourselves, too, in this case. Now you know why we had to pass along the connection from the listener.

TCP server in action

Now that we have all the necessary pieces in place, let’s run the TCP server and connect via telnet. In one sbt session we run the EchoServerApp and in another terminal session we enter the following:

tmp$ telnet 127.0.0.1 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello, TCP
Hello, TCP
Woot, it workz!
Woot, it workz!
close
Connection closed by foreign host.
tmp$

As you can see, our TCP echo server works as expected. We can either close the connection via the “close command” or even shut down the server via the ENTER key.

HTTP server

Now that we know how to create a TCP server, let’s look at HTTP which is probably ways more interesting for most of us. Actually there is not so much to say, because Akka I/O is designed in a way which pretty much abstracts away the transport protocol, which is pretty awesome.

The EchoServiceApp for HTTP looks exactly the same except for the constructor arguments of the HTTP listener. Hence let’s dive into the HTTP listener immediately:

class EchoService(host: String, port: Int) extends Actor with ActorLogging {

  import context.system

  IO(Http) ! Http.Bind(self, host, port)

  override def receive: Receive = {
    case Http.Connected(remote, _) =>
      log.debug("Remote address {} connected", remote)
      sender ! Http.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
  }
}

As you can see, all we had to change here are the message types: Http.Bind and Http.Connected instead of the TCP likes.

And also the HTTP connection handler looks almost like the one for TCP:

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case HttpRequest(GET, uri, _, _, _) =>
      sender ! HttpResponse(entity = uri.path.toString)
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the HTTP connection handler receives and GET messages and replies with the path component of the URI. The other received messages are the same like for the TCP connection handler.

HTTP server in action

All right, let’s finally take a look at the HTTP server. We run it in sbt and in another terminal session we enter the following:

tmp$ curl localhost:8080/Akka-IO-rocks
/Akka-IO-rockstmp$
tmp$

Conclusion

We have introduced the new Akka I/O by two very simple networking examples. Hopefully it has become obvious, that it is pretty easy to create an actor-based and hence completely asynchronous I/O server, most remarkably almost independent of the transport protocol.

The full source code is available on GitHub.

Introduction to Akka I/O