Introduction to Akka I/O

The recently released Akka 2.2.0-RC1 introduces the new akka.io package in the akka-actor module. According to the documentation, “the guiding design goal for this I/O implementation was to reach extreme scalability, make no compromises in providing an API correctly matching the underlying transport mechanism and to be fully event-driven, non-blocking and asynchronous.”

For some of you this completely actor-based I/O implementation might look familiar, because it has been developed as a joint effort by the spray and Akka teams, based on the “old” spray.io module.

In this blog post we will introduce the new Akka I/O with some very simple networking examples, namely a TCP and an HTTP server. In follow-up posts I will then show how Akka I/O and spray can be used to build a modern and reactive single-page web application.

Setting up the project

All right, to get started we need an sbt project with the following dependencies:

  • Scala 2.10.x
  • akka-actor 2.2.0-RC2: provides Akka I/O
  • spray-can 1.2-20130628: only needed for HTTP

For convenience we also add some logging support, hence we need the following sbt settings in build.sbt:

scalaVersion := "2.10.2"

// TODO Remove as soon as spray is final and on Maven Central
resolvers += "spray repo" at "http://nightlies.spray.io"

libraryDependencies ++= Dependencies.demoAkka

And these two small objects in project/Dependencies.scala:

object Library {

  // Versions
  val akkaVersion = "2.2.0-RC2"
  val logbackVersion = "1.0.13"
  val sprayVersion = "1.2-20130628" // Compatible with Akka 2.2.0-RC2

  // Libraries
  val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
  val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
  val logbackClassic = "ch.qos.logback" % "logback-classic" % logbackVersion
  val sprayCan = "io.spray" % "spray-can" % sprayVersion
}

object Dependencies {

  import Library._

  val demoAkka = List(akkaActor, akkaSlf4j, logbackClassic, sprayCan)
}

TCP server

As we want to write a server, we need something we can run, i.e. an App:

import java.net.InetSocketAddress

import akka.actor.ActorSystem

object EchoServiceApp extends App {

  val system = ActorSystem("echo-service-system")
  val endpoint = new InetSocketAddress("localhost", 11111)
  system.actorOf(EchoService.props(endpoint), "echo-service")

  readLine(s"Hit ENTER to exit ...${System.getProperty("line.separator")}")
  system.shutdown()
}

We call it EchoServiceApp, because all this very simple server will be able to do is echo back whatever we send it. As you can see, we create an actor system and an endpoint on localhost, port 11111.

Then we create the yet-to-be-defined EchoService actor. Please notice that we follow a recommended implementation pattern and use a props factory method in the companion object, which is also still to be defined. This avoids closing over non-serializable state: if you pass a closure to Props from within an actor, the closure captures a reference to the enclosing Actor, which is not serializable.
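To illustrate the capturing problem outside of Akka, here is a minimal plain-Scala sketch. The names Owner, Factory and ClosureDemo are made up for this illustration, and Java serialization merely stands in for whatever serialization a Props might undergo. A function created inside an instance that refers to one of its members captures the instance, whereas a function created by a factory method on an object captures only its arguments.

```scala
import java.io.{ ByteArrayOutputStream, NotSerializableException, ObjectOutputStream }

// Hypothetical stand-in for an actor: deliberately NOT Serializable
class Owner(val name: String) {
  // Refers to the member `name`, hence the closure captures `this`
  def unsafeCreator: () => String = () => name
}

// Hypothetical stand-in for a companion object with a props factory
object Factory {
  // Only the argument `name` is captured, no enclosing instance
  def safeCreator(name: String): () => String = () => name
}

object ClosureDemo {

  // Tries to Java-serialize the given object and reports whether that worked
  def isSerializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new Owner("echo").unsafeCreator))
    println(isSerializable(Factory.safeCreator("echo")))
  }
}
```

Running ClosureDemo should report false for the function created inside Owner and true for the one created by Factory, which is the same reasoning that makes a props factory in the companion object the safer choice.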

Finally we make sure we can shut down our server via the ENTER key.

I/O manager and listener

The new Akka I/O implementation abstracts over transport protocols via so-called “drivers” and introduces the concept of an “I/O manager” as the API for a particular driver. Let’s create the EchoService companion object defining the Props factory as well as the EchoService actor, and send a command to the I/O manager for TCP in the actor's constructor:

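import java.net.InetSocketAddress

import akka.actor.{ Actor, ActorLogging, Props }
import akka.io.{ IO, Tcp }

object EchoService {
  def props(endpoint: InetSocketAddress): Props =
    Props(new EchoService(endpoint))
}

class EchoService(endpoint: InetSocketAddress) extends Actor with ActorLogging {

  // IO(Tcp) needs an implicit ActorSystem in scope
  import context.system

  IO(Tcp) ! Tcp.Bind(self, endpoint)
  …
}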

We send a Tcp.Bind command to the I/O manager for TCP in order to register self – i.e. this very EchoService actor instance – as a so-called “listener” for TCP connections. Of course we need to listen for TCP connections now:

override def receive: Receive = {
  case Tcp.Connected(remote, _) =>
    log.debug("Remote address {} connected", remote)
    sender ! Tcp.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
}

Listening for TCP connections means handling messages of type Tcp.Connected. All we do here is log a nice message at DEBUG level and then register a new instance of the yet-to-be-defined EchoConnectionHandler actor as a so-called “handler” for the connection.

Hence we create a new handler actor for each connection. As actors are really cheap, this shouldn’t be an issue at all. But if you prefer, you can register the listener itself as the handler.

As you can see, we pass the sender, i.e. the connection, to the EchoConnectionHandler. We’ll explain the reason soon …
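As an aside to the remark that the listener can register itself as the handler, the following is a rough sketch of what that could look like. SingleActorEchoService is a hypothetical name that is not part of the example project, and the sketch assumes the same akka-actor dependency as above. It only uses the messages already shown in this post, and it leaves out the “close” command and the death watch of the per-connection handler.

```scala
import java.net.InetSocketAddress

import akka.actor.{ Actor, ActorLogging, Props }
import akka.io.{ IO, Tcp }

// Hypothetical variant: one actor is both listener and handler
object SingleActorEchoService {
  def props(endpoint: InetSocketAddress): Props =
    Props(new SingleActorEchoService(endpoint))
}

class SingleActorEchoService(endpoint: InetSocketAddress) extends Actor with ActorLogging {

  // IO(Tcp) needs an implicit ActorSystem in scope
  import context.system

  IO(Tcp) ! Tcp.Bind(self, endpoint)

  def receive: Receive = {
    case Tcp.Connected(remote, _) =>
      log.debug("Remote address {} connected", remote)
      // Register this very actor as handler for the new connection
      sender ! Tcp.Register(self)
    case Tcp.Received(data) =>
      // The sender is the connection actor that delivered the data
      sender ! Tcp.Write(data)
    case _: Tcp.ConnectionClosed =>
      log.debug("A connection closed")
  }
}
```

The trade-off is that a single actor now processes the traffic of all connections sequentially and has to tell connections apart by sender, which is why the post sticks with one handler per connection.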

Connection handler

Now let’s take a look at the final missing piece:

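import java.net.InetSocketAddress

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.io.Tcp

object EchoConnectionHandler {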
  def props(remote: InetSocketAddress, connection: ActorRef): Props =
    Props(new EchoConnectionHandler(remote, connection))
}

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case Tcp.Received(data) =>
      val text = data.utf8String.trim
      log.debug("Received '{}' from remote address {}", text, remote)
      text match {
        case "close" => context.stop(self)
        case _       => sender ! Tcp.Write(data)
      }
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the connection handler receives messages of type Tcp.Received, which contain the payload as a ByteString. According to the documentation, this is an “efficient, immutable alternative to the traditional byte containers used for I/O on the JVM, such as Array[Byte] and ByteBuffer”.

Here we simply reply with a Tcp.Write containing the same payload or – in case the payload equals “close” – stop the connection handler.

In case a Tcp.ConnectionClosed message is received, we also stop the connection handler. And finally, we listen to Terminated messages for the underlying connection and stop ourselves, too, in this case. Now you know why we had to pass along the connection from the listener.
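To get a feel for the ByteString payload mentioned above, here is a small sketch, assuming akka-actor is on the classpath. ByteStringDemo is a hypothetical name; the calls used are ByteString's String factory, concatenation via ++, utf8String and length.

```scala
import akka.util.ByteString

// Hypothetical demo object; not part of the example project
object ByteStringDemo {
  def main(args: Array[String]): Unit = {
    // Build ByteStrings from Strings (encoded as UTF-8)
    val hello = ByteString("Hello, ")
    val tcp = ByteString("TCP\r\n")

    // Concatenation returns a new ByteString; the operands stay unchanged
    val line = hello ++ tcp

    // Decode back to a String and trim the line ending, like the handler does
    println(line.utf8String.trim)

    // Number of bytes, not characters
    println(line.length)
  }
}
```

Trimming matters in the connection handler because telnet sends the line ending along with the text, so the raw payload for the close command is not exactly equal to “close”.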

TCP server in action

Now that we have all the necessary pieces in place, let’s run the TCP server and connect via telnet. In one sbt session we run the EchoServiceApp and in another terminal session we enter the following:

tmp$ telnet 127.0.0.1 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello, TCP
Hello, TCP
Woot, it workz!
Woot, it workz!
close
Connection closed by foreign host.
tmp$

As you can see, our TCP echo server works as expected. We can either close the connection via the “close” command or even shut down the server via the ENTER key.
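If you would rather test the server from code than from telnet, a blocking client based on plain JDK sockets is enough. The following is only a sketch: EchoClient and its echo method are hypothetical helpers, and host and port are taken from the EchoServiceApp above.

```scala
import java.io.{ BufferedReader, InputStreamReader, OutputStreamWriter, PrintWriter }
import java.net.Socket

// Hypothetical helper; not part of the example project
object EchoClient {

  // Sends one line to the server and returns the line that comes back,
  // or null if the server closes the connection without replying
  def echo(host: String, port: Int, line: String): String = {
    val socket = new Socket(host, port)
    try {
      // Second argument enables auto-flush on println
      val out = new PrintWriter(new OutputStreamWriter(socket.getOutputStream, "UTF-8"), true)
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      out.println(line)
      in.readLine()
    } finally socket.close()
  }

  def main(args: Array[String]): Unit =
    println(echo("localhost", 11111, "Hello, TCP"))
}
```

The client deliberately blocks, which is fine for a quick check; the server side stays fully asynchronous regardless of how its clients are written.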

HTTP server

Now that we know how to create a TCP server, let’s look at HTTP, which is probably way more interesting for most of us. Actually there is not much to say, because Akka I/O is designed in a way that largely abstracts away the transport protocol, which is pretty awesome.

The EchoServiceApp for HTTP looks exactly the same except for the constructor arguments of the HTTP listener. Hence let’s dive into the HTTP listener immediately:

import akka.actor.{ Actor, ActorLogging }
import akka.io.IO
import spray.can.Http

class EchoService(host: String, port: Int) extends Actor with ActorLogging {

  import context.system

  IO(Http) ! Http.Bind(self, host, port)

  override def receive: Receive = {
    case Http.Connected(remote, _) =>
      log.debug("Remote address {} connected", remote)
      sender ! Http.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
  }
}

As you can see, all we had to change here are the message types: Http.Bind, Http.Connected and Http.Register instead of their TCP counterparts.

The HTTP connection handler also looks almost like the one for TCP:

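import java.net.InetSocketAddress

import akka.actor.{ Actor, ActorLogging, ActorRef, Terminated }
import akka.io.Tcp
import spray.http.{ HttpRequest, HttpResponse }
import spray.http.HttpMethods.GET

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {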

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case HttpRequest(GET, uri, _, _, _) =>
      sender ! HttpResponse(entity = uri.path.toString)
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the HTTP connection handler receives GET requests and replies with the path component of the URI. The other received messages are the same as for the TCP connection handler.

HTTP server in action

All right, let’s finally take a look at the HTTP server. We run it in sbt and in another terminal session we enter the following:

tmp$ curl localhost:8080/Akka-IO-rocks
/Akka-IO-rockstmp$
tmp$
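The same request can also be issued from Scala with nothing but the standard library. This is a sketch under the assumption that the server listens on localhost:8080, as in the curl call above; EchoHttpClient is a hypothetical helper.

```scala
import scala.io.Source

// Hypothetical helper; not part of the example project
object EchoHttpClient {

  // Performs a GET request and returns the response body as a String
  def get(url: String): String = {
    val source = Source.fromURL(url, "UTF-8")
    try source.mkString finally source.close()
  }

  def main(args: Array[String]): Unit =
    println(get("http://localhost:8080/Akka-IO-rocks"))
}
```

Like curl, this shows that the response body carries no trailing newline, which is why the shell prompt follows the path directly in the terminal output above.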

Conclusion

We have introduced the new Akka I/O with two very simple networking examples. Hopefully it has become obvious that it is pretty easy to create an actor-based and hence completely asynchronous I/O server and, most remarkably, that the code is almost independent of the transport protocol.

The full source code is available on GitHub.
