
Apologies in advance for the long post... to frame my problem and questions 
I have to give a lot of contextual info.

I am running a set of experiments to understand the feasibility and 
performance characteristics of running Akka on the Blue Waters 
<https://bluewaters.ncsa.illinois.edu/> (batch) supercomputer at 
NCSA <http://www.ncsa.illinois.edu/>.

The general theme for the experiments is to create a map-reduce-like system 
composed of a producer, one or more coordinators, and a number of 
workers (the reduce step could be optional).
The producer is responsible for generating the "work", which is then sent to 
the coordinator(s), which in turn assign the work to an idle worker. 
At first I am most concerned with the performance of the solution (rather 
than fault tolerance). The only fault-tolerance aspect I'd like to address 
at first is exceptions thrown while executing the "work". 
When that happens, the "executor" should be restarted and the coordinator 
informed of the failed job, which would get logged and perhaps retried later 
(up to n times) depending on the exception. Eventually the final solution 
would include a checkpointing mechanism that would allow the PBS job to be 
resumed from the last checkpoint in case it is killed or a malfunction 
occurs with Blue Waters.
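
The retry rule described above (log the failed job and perhaps retry it, up to n times, depending on the exception) could be sketched in plain Scala roughly as follows. This is only an illustration under assumptions: the names RetryPolicy, Decision, Retry, and GiveUp are hypothetical, and treating IOException as the only retryable failure is a placeholder for whatever classification the real jobs would need.

```scala
// Hypothetical outcome of handling one failed job (not taken from the actual code).
sealed trait Decision
case object Retry extends Decision
case object GiveUp extends Decision

object RetryPolicy {
  // Assumption: I/O errors are transient and worth retrying; anything else is permanent.
  def isRetryable(e: Throwable): Boolean = e match {
    case _: java.io.IOException => true
    case _                      => false
  }

  // attemptsSoFar counts the executions that have already failed, including this one.
  def decide(e: Throwable, attemptsSoFar: Int, maxAttempts: Int): Decision =
    if (isRetryable(e) && attemptsSoFar < maxAttempts) Retry else GiveUp
}
```

In an actor-based version, the coordinator would presumably call something like RetryPolicy.decide when it is told about a failed job, and either put the work back on its queue or just log it.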

Besides learning about the different options/architectures that are 
appropriate for an environment such as Blue Waters, the problem I 
ultimately want to solve is to process approx. 3.2 million gzipped text 
files (about 3.2TB of gzipped data). The "processing" could be any number 
of things - feature extraction, OCR error correction, or other 
text-processing routines as needed.

About my experience: I am no Scala or Akka expert, but I do have a decent 
amount of experience with both. I've taken Martin's Functional Programming 
in Scala course on Coursera, as well as the Reactive Programming course by 
Martin, Erik, and Roland. I've also read through a number of Scala and Akka 
books, tutorials, examples, etc., but, as with just about everything else 
in life, there's always more to learn.

My first attempt was to adapt the Distributed Workers with Akka and [...] 
example just to see how that performs "out of the box". In the process I've also 
had to find a way to get the application deployed to the 
Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did 
come up with two approaches that seem to work quite well (if anyone 
wants to know how, I could write about that). I did manage to get the 
example to work - tested it on a small subset of the data (with some 
changes to WorkProducer) - but I felt that this solution was overly 
concerned with fault tolerance at the expense of (some) performance, so I 
wanted to create another solution where those priorities would be reversed. 
(It's still on my TODO list to get some performance measurements of the 
original example so that I can compare numbers across the different 
solutions.)

The way I'm planning on evaluating the performance of each solution is by 
having the work producer generate a known quantity of (empty) work as fast 
as possible, have that work be coordinated by the master over a known 
number of worker nodes, and have the executor on each worker do nothing 
else but respond immediately with an (empty) result. From this setup I want 
to record the timing of the following operations:

   - overall wall clock time from the start of work being produced to when 
   the last result is received by the worker actor from the executor. The 
   master and workers would already be up, registered, and ready before the 
   producer starts sending out work. This time is not affected by other jobs 
   running on the Blue Waters system, since the work involves no disk I/O 
   (just network) and the compute nodes where the worker actors are deployed 
   are dedicated exclusively to this task and are not running other user jobs
   - overall stats about the time elapsed between when a Work object is 
   sent to the master by the producer and the moment the Master receives it
   - overall stats about the time elapsed between the Master sending work to 
   a Worker and the Worker responding with WorkDone
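
The "overall stats" in the last two items could be computed from the recorded time deltas with something like the sketch below. The names LatencyStats and summarize are hypothetical, and the choice of min / max / mean / 99th percentile is an assumption about which numbers are interesting. The deltas can be in any unit, as long as both timestamps of each delta come from clocks that are comparable (System.nanoTime values, for instance, are only meaningful within a single JVM).

```scala
// Summary of a set of recorded time deltas (unit is whatever the caller recorded).
final case class LatencyStats(count: Int, min: Long, max: Long, mean: Double, p99: Long)

object LatencyStats {
  // Returns None when no deltas were recorded.
  def summarize(deltas: Seq[Long]): Option[LatencyStats] =
    if (deltas.isEmpty) None
    else {
      val sorted = deltas.sorted
      // Nearest-rank 99th percentile, using integer arithmetic for ceil(0.99 * n).
      val rank = (99 * sorted.size + 99) / 100
      val mean = sorted.map(_.toDouble).sum / sorted.size
      Some(LatencyStats(sorted.size, sorted.head, sorted.last, mean, sorted(rank - 1)))
    }
}
```

The master could accumulate the deltas in a buffer as messages arrive and call summarize once, when it is about to write the recorded times to the log.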

In (pseudo-Scala) code, the relevant parts of the solution I have created 
behave like this:

Producer:

   val numWorkToGenerate = N

   for (i <- 1 to numWorkToGenerate) {
      val timestamp = getCurrentTimestamp
      val work = new Work(i, timestamp)
      master ! work
   }

   master ! NoMoreWork

Master:

   // ... keep track of workers and what they're working on, and keep a
   // work queue of work that still needs to be assigned ...

   def receive = {
      case work: Work =>
         // record time delta based on current timestamp and timestamp
         // packaged in 'work'
         // find idle worker and, if found, update timestamp in 'work'
         // with current timestamp and send 'work' to worker

      case NoMoreWork =>
         // if the work queue is empty and none of the workers are working
         // on anything, then output all recorded times to log and quit

      case WorkDone =>
         // record time delta based on current timestamp and timestamp
         // packaged in 'work'
         // if work queue is not empty and there is an idle worker, then do
         // the right thing as in the 'case work' above; otherwise check if
         // all work is done and run the NoMoreWork logic
   }

Worker:

   // ... create an Executor actor that will be used to execute the work
   // and keep track of what it's working on ...

   def idle = {
      case work: Work =>
         executor ! work
         context become working
   }

   def working = {
      case ExecuteDone =>
         master ! WorkDone
         context become idle
   }

Executor:

   def receive = {
      case work: Work => sender ! ExecuteDone
   }


For those interested the actual code is here: 

When I run this, each physical compute node runs one instance of the 
Worker app per CPU core. 
The Master and Producer run on the same compute node (other 
Worker instances also run on that node, on the 'remaining' CPU cores).

Compared to the original example I started from:

   - my first solution tries to have as little communication as possible
   - there is no acknowledgment from Master to Producer for each 'work' 
   received;  producer operates in a fire-and-forget style
   - the pull-model used in the original example is still there, but has 
   been simplified to be triggered by (not shown above) RegisterWorker, 
   WorkDone, and WorkFailed messages rather than using the WorkerRequestsWork 
   / WorkIsReady mechanism
   - the result of the work is not given to the Master; the Master is only 
   informed that the work was done (this is because the simplest version of 
   work may not need to deliver a result to anyone; the result could just 
   be output written to a file or DB or whatever. I will explore the use 
   of a pub-sub system to publish the result in a future version)
   - FYI: I did some renaming in the code: Master has been baptized as 
   "Coordinator" and WorkProducer became "Producer"

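The simplified pull model from the list above (dispatch is triggered by incoming work and by RegisterWorker, WorkDone, and WorkFailed) comes down to a small amount of bookkeeping. Here is a sketch of that bookkeeping as plain immutable Scala, with no Akka involved. Everything in it is an assumption for illustration: CoordinatorState and its methods are hypothetical names, workers are identified by a String where the real code would hold an ActorRef, and Work is reduced to an id and a timestamp.

```scala
import scala.collection.immutable.Queue

// Hypothetical shape; the real message may carry different fields.
final case class Work(id: Int, timestamp: Long)

// What the coordinator tracks: queued work, idle workers, and work in flight.
final case class CoordinatorState(
    pending: Queue[Work] = Queue.empty,
    idle: Set[String] = Set.empty,
    busy: Map[String, Work] = Map.empty) {

  def workReceived(w: Work): CoordinatorState = copy(pending = pending.enqueue(w))

  def workerRegistered(worker: String): CoordinatorState = copy(idle = idle + worker)

  // WorkDone and WorkFailed both free the worker; logging or retrying
  // the failed job is left out of this sketch.
  def workerFinished(worker: String): CoordinatorState =
    copy(idle = idle + worker, busy = busy - worker)

  // Pair queued work with idle workers until one of the two runs out.
  // Returns the new state plus the (worker, work) pairs to send out.
  def dispatch: (CoordinatorState, List[(String, Work)]) = {
    val n = math.min(pending.size, idle.size)
    val workers = idle.toList.sorted.take(n)
    val (toSend, rest) = pending.splitAt(n)
    val assignments = workers.zip(toSend.toList)
    (copy(pending = rest, idle = idle -- workers, busy = busy ++ assignments), assignments)
  }

  // True when nothing is queued and nothing is in flight.
  def allDone: Boolean = pending.isEmpty && busy.isEmpty
}
```

An actor wrapping this state would call dispatch after every one of the triggering messages and send each returned pair to the corresponding worker. The check for quitting would be allDone combined with having already received NoMoreWork.
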
I'm not concerned with how realistic the above assumptions are right now -- 
I just want to see what the performance is of the leanest solution I can 
think of, learn about its shortcomings, and use that to keep iterating to 
improve on it.  I'm also curious about the reliability of the communication 
between the compute nodes, and how the Akka cluster behaves with no 
code-added fault tolerance as I scale up the number of nodes. I suspect 
that on Blue Waters the reliability guarantees are higher than if I were 
creating an Akka cluster over a bunch of commodity computers.

So... finally now I can describe what the problem I'm having is (thanks for 
reading this far!)

Basically, I ran a test of the code on GitHub on Blue Waters, on 3 physical 
nodes, each with 32 cores (and 64GB RAM).
I'm using the "<role>.min-nr-of-members" config setting to ensure that all 
nodes that join the akka cluster are transitioned to "Up" only after all 
workers, coordinator, and producer join the cluster.
I've used the "registerOnMemberUp" callback as the trigger to create the 
Producer, Coordinator (aka Master), and Worker actors.  When Workers and 
Producer actors start, they look up the coordinator ActorRef and the 
Producer uses that as the trigger to start producing the work.
I've used the "producer.workCount" config setting to try different 
quantities of work and for small values (<100,000), everything works fine. 
When I tried using workCount=1,000,000 the Producer would start getting 
disassociated from the cluster -- it would basically detect everyone else 
as UNREACHABLE, or I would see many entries like the following in the logs:

[ReliableDeliverySupervisor:71] [WARN] - Association with remote system 
[akka.tcp://cluster@nid25948:38358] has failed, address is now gated for 
[5000] ms. Reason is: [Disassociated].

It would seem that either the Producer becomes congested trying to send out 
the messages to the coordinator, or maybe the coordinator somehow becomes 
congested in a way that affects the Producer.  It's definitely the case 
that this solution isn't too viable since a single coordinator will have 
trouble keeping up with so many messages, especially given that it also 
needs to be able to exchange messages with the workers (and "in production" 
there might be thousands of workers, or more).
In theory this should've still worked... as far as I understand it, the 
actor mailbox size is only limited by the amount of memory available (and 
I've been generous with that - and saw no OOM messages anywhere or process 
[...]).

I wonder whether the Akka cluster extension in this case could be playing a 
part in the issues I'm seeing.  The heartbeats or gossip messages exchanged 
between the cluster nodes might add unneeded traffic to a 
potentially-already-saturated network. I think that the simple solution I 
presented above could be implemented without using the cluster extension, 
by just using remote actors. The Producer only needs to know about the 
Coordinator (and monitor its lifecycle), and similar for the Workers. I 
wonder what the practical limits are for the maximum size of an akka 
cluster, assuming there are no JVM-related limits, when the only limit 
becomes the communication protocol/medium...

I'll be collecting some stats for values of "workCount" that do not cause 
errors for future comparison...

It seems to me that the next thing I should try is to lessen the stress on 
the coordinator. I'm thinking a better solution would be to plan to have 
one Coordinator instance on each physical compute node, and have the 
workers that are started on that physical node register with it... then on 
the physical node where the Producer is started, also start a router where 
the routees are created on each of the physical nodes as Coordinators. 
 Then the Producer would send the work to the router which could use 
RoundRobin (or other policies) to route the work to coordinators.
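
The round-robin policy mentioned above is simple enough to show in a few lines of plain Scala. This is a sketch only: the RoundRobin class is hypothetical, and the routees are plain strings standing in for per-node Coordinators. In the real application Akka's own routers (for example RoundRobinPool) would take its place.

```scala
// Cycles through a fixed, non-empty list of routees in order.
final class RoundRobin[A](routees: Vector[A]) {
  require(routees.nonEmpty, "at least one routee is required")
  private var next = 0

  // Not thread-safe; meant to be called from a single sender, such as one Producer.
  def pick(): A = {
    val r = routees(next)
    next = (next + 1) % routees.size
    r
  }
}

// Placeholder names for one Coordinator per physical node.
val router = new RoundRobin(Vector("coordinator-node1", "coordinator-node2", "coordinator-node3"))
val targets = (1 to 7).map(_ => router.pick())
```

With three coordinators and seven work items, the first coordinator receives three items and the other two receive two each, so the load per coordinator drops to roughly a third of what a single coordinator would see.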

Finally, as part of the mix of solutions I'll be experimenting with I'd 
like to also simulate the same basic setup in an MPI-based application, and 
do some timing comparisons just for the heck of it.  It's very likely that 
the MPI solution will perform better (especially when using the highly 
optimized Cray MPI library over the Gemini router ASIC) but it would be 
educational to see the magnitude of the difference.  I would be interested 
to explore the possibility of adding MPI-based communication to Akka as an 
alternative to TCP, if the performance difference is significant.

I'll be posting updates on this thread as I have more results.
I'm looking for comments and suggestions based on what I wrote above...

Thank you in advance!  Again, sorry for the long post.












