Hi all, I am about to publish a blog post about the approach I ended up following in my project to sort out the different kinds of issues I faced during its implementation. I would be happy to get some feedback on the post; your comments are welcome.
I tried various approaches before settling on this one. Initially I implemented my code with typed actors, but those proved hard to work with and they are also going to be deprecated. They had some benefits: OO and type safety. Then I refactored to actors and messaging. That proved cumbersome to maintain and hard to test (sorry! Akka is still the best distributed computing lib I've worked with). Because of those issues I decided to try different approaches. Continuations was one, but due to technical limitations I ended up with this active object implementation of mine.

So here is the blog post. Is it easy to understand? Does it make sense? Is the active object approach good or bad? Has someone already tried it and succeeded or failed?

--------------------------------------

Many IT projects need to ingest and query large volumes of data. Traditional databases fail to distribute this data across multiple servers and to respond quickly enough to queries. Big data technologies effectively do full table scans even when only a small subset of the data is required, and developers need to spend a lot of effort thinking about how to structure the data to optimize sets of queries.

At AKT IT I am developing a next generation big data NoSQL + SQL database that will distribute the data and queries across multiple servers and is meant for tables containing terabytes of data. This will be a distributed SQL database, which means developers can normalize the data and index it as they do in relational databases.

The software is implemented in Scala using Akka. Initially an implementation was done using typed actors, but because those were being deprecated and due to various problems with that model of distributed computing, the implementation was switched to Akka actors.

Coding such a complex piece of software with actors and message passing proved cumbersome. Several sections of the code were hard to work with and especially hard to unit test.
Often, communication between 3 types of actors was required, in a sequential fashion. Other times scatter/gather patterns had to be applied, and even schedules had to be run in order to modify the state of an actor. Flows of data had to move across different actors. Although we implemented utilities that could flow the data over all required actors (similar to Akka Streams), unit testing actors that used those flows was quite hard because of the number of participants and the lack of type safety. Refactoring was hard because it is difficult to control who sends and who receives certain messages.

The new active-objects approach

During development we realized that a new approach to coding distributed systems was required to ease the development of this software. Standard object-oriented approaches proved more appropriate, and testing via mocking traits and classes proved easier. I ended up implementing a simple active-object-like library on top of Akka and refactoring actors to active objects.

Those familiar with Spark will feel right at home. Those who know typed actors will also easily understand the library, as it has a lot of similarities but also some significant differences.

Let's go straight into an example. Assume we have 2 actors, an addition one and a multiplication one, and we want to implement them using this new library.

    trait AddService {
      // add x to y and use the remote multiply service to multiply by z
      def addAndMultiply(x: Int, y: Int, z: Int): R[Int]
    }

    @remoteService
    class AddServiceRemote extends AddService

    trait MultiplyService {
      def multiply(x: Int, y: Int): R[Int]
    }

    @remoteService
    class MultiplyServiceRemote extends MultiplyService

The above definitions are straightforward apart from the @remoteService part. This is a marker for a macro that rewrites AddServiceRemote to be a remote proxy for AddService. We will go into the details later on.
Now the implementations of the traits:

    import scala.concurrent.duration._ // for 1.minute, 500.millis

    class MultiplyServiceImpl extends MultiplyService {
      // returns an R { x * y }
      override def multiply(x: Int, y: Int) = begin {
        x * y
      }
    }

    class AddServiceImpl(multiplyService: MultiplyServiceRemote) extends AddService {
      def addAndMultiply(x: Int, y: Int, z: Int) = {
        val sum = x + y
        // since the multiply service is remote, we need a timeout for every call
        implicit val timeout = 1.minute
        multiplyService.multiply(sum, z) // executes remotely, returns an R { sum * z }
      }
    }

The above are straightforward. There are some utilities like begin { }, which returns an R[T], but other than that it feels as if we are writing object-oriented code that will execute sequentially. All code runs with actor threading semantics, which means we can modify local variables without synchronization.

We can chain calls and the code will execute in a pseudo-sequential way, e.g.

    def addAndMultiplyAndAddOne(x: Int, y: Int, z: Int) =
      addAndMultiply(x, y, z).map(_ + 1)

The above method will return (x + y) * z + 1, with the benefit that it all seems to run sequentially despite the call to the remote multiplication service.

More Akka goodies are included. For example, we can schedule things and resume processing as if it all executed sequentially:

    def scheduledMultiplication(x: Int, y: Int, z: Int) = scheduleOnce(500.millis) {
      // this will run 500 ms after the call; it runs using actor semantics,
      // which means we could modify local variables without synchronization
      x + y
    }.andThen { sum =>
      // sum = x + y, we will now call the remote multiply service
      // since the multiply service is remote, we need a timeout for every call
      implicit val timeout = 1.minute
      multiplyService.multiply(sum, z)
    }

And we can also use Futures, all seemingly executing sequentially. This way, if we need to do, say, a slow I/O operation, we won't block the actor.
    def futureMultiplication(x: Int, y: Int, z: Int) = future {
      // this will run in a separate thread, not blocking the actor,
      // but access to state needs to be synchronized
      x + y
    }.andThen { sum =>
      // sum = x + y, we will now call the remote multiply service.
      // Note that this runs with actor semantics again, so we could modify state
      // without synchronization
      implicit val timeout = 1.minute
      multiplyService.multiply(sum, z)
    }

Testing the above active objects is easy and can be done with the usual mocking and testing libs. We can mock the remote multiplication service to isolate the AddService and test it, and we can run the code outside an actor for unit testing purposes. In fact, tests of actors refactored to active objects are far simpler than tests of the actors themselves, and unit testing of some very complicated actor logic became possible.

    when(multiplyService.multiply(3, 6)).thenReturn(100)
    addService.addAndMultiply(1, 2, 6).value should be (100)

A simple but detailed example

So let's say we want to create an actor that adds a number to an internal accumulator:

    trait AddService {
      def add(x: Int): R[Int]
    }

This looks pretty straightforward, apart from the return type. We would expect an Int but we have an R[Int]. With typed actors, if we returned an Int, the call would be blocking, which would negatively impact performance. If it returned Future[Int], then I would lose the ability to modify state without synchronization. But with this active-object implementation, a call to add returns an R[Int], which is a description of what the method will do, similar to Spark's RDDs. R has a couple of useful methods like map(), in which we can safely modify state, as it all runs using actor threading semantics. An example shows this better:

    class AddServiceImpl extends AddService {
      private var value = 0

      def add(x: Int) = begin {
        value += x
        value
      }
    }

    @remoteService
    class AddServiceRemote extends AddService

"begin" is a method of my library that takes a function.
It just remembers the function so that we can execute it later on within the actor. Execution via an actor ensures that multi-threaded calls to our service don't need synchronization, as the code block runs with actor semantics.

But what does the @remoteService part of the code do? Well, we can't just call new AddServiceImpl().add(5) and expect to get the value. The call returns an R[Int], which can only be executed via an actor. And remember that we are dealing with remote calls, which might execute on a different server from the one the caller runs on. So we need a timeout to make sure we don't wait forever for a response, and we need to be able to process the response when it arrives, without blocking. This is what @remoteService does. A macro rewrites AddServiceRemote, which now has a method

    def add(x: Int)(implicit timeout: FiniteDuration): R[Int] with RemoteMethods[Int]

So when we actually want to use AddService, we need to do

    val r = new AddServiceRemote().add(5)(1.second)
    // we can now fire and forget the call, ignoring the result
    r.fireAndForget()
    // or do something with the result
    r.map(_ * 5)
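To make the idea of R as "a description of what the method will do" a bit more concrete, here is a toy stand-in in plain Scala. It is only a sketch and not the library's implementation: this R is just a remembered function, run() and current are hypothetical names added so the example can be executed and inspected, and the caller triggers execution where the real library would use an actor. There is no Akka, no remoting and no timeout here.

```scala
// Hypothetical sketch only: a toy R, NOT the real library's type.
final class R[T](private val thunk: () => T) {
  // Describe a follow-up step; nothing is executed yet.
  def map[U](f: T => U): R[U] = new R(() => f(thunk()))

  // Chain another described computation onto this one.
  def andThen[U](f: T => R[U]): R[U] = new R(() => f(thunk()).run())

  // Assumption: the real library would run this inside an actor.
  // Here the caller runs it directly on the current thread.
  def run(): T = thunk()
}

object R {
  // Remembers the block without executing it.
  def begin[T](body: => T): R[T] = new R(() => body)
}

trait AddService {
  def add(x: Int): R[Int]
}

class AddServiceImpl extends AddService {
  private var value = 0

  // Hypothetical accessor, present only so the demo can inspect the state.
  def current: Int = value

  def add(x: Int): R[Int] = R.begin {
    value += x
    value
  }
}

object RSketchDemo {
  def main(args: Array[String]): Unit = {
    val service = new AddServiceImpl
    val described = service.add(5).map(_ * 5)

    println(service.current) // 0: add(5) was only described, not executed
    println(described.run()) // 25: the accumulator becomes 5, then map multiplies by 5
    println(service.current) // 5
  }
}
```

The point of the sketch is that calling add(5) changes nothing until something executes the description. That is why, in the approach described above, the state change can be deferred to whoever owns the state (the actor) instead of happening on the caller's thread.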