Nonblocking IO on the JVM has always been a bit of a pain, often requiring a lot of distracting boilerplate. Apache MINA, JBoss Netty and others provide abstractions that make working with NIO more straightforward, and each has its own use cases. A rarely discussed topic is how to integrate NIO with the actor model. Ideally we would integrate NIO into our actors so that packets received on a socket arrive just like any other message to the actor. Luckily, this is exactly what we get with Akka's HawtDispatcher.
Released in 2010, HawtDispatch is a lightweight implementation of Apple's Grand Central Dispatch approach to concurrency. HawtDispatch creates a thread pool with only as many threads as your machine has cores, leading to minimal scheduling overhead and excellent performance for truly nonblocking tasks. HawtDispatch also implements Java NIO on top of its dispatch queues, allowing you to process NIO events like any other task.
The great Java/Scala actor library Akka offers HawtDispatch as one of its many dispatchers. Besides having been the top-performing dispatcher for a long time (recent optimizations to Akka's default dispatcher have brought it up to par in speed), it also provides simple NIO for our actors. This feature is mostly undocumented except for an example implementation of an echo server. Having grown up writing single-threaded applications in C, I have always made NIO my first choice when working with sockets. By writing some helper classes to handle the IO plumbing inside my actor, I can now add sockets wherever I want with only a couple of lines of code, and without spawning extra threads just to wait for socket activity.
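To make the "one thread per core" idea concrete, here is a minimal sketch that uses only the JDK's own executors. It is not how HawtDispatch is implemented and it does not use the HawtDispatch API; the object name `CoreSizedPool` and the method `runTasks` are made up for illustration.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

/** Illustration only: a JDK thread pool sized to the core count.
  * This is an assumption-laden stand-in, not HawtDispatch itself. */
object CoreSizedPool {
  /** Runs `tasks` tiny nonblocking tasks and returns how many completed. */
  def runTasks(tasks: Int): Int = {
    // One worker thread per core, so threads rarely compete for CPU time.
    val cores = Runtime.getRuntime().availableProcessors()
    val pool = Executors.newFixedThreadPool(cores)
    val completed = new AtomicInteger(0)

    for (_ <- 1 to tasks) {
      pool.execute(new Runnable {
        // A task must never block, or it stalls one of the few threads.
        def run(): Unit = { completed.incrementAndGet() }
      })
    }

    pool.shutdown() // accept no new tasks, finish the queued ones
    pool.awaitTermination(10, TimeUnit.SECONDS)
    completed.get()
  }
}
```

With so few threads, a single blocking call can hold up a large share of the pool, which is why this model only pays off for truly nonblocking tasks.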
This is how a fully functional TCP Client looks in code:
import akka.actor.Actor
import akka.dispatch.HawtDispatcher
import st.salanki.libsalank.io.HawtTcpClient
import java.nio.ByteBuffer

/** Our actor using the HawtTcpClient, should also be a good simple example of usage */
class TCPClient extends Actor {
  /* This should be global, you only want one HawtDispatcher for your application */
  self.dispatcher = new HawtDispatcher

  /* Initialize the sockets, no connection is made at this point though */
  val io = new HawtTcpClient(self, new java.net.InetSocketAddress("localhost", 6000), msgHandler, crashHandler, onConnect)

  override def preStart = {
    io.start() /* Start the socket. This will initiate an async TCP Connect */
  }

  /* The message handler will be executed just as a handler would for a regular actor message. */
  private def msgHandler(packet: ByteBuffer) = {
    println("Received: " + new String(packet.array, 0, packet.position))
  }

  /* As the TCP connect is asynchronous this is how we know when the connection
   * is complete and we can start sending data/expect data */
  private def onConnect() = println("Connected!")

  /* If something goes wrong or the connection is closed remotely the crashHandler is invoked.
   * This one acts as a message in the actor as well, nothing weird here. */
  private def crashHandler(e: Exception) {
    e match {
      case e: java.net.ConnectException => println("Connect failed") /* TCP Connect failed */
      case e => println("Some other error: " + e)
    }
  }

  def receive = {
    case msg: String => {
      println("Sending: " + msg)
      /* As this is NIO we can't send packets straight away. Data is added to a queue that is sent out as soon as the socket can.
       * Data can be enqueued before the socket is connected */
      io.enqueuePacket(msg.getBytes)
    }
  }

  override def postStop = {
    io.stop() /* Close the socket and cancel all selectors */
  }
}
This code utilizes my HawtTcpClient helper. It only depends on Akka and can be grabbed either together with libsalank or separately.
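The comment in the actor about enqueuing packets describes a common NIO pattern: writes go into a queue and are flushed when the socket reports it is writable. The sketch below shows that pattern with plain JDK classes. It is not the actual HawtTcpClient code; the class `OutboundQueue` and its methods are hypothetical names chosen for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel
import java.util.ArrayDeque

/** Hypothetical outbound packet queue, NOT the real HawtTcpClient internals.
  * Not thread-safe: it assumes a single caller at a time, as inside an actor. */
class OutboundQueue {
  private val pending = new ArrayDeque[ByteBuffer]()

  /** Queue a packet. Nothing touches the socket here, so this also works
    * before the connection is established. */
  def enqueue(bytes: Array[Byte]): Unit = pending.addLast(ByteBuffer.wrap(bytes))

  def isEmpty: Boolean = pending.isEmpty()

  /** Call when the channel is writable. Writes as much as the channel
    * accepts and returns true once the queue has been drained. */
  def flush(channel: WritableByteChannel): Boolean = {
    var blocked = false
    while (!blocked && !pending.isEmpty()) {
      val head = pending.peekFirst()
      channel.write(head)
      // A nonblocking channel may take only part of the buffer. Keep the
      // remainder at the head of the queue and wait for the next event.
      if (head.hasRemaining()) blocked = true
      else pending.removeFirst()
    }
    pending.isEmpty()
  }
}
```

Keeping the partially written buffer at the head of the queue preserves packet order across several writable events.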
To test, start netcat in a separate window with: nc -l 6000
Saving the code above to tcpclient.scala and trying it out with libsalank in the REPL looks like this:
libsalank $ sbt "project libsalank-akka" console
.....
[info] Starting scala interpreter...
[info] Welcome to Scala version 2.8.1.final (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_24).
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load ./tcpclient.scala
Loading ./tcpclient.scala...
import akka.actor.Actor
import akka.dispatch.HawtDispatcher
import st.salanki.libsalank.io.HawtTcpClient
import java.nio.ByteBuffer
defined class TCPClient

scala> val client = Actor.actorOf[TCPClient]
client: akka.actor.ActorRef = Actor[TCPClient:d6567fa0-60f2-11e0-ae22-005056c00008]

scala> client.start
res0: akka.actor.ActorRef = Actor[TCPClient:d6567fa0-60f2-11e0-ae22-005056c00008]

scala> Connected!
Received: Test From Netcat

scala> client ! "Message to netcat"

scala> Sending: Message to netcat
Netcat display:
$ nc -l 6000
Test From Netcat
Message to netcat
I also have a UDP Client/Server helper that is even easier to use as you don’t have to deal with connections in UDP. Check out the tests for some more examples.
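To illustrate why UDP needs less plumbing, here is a small loopback sketch built on the JDK's `DatagramChannel`. It does not use my UDP helper or show its API; the object `UdpSketch` and the method `roundTrip` are hypothetical names for this example only.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{DatagramChannel, SelectionKey, Selector}

/** Illustration only, using plain JDK NIO rather than the libsalank helper. */
object UdpSketch {
  /** Sends `msg` over loopback UDP and returns what the receiver read,
    * or None if nothing arrived within two seconds. */
  def roundTrip(msg: String): Option[String] = {
    val receiver = DatagramChannel.open()
    val sender = DatagramChannel.open()
    val selector = Selector.open()
    try {
      receiver.configureBlocking(false)
      // Port 0 asks the OS for any free port.
      receiver.socket().bind(new InetSocketAddress("127.0.0.1", 0))
      receiver.register(selector, SelectionKey.OP_READ)

      // No connect step: every datagram simply names its destination.
      val target = new InetSocketAddress("127.0.0.1", receiver.socket().getLocalPort())
      sender.send(ByteBuffer.wrap(msg.getBytes("UTF-8")), target)

      // Wait for the readable event instead of blocking in receive().
      val buf = ByteBuffer.allocate(1024)
      if (selector.select(2000) > 0 && receiver.receive(buf) != null) {
        buf.flip()
        Some(new String(buf.array(), 0, buf.limit(), "UTF-8"))
      } else None
    } finally {
      selector.close(); sender.close(); receiver.close()
    }
  }
}
```

There is no connect callback and no per-connection state here, only a readable event per datagram, which is what makes the UDP case simpler than the TCP client above.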
