Hi all,

I am about to publish a blog post about this. It describes the approach I ended up following in my project to sort out the different kinds of issues I was facing while implementing it. I would be happy to get some feedback on the post, so your comments are welcome.

I tried various approaches before settling on this one. Initially I implemented my code with typed actors, but those proved hard to work with, and they are also going to be deprecated. They did have some benefits: OO and type safety. Then I refactored to actors and messaging. That proved cumbersome to maintain and hard to test (sorry! Akka is still the best distributed computing library I've worked with). Because of those issues I decided to try different approaches. Continuations was one, but due to technical limitations I ended up with this active object implementation of mine.

So here is the blog post. Is it easy to understand? Does it make sense? Is the active object approach good or bad? Has someone already tried it, and did they succeed or fail?
--------------------------------------

Many IT projects need to ingest and query large volumes of data. Traditional databases fail to distribute this data across multiple servers and to respond to queries quickly enough. Big data technologies effectively do full table scans on the data even when only a small subset of it is required, and developers need to spend a lot of effort thinking about how to structure the data to optimize particular sets of queries.

At AKT IT I am developing a next-generation big data NoSQL + SQL database that distributes data and queries across multiple servers and is meant for tables containing terabytes of data. This will be a distributed SQL database, which means developers can normalize the data and index it as they do in relational databases.

The software is implemented in Scala using Akka. Initially it was implemented using typed actors, but because those were deprecated, and due to various problems with that model of distributed computing, the implementation was switched to plain Akka actors.

Coding such a complex piece of software with actors and message passing proved cumbersome. Several sections of the code were hard to work with and especially hard to unit test. Often, sequential communication between three types of actors was required. Other times scatter/gather patterns had to be applied, and even scheduled tasks had to run in order to modify the state of an actor. Flows of data had to move across different actors. Although we implemented utilities that could flow the data through all the required actors (similar to Akka Streams), unit testing actors that used those flows was quite hard due to the number of participants and the lack of type safety. Refactoring was hard, as it is difficult to control who sends and who receives certain messages.

The new active-objects approach

During development we realized that a new approach to coding distributed systems was required to ease the development of this software. Standard object-oriented approaches proved more appropriate, and testing by mocking traits and classes was easier. I ended up implementing a simple active-object-like library on top of Akka and refactoring the actors into active objects.

Those familiar with Spark will feel right at home. Those who know typed actors will also easily understand the library, as it has a lot of similarities, but also some significant differences.

Let's go straight into an example. Assuming we have two actors, an addition one and a multiplication one, we want to implement them using this new library.

trait AddService
{
   // add x to y and use the remote multiply service to multiply by z
   def addAndMultiply(x: Int, y: Int, z: Int): R[Int]
}

@remoteService
class AddServiceRemote extends AddService

trait MultiplyService
{
   def multiply(x: Int, y: Int): R[Int]
}

@remoteService
class MultiplyServiceRemote extends MultiplyService


The above definitions are straightforward apart from the @remoteService part. This is a marker for a macro that rewrites AddServiceRemote into a remote proxy for AddService. We will go into the details later on.

Now the implementations of the traits:

class MultiplyServiceImpl extends MultiplyService
{
   // returns an R { x * y }
   override def multiply(x: Int, y: Int) = begin {
      x * y
   }

}

import scala.concurrent.duration._

class AddServiceImpl(multiplyService: MultiplyServiceRemote) extends AddService
{

    def addAndMultiply(x: Int, y: Int, z: Int) = {
       val sum = x + y
       // since the multiply service is remote, we need a timeout for every call
       implicit val timeout: FiniteDuration = 1.minute
       // executes remotely and returns an R { sum * z }
       multiplyService.multiply(sum, z)
    }
}

The above is straightforward. There are some utilities, like begin { }, which returns an R[T], but other than that it feels as if we are writing object-oriented code that executes sequentially. All code runs with actor threading semantics, which means we can modify the object's variables without synchronization. We can chain calls, and the code will execute in a pseudo-sequential way, e.g.

def addAndMultiplyAndAddOne(x: Int, y: Int, z: Int) =
   addAndMultiply(x, y, z).map(_ + 1)

The above method returns (x + y) * z + 1, with the benefit that it all appears to run sequentially despite the call to the remote multiplication service.
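To make the pseudo-sequential chaining concrete, here is a minimal sketch of an R[T] as a deferred computation with map. It is a hypothetical stand-in, not the library's implementation: R, begin and value borrow the names used in this post, everything runs synchronously on the caller's thread (as it would in a unit test), and a local LocalMultiply class replaces the remote multiply service.

```scala
// Hypothetical R[T]: a description of work that only runs when .value is called.
// Synchronous on purpose; the real library executes it inside an actor.
final class R[T] private (private val run: () => T) {
  // Transform the eventual result.
  def map[U](f: T => U): R[U] = new R(() => f(run()))
  // Execute the description and return its result.
  def value: T = run()
}

object R {
  // Remember the body; do not evaluate it yet.
  def begin[T](body: => T): R[T] = new R(() => body)
}

import R.begin

trait MultiplyService { def multiply(x: Int, y: Int): R[Int] }

// Hypothetical local stand-in for the remote proxy (so no timeout is needed).
class LocalMultiply extends MultiplyService {
  def multiply(x: Int, y: Int): R[Int] = begin { x * y }
}

class AddServiceImpl(multiplyService: MultiplyService) {
  def addAndMultiply(x: Int, y: Int, z: Int): R[Int] = {
    val sum = x + y
    multiplyService.multiply(sum, z)
  }

  def addAndMultiplyAndAddOne(x: Int, y: Int, z: Int): R[Int] =
    addAndMultiply(x, y, z).map(_ + 1)
}

val service = new AddServiceImpl(new LocalMultiply)
println(service.addAndMultiplyAndAddOne(1, 2, 6).value) // (1 + 2) * 6 + 1 = 19
```

The caller never sees the intermediate product; map simply extends the description, which is what lets the code read as if it ran top to bottom.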

More Akka goodies are included. For example, we can schedule things and resume processing as if it all executed sequentially:

def scheduledMultiplication(x: Int, y: Int, z: Int) = scheduleOnce(500.millis) {
   // this runs 500 ms after the call, using actor semantics,
   // which means we can modify the object's state without synchronization
   x + y
}.andThen { sum =>
   // sum = x + y; we now call the remote multiply service

   // since the multiply service is remote, we need a timeout for every call
   implicit val timeout: FiniteDuration = 1.minute

   multiplyService.multiply(sum, z)
}
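Outside Akka, the "schedule, then resume on the actor" behaviour can be approximated with plain JDK and Scala futures. The sketch below is an assumption about the general mechanism, not the library's code: a single-threaded ScheduledExecutorService plays the actor, scheduleOnce is a hypothetical helper with the same name as the library method, flatMap stands in for andThen, and multiply is a local stub for the remote service.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical "actor" thread that can also run delayed work.
val actorThread = Executors.newSingleThreadScheduledExecutor()
val actorEc = ExecutionContext.fromExecutorService(actorThread)

// Hypothetical scheduleOnce: run `body` on the actor thread after `delay`.
def scheduleOnce[T](delay: FiniteDuration)(body: => T): Future[T] = {
  val p = Promise[T]()
  actorThread.schedule(new Runnable {
    def run(): Unit = p.complete(Try(body))
  }, delay.toMillis, TimeUnit.MILLISECONDS)
  p.future
}

// Local stand-in for the remote multiply service.
def multiply(x: Int, y: Int): Future[Int] = Future.successful(x * y)

def scheduledMultiplication(x: Int, y: Int, z: Int): Future[Int] =
  scheduleOnce(500.millis) {
    x + y // runs on the actor thread roughly 500 ms after the call
  }.flatMap(sum => multiply(sum, z))(actorEc) // continuation also on the actor thread

val start = System.nanoTime()
val result = Await.result(scheduledMultiplication(1, 2, 6), 5.seconds)
val elapsedMs = (System.nanoTime() - start) / 1000000
println(result) // 18
actorThread.shutdown()
```

Because both the delayed body and the continuation run on the same single thread, neither needs a lock, which mirrors the claim in the comments above.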

We can also use Futures, all seemingly executing sequentially. This way, if we need to do, say, a slow I/O operation, we won't block the actor.

def futureMultiplication(x: Int, y: Int, z: Int) = future {
   // this runs in a separate thread, not blocking the actor,
   // but access to state needs to be synchronized
   x + y
}.andThen { sum =>
   // sum = x + y; we now call the remote multiply service.
   // Note that this runs with actor semantics again, so we can modify state
   // without synchronization

   implicit val timeout: FiniteDuration = 1.minute
   multiplyService.multiply(sum, z)
}
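The key point is the thread hop: slow work runs on a separate pool, and the continuation comes back to the actor's thread. Here is a minimal sketch of that idea with standard Scala futures, under stated assumptions: a named single-thread executor plays the actor, ExecutionContext.global plays the I/O pool, and multiply is a local stub. It is not how the library implements future, and it records thread names only to make the hop visible.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical actor thread, named so we can see where each step runs.
val actorThread = Executors.newSingleThreadExecutor(new ThreadFactory {
  def newThread(r: Runnable): Thread = new Thread(r, "actor")
})
val actorEc = ExecutionContext.fromExecutorService(actorThread)
// Separate pool for slow or blocking work (e.g. I/O), so the actor is never blocked.
val ioEc = ExecutionContext.global

// Local stand-in for the remote multiply service.
def multiply(x: Int, y: Int): Future[Int] = Future.successful(x * y)

// Returns the product plus the names of the threads the two steps ran on.
def futureMultiplication(x: Int, y: Int, z: Int): Future[(Int, String, String)] =
  Future {
    // off the actor thread: actor state must not be touched here without sync
    (x + y, Thread.currentThread().getName)
  }(ioEc).flatMap { case (sum, ioName) =>
    // back on the actor thread: state could be modified here without locks
    val actorName = Thread.currentThread().getName
    multiply(sum, z).map(product => (product, ioName, actorName))(actorEc)
  }(actorEc)

val (product, firstStepThread, continuationThread) =
  Await.result(futureMultiplication(1, 2, 6), 5.seconds)
println(product)            // 18
println(continuationThread) // actor
actorThread.shutdown()
```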

Testing the above active objects is easy and can be done with the usual mocking and testing libraries. We can mock the remote multiplication service to isolate AddService and test it, and we can run the code outside an actor for unit testing purposes. In fact, tests of actors refactored into active objects are far simpler than tests of the original actors, and unit testing of some very complicated actor logic became possible.

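// multiply returns an R[Int], so the stubbed answer is wrapped with begin
when(multiplyService.multiply(3, 6)).thenReturn(begin { 100 })
// 1 + 2 = 3, so addAndMultiply calls multiply(3, 6) and yields the stubbed 100
addService.addAndMultiply(1, 2, 6).value should be (100)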
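Because an R is just a value, a test can run it directly, outside any actor. The sketch below uses a hand-written stub instead of a mocking library so it stays self-contained. R, begin and value are hypothetical synchronous stand-ins defined inline (not the library's types), and StubMultiply is an illustrative name. The stub also records the arguments it received, which is what a mock's verify step would check.

```scala
// Hypothetical synchronous R: runs only when .value is called.
final class R[T] private (private val run: () => T) {
  def map[U](f: T => U): R[U] = new R(() => f(run()))
  def value: T = run()
}
object R { def begin[T](body: => T): R[T] = new R(() => body) }
import R.begin

trait MultiplyService { def multiply(x: Int, y: Int): R[Int] }

// Hypothetical hand-rolled stub: returns a canned answer and records its calls.
class StubMultiply(canned: Int) extends MultiplyService {
  var calls: List[(Int, Int)] = Nil
  def multiply(x: Int, y: Int): R[Int] = begin {
    calls = calls :+ ((x, y))
    canned
  }
}

class AddServiceImpl(multiplyService: MultiplyService) {
  def addAndMultiply(x: Int, y: Int, z: Int): R[Int] = {
    val sum = x + y
    multiplyService.multiply(sum, z)
  }
}

val stub = new StubMultiply(canned = 100)
val result = new AddServiceImpl(stub).addAndMultiply(1, 2, 6).value
println(result)     // 100, the stubbed value
println(stub.calls) // List((3,6)): the service passed x + y and z through
```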

A simple but detailed example

So let's say we want to create an actor that adds a number to an internal accumulator:

trait AddService
{
   def add(x: Int): R[Int]
}

This looks pretty straightforward, apart from the return type. We would expect an Int, but we have an R[Int].

With typed actors, if we returned an Int the call would be blocking, which would hurt performance. If it returned a Future[Int], we would lose the ability to modify state without synchronization.

But with this active-object implementation, a call to add returns an R[Int], which is a description of what the method will do, similar to Spark's RDDs. R has a couple of useful methods, like map(), inside which we can safely modify state, as it all runs with actor threading semantics.
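The "description, similar to an RDD" point can be made concrete: building an R records work but does not perform it. In the minimal sketch below, R, begin and value are hypothetical stand-ins with the names used in this post, run synchronously rather than inside an actor; a counter shows when the body actually executes.

```scala
// Hypothetical R: holds a function, runs it only on .value.
final class R[T] private (private val run: () => T) {
  def map[U](f: T => U): R[U] = new R(() => f(run()))
  def value: T = run()
}
object R { def begin[T](body: => T): R[T] = new R(() => body) }
import R.begin

var executions = 0

// Building the description: nothing runs yet, like an RDD transformation.
val described: R[Int] = begin { executions += 1; 20 }.map(_ * 2)
println(executions)      // 0

// Executing it (in the library this would happen inside the actor).
println(described.value) // 40
println(executions)      // 1
```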

We can see this better with an example:

class AddServiceImpl extends AddService
{
   private var value = 0
   def add(x: Int) = begin {
      value += x
      value
   }
}

@remoteService
class AddServiceRemote extends AddService

"begin" is a method of my library that takes a function. It just remembers 
the function so that we can execute it later on within the actor. The 
execution via an actor ensures that multi-threaded calls to our service 
don't need sync as the code block runs with actor semantics.
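One rough way to picture "runs with actor semantics" without Akka: queue every remembered function onto a single thread, the way an actor processes one message at a time. This is an assumption about the general idea, not the library's mechanism. ActorLike and submit are hypothetical names, and a single-threaded java.util.concurrent executor stands in for the actor's mailbox. Many threads call add concurrently, yet the unsynchronized var stays consistent.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical "actor": one thread drains queued work one item at a time.
class ActorLike {
  private val mailbox = Executors.newSingleThreadExecutor()
  private val actorEc = ExecutionContext.fromExecutorService(mailbox)
  // Run a remembered function on the actor thread and expose its result.
  def submit[T](work: () => T): Future[T] = Future(work())(actorEc)
  def shutdown(): Unit = mailbox.shutdown()
}

class AddServiceImpl(actor: ActorLike) {
  private var value = 0 // no lock: only ever read or written on the actor thread
  def add(x: Int): Future[Int] = actor.submit { () =>
    value += x
    value
  }
}

implicit val callerEc: ExecutionContext = ExecutionContext.global

val actor = new ActorLike
val service = new AddServiceImpl(actor)

// 1000 adds issued from the global pool, i.e. from many threads at once.
val calls = (1 to 1000).map(_ => Future(service.add(1)).flatten)
val results = Await.result(Future.sequence(calls), 10.seconds)
println(results.max) // 1000: no increment was lost
actor.shutdown()
```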

But what does the @remoteService part of the code do? Well, we can't just call new AddServiceImpl().add(5) and expect to get the value. The call returns an R[Int], which can only be executed via an actor. And remember that we are dealing with remote calls, which might execute on a different server from the caller. So we need a timeout to make sure we don't wait forever for a response, and we need to be able to process the response when it arrives, without blocking. This is what @remoteService does. A macro rewrites AddServiceRemote so that it has the method

def add(x: Int)(implicit timeout: FiniteDuration): R[Int] with RemoteMethods[Int]


So when we actually want to use AddService, we do:

import scala.concurrent.duration._

val r = new AddServiceRemote().add(5)(1.second)
// we can now fire and forget the call, ignoring the result
r.fireAndForget()
// or do something with the result
r.map(_ * 5)
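Since the macro output itself isn't shown, here is a hand-written guess at the general shape a generated proxy could take. Everything in it is an assumption: RemoteR, the send function and this AddServiceRemote class are hypothetical, the "remote" node is simulated by a single-threaded executor, and the implicit timeout is only enforced when a test blocks on the reply with value. fireAndForget and map mirror the names used in this post.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical result of a remote call: a description that, when executed,
// sends the request and yields a Future reply bounded by `timeout`.
final class RemoteR[T](send: () => Future[T], timeout: FiniteDuration)(implicit ec: ExecutionContext) {
  // Send the request and ignore the reply.
  def fireAndForget(): Unit = { send(); () }
  // Describe further processing of the reply; nothing is sent yet.
  def map[U](f: T => U): RemoteR[U] = new RemoteR(() => send().map(f), timeout)
  // For tests only: send, then block until the reply arrives or the timeout expires.
  def value: T = Await.result(send(), timeout)
}

// Server side: the real implementation on the (simulated) remote node.
class AddServiceImpl {
  private var total = 0
  def add(x: Int): Int = { total += x; total }
}

// Hypothetical hand-written proxy, roughly the shape @remoteService might generate.
class AddServiceRemote(target: AddServiceImpl, remoteEc: ExecutionContext)(implicit ec: ExecutionContext) {
  def add(x: Int)(implicit timeout: FiniteDuration): RemoteR[Int] =
    new RemoteR(() => Future(target.add(x))(remoteEc), timeout)
}

implicit val ec: ExecutionContext = ExecutionContext.global
// One thread plays the remote actor, so calls are handled in arrival order.
val remoteNode = Executors.newSingleThreadExecutor()
val service = new AddServiceRemote(new AddServiceImpl, ExecutionContext.fromExecutorService(remoteNode))

service.add(5)(1.second).fireAndForget()               // total becomes 5; reply ignored
val result = service.add(5)(1.second).map(_ * 5).value // total becomes 10, so 10 * 5
println(result) // 50
remoteNode.shutdown()
```

Keeping the timeout as an implicit parameter on the proxy, rather than on the trait, is what lets AddServiceImpl stay free of remoting concerns.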



--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.