Introduction to Akka I/O

The recently released Akka 2.2.0-RC1 introduces the new akka.io package in the akka-actor module. According to the documentation, “the guiding design goal for this I/O implementation was to reach extreme scalability, make no compromises in providing an API correctly matching the underlying transport mechanism and to be fully event-driven, non-blocking and asynchronous.”

For some of you this completely actor-based I/O implementation might look familiar, because it has been developed as a joint effort by the spray and Akka teams, based on the “old” spray.io module.

In this blog post we will introduce the new Akka I/O with two very simple networking examples, namely a TCP and an HTTP server. In follow-up posts I will then show how Akka I/O and spray can be used to build a modern and reactive single-page web application.

Setting up the project

All right, to get started we need an sbt project with the following dependencies:

  • Scala 2.10.x
  • akka-actor 2.2.0-RC2: provides Akka I/O
  • spray-can 1.2-20130628: only needed for HTTP

For convenience we also add some logging support, hence we need the following sbt settings in build.sbt:

scalaVersion := "2.10.2"

// TODO Remove as soon as spray is final and on Maven Central
resolvers += "spray repo" at "http://nightlies.spray.io"

libraryDependencies ++= Dependencies.demoAkka

And these two nice little objects in project/Dependencies.scala:

object Library {

  // Versions
  val akkaVersion = "2.2.0-RC2"
  val logbackVersion = "1.0.13"
  val sprayVersion = "1.2-20130628" // Compatible with Akka 2.2.0-RC2

  // Libraries
  val akkaActor = "com.typesafe.akka" %% "akka-actor" % akkaVersion
  val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
  val logbackClassic = "ch.qos.logback" % "logback-classic" % logbackVersion
  val sprayCan = "io.spray" % "spray-can" % sprayVersion
}

object Dependencies {

  import Library._

  val demoAkka = List(akkaActor, akkaSlf4j, logbackClassic, sprayCan)
}

TCP server

As we want to write a server, we need something we can run, i.e. an App:

import akka.actor.ActorSystem
import java.net.InetSocketAddress

object EchoServiceApp extends App {

  val system = ActorSystem("echo-service-system")
  val endpoint = new InetSocketAddress("localhost", 11111)
  system.actorOf(EchoService.props(endpoint), "echo-service")

  readLine(s"Hit ENTER to exit ...${System.getProperty("line.separator")}")
  system.shutdown()
}

We call it EchoServiceApp, because all this very simple server will be able to do is echo back whatever we send it. As you can see, we create an actor system and an endpoint on localhost and port 11111.

Then we create the yet to be defined EchoService actor. Please note that we are following a recommended implementation pattern: instead of calling Props directly, we use a props factory method defined in the companion object (also yet to be defined) to avoid closing over non-serializable state. If you pass a closure to Props from within an actor, the closure captures the enclosing this reference, i.e. an Actor, which is not serializable.
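To make the "closing over" problem more tangible, here is a minimal sketch in plain Scala without Akka. The names ClosureCaptureDemo, Owner, capturing, nonCapturing and isSerializable are made up for this illustration, and Owner merely stands in for an actor. The sketch assumes Scala 2.10 to 2.13, where function literals are serializable as long as everything they capture is serializable as well.

```scala
import java.io.{ ByteArrayOutputStream, NotSerializableException, ObjectOutputStream }

object ClosureCaptureDemo {

  // Hypothetical stand-in for an actor: deliberately NOT Serializable
  class Owner {
    val name = "owner"

    // The function literal reads the member `name`, hence it captures `this`
    def capturing: () => String = () => name
  }

  object Owner {
    // Factory in the companion object: the function literal only captures its argument
    def nonCapturing(name: String): () => String = () => name
  }

  // Tries Java serialization and reports whether it succeeded
  def isSerializable(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under these assumptions, isSerializable(new ClosureCaptureDemo.Owner().capturing) should yield false, because the closure drags the non-serializable Owner along, whereas isSerializable(ClosureCaptureDemo.Owner.nonCapturing("echo")) should yield true. A props factory in the companion object plays the role of nonCapturing here.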

Finally we make sure we can shut down our server via the ENTER key.

I/O manager and listener

The new Akka I/O implementation abstracts over transport protocols via so-called “drivers” and introduces the concept of an “I/O manager” as the API for a particular driver. Let’s create the EchoService companion object defining the Props factory as well as the EchoService actor, which obtains the I/O manager for TCP in its constructor:

object EchoService {
  def props(endpoint: InetSocketAddress): Props =
    Props(new EchoService(endpoint))
}

class EchoService(endpoint: InetSocketAddress) extends Actor with ActorLogging {

  // IO(Tcp) requires an implicit ActorSystem
  import context.system

  IO(Tcp) ! Tcp.Bind(self, endpoint)
  …
}

We send a Tcp.Bind command to the I/O manager for TCP in order to register self – i.e. this very EchoService actor instance – as a so-called “listener” for TCP connections. Of course we need to listen for TCP connections now:

override def receive: Receive = {
  case Tcp.Connected(remote, _) =>
    log.debug("Remote address {} connected", remote)
    sender ! Tcp.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
}

Listening for TCP connections means handling messages of type Tcp.Connected. All we do here is log a nice message at DEBUG level and then register a new instance of the yet to be defined EchoConnectionHandler actor as a so-called “handler” for the connection.

Hence we create a new handler actor for each connection. As actors are really cheap, this shouldn’t be an issue at all. But if you prefer, you can register the listener itself as handler.

As you can see, we pass the sender, i.e. the connection, to the EchoConnectionHandler. We’ll explain the reason soon …

Connection handler

Now let’s take a look at the final missing piece:

object EchoConnectionHandler {
  def props(remote: InetSocketAddress, connection: ActorRef): Props =
    Props(new EchoConnectionHandler(remote, connection))
}

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case Tcp.Received(data) =>
      val text = data.utf8String.trim
      log.debug("Received '{}' from remote address {}", text, remote)
      text match {
        case "close" => context.stop(self)
        case _       => sender ! Tcp.Write(data)
      }
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the connection handler receives messages of type Tcp.Received which contain the payload as a ByteString which – according to the documentation – is an “efficient, immutable alternative to the traditional byte containers used for I/O on the JVM, such as Array[Byte] and ByteBuffer”.

Here we simply reply with a Tcp.Write containing the same payload or – in case the payload equals “close” – stop the connection handler.

In case a Tcp.ConnectionClosed message is received, we also stop the connection handler. And finally, we listen to Terminated messages for the underlying connection and stop ourselves, too, in this case. Now you know why we had to pass along the connection from the listener.

TCP server in action

Now that we have all the necessary pieces in place, let’s run the TCP server and connect via telnet. In one sbt session we run the EchoServiceApp and in another terminal session we enter the following:

tmp$ telnet 127.0.0.1 11111
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello, TCP
Hello, TCP
Woot, it workz!
Woot, it workz!
close
Connection closed by foreign host.
tmp$

As you can see, our TCP echo server works as expected. We can either close the connection via the “close” command or even shut down the server via the ENTER key.
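If you would rather exercise the server from code than from telnet, a small blocking client is enough. The following is only a sketch based on plain java.net sockets; the EchoClient object and its echoOnce method are names invented for this illustration, and host and port have to match the endpoint of the running server (localhost and 11111 in this post).

```scala
import java.io.{ BufferedReader, InputStreamReader, PrintWriter }
import java.net.Socket

object EchoClient {

  // Sends a single line to the given endpoint and returns the first line sent back.
  // Blocking I/O is fine here, because this is just a throwaway test client.
  def echoOnce(host: String, port: Int, line: String): String = {
    val socket = new Socket(host, port)
    try {
      // `true` enables auto-flush on println
      val out = new PrintWriter(socket.getOutputStream, true)
      val in = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF-8"))
      out.println(line)
      in.readLine()
    } finally socket.close()
  }
}
```

With the echo server running, EchoClient.echoOnce("localhost", 11111, "Hello, TCP") should return the very same text, just like the telnet session above.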

HTTP server

Now that we know how to create a TCP server, let’s look at HTTP, which is probably far more interesting for most of us. Actually there is not much to say, because Akka I/O is designed in a way that largely abstracts away the transport protocol, which is pretty awesome.

The EchoServiceApp for HTTP looks exactly the same except for the constructor arguments of the HTTP listener. Hence let’s dive into the HTTP listener immediately:

class EchoService(host: String, port: Int) extends Actor with ActorLogging {

  import context.system

  IO(Http) ! Http.Bind(self, host, port)

  override def receive: Receive = {
    case Http.Connected(remote, _) =>
      log.debug("Remote address {} connected", remote)
      sender ! Http.Register(context.actorOf(EchoConnectionHandler.props(remote, sender)))
  }
}

As you can see, all we had to change here are the message types: Http.Bind, Http.Connected and Http.Register instead of their TCP counterparts.

The HTTP connection handler also looks almost like the one for TCP:

class EchoConnectionHandler(remote: InetSocketAddress, connection: ActorRef) extends Actor with ActorLogging {

  // We need to know when the connection dies without sending a `Tcp.ConnectionClosed`
  context.watch(connection)

  def receive: Receive = {
    case HttpRequest(GET, uri, _, _, _) =>
      sender ! HttpResponse(entity = uri.path.toString)
    case _: Tcp.ConnectionClosed =>
      log.debug("Stopping, because connection for remote address {} closed", remote)
      context.stop(self)
    case Terminated(`connection`) =>
      log.debug("Stopping, because connection for remote address {} died", remote)
      context.stop(self)
  }
}

As you can see, the HTTP connection handler receives GET requests and replies with the path component of the URI. The other messages it handles are the same as for the TCP connection handler.

HTTP server in action

All right, let’s finally take a look at the HTTP server. We run it in sbt and in another terminal session we enter the following:

tmp$ curl localhost:8080/Akka-IO-rocks
/Akka-IO-rockstmp$
tmp$

Conclusion

We have introduced the new Akka I/O with two very simple networking examples. Hopefully it has become obvious that it is pretty easy to create an actor-based and hence completely asynchronous I/O server which, most remarkably, is almost independent of the transport protocol.

The full source code is available on GitHub.


Implicits unchained – type-safe equality – part 3

In the last post we developed a type-wise balanced type-safe equality operation:

scala> import name.heikoseeberger.demoequality.TypeWiseBalancedEquality._
import name.heikoseeberger.demoequality.TypeWiseBalancedEquality._

scala> Seq(1, 2, 3) === List(1, 2, 3)
res0: Boolean = true

scala> List(1, 2, 3) === Seq(1, 2, 3)
res1: Boolean = true

scala> 123 === "a"
<console>:11: error: TypeWiseBalancedEquality requires Int and String to be in a subtype relationship!

Before moving on, let’s discuss two minor issues related to the design and the implementation of the current state of the solution.

First, one can easily create new type class instances for Equality that “override” the default which is based on natural equality, i.e. delegates to ==:

scala> import name.heikoseeberger.demoequality.TypeWiseBalancedEquality._
import name.heikoseeberger.demoequality.TypeWiseBalancedEquality._

scala> implicit val weirdIntEquality = new Equality[Int, Int] {
     |   def areEqual(n: Int, m: Int) = n != m
     | }
weirdIntEquality: name.heikoseeberger.demoequality.TypeWiseBalancedEquality.Equality[Int,Int] = $anon$1@6a5f7445

scala> 1 === 1
res0: Boolean = false

Due to the rules of implicit resolution, all locally defined or imported type class instances take precedence over the ones defined in the implicit scope (here: in the companion object of the type class). Therefore it’s easy to override the default.
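The precedence rule can be reproduced in isolation. The following sketch does not use the Equality type class from this series, but a deliberately simplified, hypothetical one called SimpleEquality with a single type parameter; ImplicitPrecedenceDemo, same, withDefault and withLocalOverride are likewise made-up names.

```scala
object ImplicitPrecedenceDemo {

  // Simplified, hypothetical type class with just one type parameter
  trait SimpleEquality[A] {
    def areEqual(left: A, right: A): Boolean
  }

  object SimpleEquality {
    // Default instance living in the implicit scope, i.e. the companion object
    implicit val intEquality: SimpleEquality[Int] = new SimpleEquality[Int] {
      def areEqual(left: Int, right: Int): Boolean = left == right
    }
  }

  def same[A](left: A, right: A)(implicit equality: SimpleEquality[A]): Boolean =
    equality.areEqual(left, right)

  // No local instance: the default from the companion object is used
  def withDefault: Boolean = same(1, 1)

  // The local instance is found first, the companion object is not even consulted
  def withLocalOverride: Boolean = {
    implicit val weirdIntEquality: SimpleEquality[Int] = new SimpleEquality[Int] {
      def areEqual(left: Int, right: Int): Boolean = left != right
    }
    same(1, 1)
  }
}
```

Here withDefault evaluates to true, while withLocalOverride evaluates to false, although both compare 1 with 1.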

There are many situations where we want this behavior, but I think that in our use case intuition demands that === – which almost looks like == – behaves like ==. Therefore we need a way to prevent others from creating Equality type class instances. This can easily be achieved by sealing Equality:

sealed trait Equality[L, R] { … }

The second issue is related to performance. Currently the Equality type class instances are provided by the polymorphic rightSubtypeOfLeftEquality and leftSubtypeOfRightEquality methods, which create new instances every time they get called. Therefore we add the AnyEquality singleton object and simply use it as the return value for rightSubtypeOfLeftEquality and leftSubtypeOfRightEquality:

implicit def rightSubtypeOfLeftEquality[L, R <: L]: Equality[L, R] =
  AnyEquality.asInstanceOf[Equality[L, R]]

implicit def leftSubtypeOfRightEquality[R, L <: R]: Equality[L, R] =
  AnyEquality.asInstanceOf[Equality[L, R]]

private object AnyEquality extends Equality[Any, Any] {
  override def areEqual(left: Any, right: Any): Boolean = left == right
}

The type cast looks ugly, but thanks to type erasure it doesn’t cause any problems.
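Why the cast is harmless can be seen in a stripped-down sketch. ErasureCastDemo, its equality method and the two vals are names invented for this illustration; the Equality trait and the AnyEquality object are redeclared inside the demo object so that the snippet stands on its own.

```scala
object ErasureCastDemo {

  sealed trait Equality[L, R] {
    def areEqual(left: L, right: R): Boolean
  }

  private object AnyEquality extends Equality[Any, Any] {
    override def areEqual(left: Any, right: Any): Boolean = left == right
  }

  // Type parameters are erased at runtime, so the cast is never checked
  // and always hands out the one shared instance
  def equality[L, R]: Equality[L, R] =
    AnyEquality.asInstanceOf[Equality[L, R]]

  val seqs: Equality[Seq[Int], List[Int]] = equality[Seq[Int], List[Int]]
  val strings: Equality[Any, String] = equality[Any, String]
}
```

Both vals refer to the very same object, and as areEqual only ever delegates to ==, which is defined for Any, the instance never relies on the type arguments it has been cast to.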

All right, that’s our final solution for type-wise balanced type-safe equality. Here is the complete code:

object TypeWiseBalancedEquality {
  import scala.annotation.implicitNotFound
  implicit class Equal[L](val left: L) extends AnyVal {
    def ===[R](right: R)(implicit equality: Equality[L, R]): Boolean =
      equality.areEqual(left, right)
  }
  @implicitNotFound("TypeWiseBalancedEquality requires ${L} and ${R} to be in a subtype relationship!")
  sealed trait Equality[L, R] {
    def areEqual(left: L, right: R): Boolean
  }
  object Equality extends LowPriorityEqualityImplicits {
    implicit def rightSubtypeOfLeftEquality[L, R <: L]: Equality[L, R] =
      AnyEquality.asInstanceOf[Equality[L, R]]
  }
  trait LowPriorityEqualityImplicits {
    implicit def leftSubtypeOfRightEquality[R, L <: R]: Equality[L, R] =
      AnyEquality.asInstanceOf[Equality[L, R]]
  }
  private object AnyEquality extends Equality[Any, Any] {
    override def areEqual(left: Any, right: Any): Boolean =
      left == right
  }
}

View-wise balanced type-safe equality

While this type-wise balanced type-safe equality operation works in most cases, there are still some glitches:

scala> 1 === 1L
<console>:11: error: TypeWiseBalancedEquality requires Int and Long to be in a subtype relationship!
scala> 1L === 1
<console>:11: error: TypeWiseBalancedEquality requires Long and Int to be in a subtype relationship!

The types in this example – Int and Long – are not in a subtype relationship and therefore our type-wise === doesn’t work. Well, while this is how we designed it, this behavior is kind of odd, because there is a certain relationship between Int and Long: a view-based one. That is, there is an implicit conversion from Int to Long which is the reason why we can assign Int values to Long variables:

scala> val l: Long = 1
l: Long = 1
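Such a view can also be requested explicitly as a function value, which is essentially what a view-based equality has to do. This is a small sketch assuming Scala 2.10 or a later 2.x release; ViewDemo, intToLong and viewedEquals are hypothetical names used for illustration only.

```scala
object ViewDemo {

  // Summon the implicit view from Int to Long as an ordinary function
  val intToLong: Int => Long = implicitly[Int => Long]

  // Compare an Int with a Long by first viewing the Int as a Long
  def viewedEquals(left: Int, right: Long): Boolean =
    intToLong(left) == right
}
```

ViewDemo.viewedEquals(1, 1L) yields true, ViewDemo.viewedEquals(1, 2L) yields false.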

While it is not a “must”, it’s reasonable to argue that type-safe equality should also work for types which are in a view-based relationship. So let’s add a view-based === to our solution!

Most of TypeWiseBalancedEquality can be copied over to ViewWiseBalancedEquality. The only difference is the type class instances. These have to be provided the following way:

implicit def rightToLeftEquality[L, R](implicit view: R => L): Equality[L, R] =
  new RightToLeftViewEquality(view)
implicit def leftToRightEquality[L, R](implicit view: L => R): Equality[L, R] =
  new LeftToRightViewEquality(view)
private class LeftToRightViewEquality[L, R](view: L => R) extends Equality[L, R] {
  override def areEqual(left: L, right: R): Boolean =
    view(left) == right
}
private class RightToLeftViewEquality[L, R](view: R => L) extends Equality[L, R] {
  override def areEqual(left: L, right: R): Boolean =
    left == view(right)
}

As you can see, the type class instances require an implicit conversion from the left type to the right or vice versa to be in scope. With these changes we make the above example work:

scala> import name.heikoseeberger.demoequality.ViewWiseBalancedEquality._
import name.heikoseeberger.demoequality.ViewWiseBalancedEquality._

scala> 1 === 1L
res0: Boolean = true

scala> 1L === 1
res1: Boolean = true

OK, we’re done, that’s our solution for view-wise balanced type-safe equality. Here is the complete code:

object ViewWiseBalancedEquality {
  import scala.annotation.implicitNotFound
  implicit class Equal[L](val left: L) extends AnyVal {
    def ===[R](right: R)(implicit equality: Equality[L, R]): Boolean =
      equality.areEqual(left, right)
  }
  @implicitNotFound("ViewWiseBalancedEquality requires ${L} and ${R} to be in an implicit conversion relationship, i.e. one can be viewed as the other!")
  sealed trait Equality[L, R] {
    def areEqual(left: L, right: R): Boolean
  }
  object Equality extends LowPriorityEqualityImplicits {
    implicit def rightToLeftEquality[L, R](implicit view: R => L): Equality[L, R] =
      new RightToLeftViewEquality(view)
  }
  trait LowPriorityEqualityImplicits {
    implicit def leftToRightEquality[L, R](implicit view: L => R): Equality[L, R] =
      new LeftToRightViewEquality(view)
  }
  private class LeftToRightViewEquality[L, R](view: L => R) extends Equality[L, R] {
    override def areEqual(left: L, right: R): Boolean =
      view(left) == right
  }
  private class RightToLeftViewEquality[L, R](view: R => L) extends Equality[L, R] {
    override def areEqual(left: L, right: R): Boolean =
      left == view(right)
  }
}

Conclusion

In this post we have improved the type-wise balanced type-safe equality and also added a view-based one. I think that type-safe equality is very important and therefore wonder whether the sample code which is available on GitHub should be “lifted” to something like a “util-equality” project. What do you think?
