Hi Boris,

On Sat, Mar 22, 2014 at 6:45 AM, Boris Capitanu <bor...@gmail.com> wrote:

> Hello,
>
> Apologies in advance for the long post... to frame my problem and
> questions I have to give a lot of contextual info.
>
> I am running a set of experiments to understand the feasibility and
> performance characteristics of running Akka on the Blue Waters
> <https://bluewaters.ncsa.illinois.edu/> (batch) supercomputer at
> NCSA <http://www.ncsa.illinois.edu/>.
>
> The general theme for the experiments is to create a map-reduce like
> system comprised of a producer, one (or more) coordinator(s), and a number
> of workers. (the reduce step could be optional)
> The producer is responsible for generating the "work" which is then sent
> to the coordinator(s) which in turn assign the work to an idle worker.
> At first I am most concerned with the performance of the solution (rather
> than fault-tolerance). The only fault-tolerance aspect I'd like to address
> at first consists of exceptions thrown as part of executing the "work".
>  When that happens, the "executor" should be restarted and the coordinator
> informed of the failed job which would get logged and perhaps retried later
> (up to n times) depending on the exception. Eventually the final solution
> would include a checkpointing mechanism that would allow the PBS job to be
> resumed from the last checkpoint in case it is killed or a malfunction
> occurs with BlueWaters.
>
> Besides learning about the different options/architectures that are
> appropriate for an environment such as Blue Waters, the problem I
> ultimately want to solve is to process approx. 3.2 million gzipped text
> files (about 3.2TB of gzipped data). The "processing" could be any number
> of things - feature extraction, OCR error correction, or other
> text-processing routines as needed.
>
> About my experience: I am no Scala and Akka expert but I do have a decent
> amount of experience with both. I've taken Martin's functional programming
> in scala course on Coursera, as well as the reactive programming course by
> Martin, Erik, and Roland. I've also read through a number of scala and akka
> books, tutorials, examples...etc. but, as with just about everything else
> in life, there's always more to learn.
>
> My first attempt was to adapt the Distributed Workers with Akka and Scala
> <http://www.typesafe.com/activator/template/akka-distributed-workers>
> example just to see how that performs "out of the box". In the process I've also
> had to find a solution to be able to get the application deployed to the
> Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did
> come up with 2 ways to do that that seem to work quite well (if anyone
> wants to know how, I could write about that). I did manage to get the
> example to work - tested it on a small subset of the data (with some
> changes to WorkProducer) - but I felt that this solution was overly
> concerned with fault tolerance at the expense of (some) performance, so I
> wanted to create another solution where those concerns would be reversed.
>  (it's still on my TODO list to get some performance measurements of the
> original example so that I can compare numbers across the different
> solutions)
>
> The way I'm planning on evaluating the performance of each solution is by
> having the work producer generate a known quantity of (empty) work as fast
> as possible, have that work be coordinated by the master over a known
> number of worker nodes, and have the executor on each worker do nothing
> else but respond immediately with an (empty) result.
>

Be aware that network round trips may dominate when you test with empty
work, in a way that may not be representative once the workers do real
work.


> From this setup I want to record the timing of the following operations:
>
>    - overall wall clock time from start of work being produced to when
>    the last result was received by the worker actor from the executor (the
>    master and workers would already be up, registered, and ready before the
>    producer starts sending out work) (this time is not affected by other jobs
>    running on the bluewaters system since the work involves no I/O to disk -
>    just network - and the compute nodes where the worker actors are deployed
>    are exclusively dedicated to this task - are not running other user jobs)
>    - overall stats about the time elapsed between when a Work object is
>    sent to the master by the producer, and the moment the Master receives it
>    - overall stats about the time elapsed between Master sending work to
>    Worker, and Worker responding with WorkDone
>
> In (pseudo-scala) code, the solution I have created behaves like this (the
> relevant parts):
>
> *Producer*
>
> val numWorkToGenerate = N
>
> for (i <- 1 to numWorkToGenerate) {
>    val timestamp = getCurrentTimestamp
>    val work = new Work(i, timestamp)
>    master ! work
> }
>
> master ! NoMoreWork
>
>
> *Master*
>
> // ... keep track of workers and what they're working on, and keep a
> // queue of work that still needs to be assigned ...
>
> def receive = {
>    case work: Work =>
>       // record the time delta between the current timestamp and the
>       // timestamp packaged in 'work'
>       // find an idle worker and, if found, update the timestamp in 'work'
>       // with the current timestamp and send 'work' to the worker
>
>    case NoMoreWork =>
>       // if the work queue is empty and none of the workers are working on
>       // anything, output all recorded times to the log and quit gracefully
>
>    case WorkDone =>
>       // record the time delta between the current timestamp and the
>       // timestamp packaged in 'work'
>       // if the work queue is not empty and there is an idle worker, do the
>       // same as in 'case work' above; otherwise check whether all work is
>       // done and run the NoMoreWork logic
> }
>
>
> *Worker*
>
> // ... create an Executor actor that will be used to execute the work and
> // keep track of what it's working on ...
>
> def idle = {
>    case work: Work =>
>       executor ! work
>       context become working
> }
>
> def working = {
>    case ExecuteDone =>
>       master ! WorkDone
>       context become idle
> }
>
>
> *Executor*
>
> def receive = {
>    case work: Work => sender ! ExecuteDone
> }
>
>
> For those interested the actual code is here:
> https://github.com/borice/akka-grid-workers
>

I noticed one error where you use the onSuccess callback. See here
http://doc.akka.io/docs/akka/2.3.0/general/jmm.html#Actors_and_shared_mutable_state
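
The general pattern that section warns about is a Future callback touching
actor state from a thread other than the one running the actor. The sketch
below is not taken from your repository: Job, JobResult, GetCompleted and
SafeWorker are hypothetical names, and it only illustrates one common way
around the problem, which is to pipe the Future's result back to the actor
as a message instead of mutating state inside onSuccess.

```scala
import akka.actor.{Actor, Status}
import akka.pattern.pipe
import scala.concurrent.Future

// Hypothetical messages, for illustration only (not from the repository).
final case class Job(id: Int)
final case class JobResult(id: Int)
case object GetCompleted

class SafeWorker extends Actor {
  import context.dispatcher // ExecutionContext used by Future and pipeTo

  private var completed = 0 // actor state: only touched inside receive
  private var failed = 0

  def receive = {
    case Job(id) =>
      // Unsafe variant: Future { ... }.onSuccess { case _ => completed += 1 }
      // would run the callback on another thread and race with receive.
      Future(JobResult(id)).pipeTo(self)

    case JobResult(_) =>
      completed += 1 // safe: handled as an ordinary message

    case Status.Failure(_) =>
      failed += 1 // pipeTo delivers a failed Future as Status.Failure

    case GetCompleted =>
      sender() ! completed
  }
}
```

The same applies to sender: capture it in a local val before creating the
Future if the callback needs it.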


>
> When I run this, each physical compute node will run one instance of the
> Worker app per CPU core;
> The Master and Producer will be run on the same compute node.  (other
> Worker instances will also run on that node, on the 'remaining' CPU cores)
>
> Compared to the original example I started from:
>
>    - my first solution tries to have as little communication as possible
>    - there is no acknowledgment from Master to Producer for each 'work'
>    received;  producer operates in a fire-and-forget style
>    - the pull-model used in the original example is still there, but has
>    been simplified to be triggered by (not shown above) RegisterWorker,
>    WorkDone, and WorkFailed messages rather than using the WorkerRequestsWork
>    / WorkIsReady mechanism
>    - the result of the work is not given to the Master; the master is
>    only informed that the work was done (this is because the simplest version
>    of work may not need to produce a result to someone --- the result could
>    just be an output written to a file or DB or whatever;  I will explore the
>    use of a pub-sub system to publish the result in a future version of the
>    solution)
>    - FYI: I did some renaming in the code: Master has been baptized as
>    "Coordinator" and WorkProducer became "Producer"
>
> I'm not concerned with how realistic the above assumptions are right now
> -- I just want to see what the performance is of the leanest solution I can
> think of, learn about its shortcomings, and use that to keep iterating to
> improve on it.  I'm also curious about the reliability of the communication
> between the compute nodes, and how the akka cluster behaves with no
> code-added fault tolerance as I scale up the number of nodes. I suspect
> that on Blue Waters the reliability guarantees are higher than if I were
> creating an akka cluster over a bunch of commodity computers.
>
> So... finally now I can describe what the problem I'm having is (thanks
> for reading this far!)
>
> Basically I ran a test of the code on GitHub on BlueWaters on 3 physical
> nodes, each node having 32 cores (and 64GB RAM).
> I'm using the "<role>.min-nr-of-members" config setting to ensure that all
> nodes that join the akka cluster are transitioned to "Up" only after all
> workers, coordinator, and producer join the cluster.
> I've used the "registerOnMemberUp" callback as the trigger to create the
> Producer, Coordinator (aka Master), and Worker actors.  When Workers and
> Producer actors start, they look up the coordinator ActorRef and the
> Producer uses that as the trigger to start producing the work.
> I've used the "producer.workCount" config setting to try different
> quantities of work and for small values (<100,000), everything works fine.
> When I tried using workCount=1,000,000 the Producer would start getting
> disassociated from the cluster -- it would basically detect everyone else
> as UNREACHABLE, or I would see many entries like the following in the logs:
>
> [ReliableDeliverySupervisor:71] [WARN] - Association with remote system
> [akka.tcp://cluster@nid25948:38358] has failed, address is now gated for
> [5000] ms. Reason is: [Disassociated].
>
>
> It would seem that either the Producer becomes congested trying to send
> out the messages to the coordinator, or maybe the coordinator somehow
> becomes congested in a way that affects the Producer.
>

Yes, you might need to add some flow control or throttling.
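
One possible shape for that, as a sketch only: let the Producer keep a
bounded number of unacknowledged work items in flight and send more only
when the Coordinator acknowledges some. The bookkeeping is plain Scala so
it can sit inside any actor. SendWindow, maxInFlight and the idea of an
acknowledgement message are assumptions of this sketch; your current
Coordinator does not acknowledge work.

```scala
// Hypothetical sketch of ack-based (windowed) flow control bookkeeping.
// Work ids run from 1 to `total`, matching the Producer loop in the thread.
final class SendWindow(total: Int, maxInFlight: Int) {
  require(total >= 0 && maxInFlight > 0)

  private var nextId = 1   // id of the next work item to send
  private var inFlight = 0 // items sent but not yet acknowledged

  /** Ids that may be sent right now without exceeding the window. */
  def drain(): List[Int] = {
    val n = math.min(maxInFlight - inFlight, total - nextId + 1)
    val ids = List.range(nextId, nextId + n)
    nextId += n
    inFlight += n
    ids
  }

  /** Record one acknowledgement from the Coordinator. */
  def acked(): Unit = {
    require(inFlight > 0, "ack received with nothing in flight")
    inFlight -= 1
  }

  /** True once every item has been sent and acknowledged. */
  def finished: Boolean = nextId > total && inFlight == 0
}
```

In the Producer actor this would be driven from two places: call drain()
once at start and again after each acknowledgement, send a Work message for
every id returned, and send NoMoreWork when finished becomes true. This
gives up pure fire-and-forget, but it bounds the Coordinator's mailbox and
the remoting send queue.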


>  It's definitely the case that this solution isn't too viable since a
> single coordinator will have trouble keeping up with so many messages,
> especially given that it also needs to be able to exchange messages with
> the workers (and "in production" there might be thousands of workers, or
> more).
> In theory this should've still worked... as far as I understand it, the
> actor mailbox size is only limited by the amount of memory available (and
> I've been generous with that - and saw no OOM messages anywhere or process
> crashes).
>

Long garbage collection pauses can also trigger the failure detector.
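
If GC logging (for example -verbose:gc) shows pauses in the range of
seconds, one option is to make the cluster failure detector more tolerant.
The sketch below layers an override on top of the normal configuration.
The 10 s value is purely illustrative, and ClusterSettings is a
hypothetical name; reducing the pauses themselves is usually the better
fix.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ClusterSettings {
  // Illustrative value only; tune it against the GC pauses you observe.
  val overrides: Config = ConfigFactory.parseString(
    """
    akka.cluster.failure-detector {
      # heartbeat gaps up to this long are tolerated before a node
      # is considered unreachable
      acceptable-heartbeat-pause = 10 s
    }
    """)

  // Overrides win; everything else comes from application.conf and
  // the reference.conf files on the classpath.
  lazy val config: Config = overrides.withFallback(ConfigFactory.load())

  // Usage (not executed here): ActorSystem("cluster", ClusterSettings.config)
}
```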


>
> I wonder whether the akka cluster extension in this case could be playing
> a part in the issues I'm seeing.  The heartbeats or gossip messages
> exchanged between the cluster nodes might add unneeded traffic to a
> potentially-already-saturated network. I think that the simple solution I
> presented above could be implemented without using the cluster extension,
> by just using remote actors. The Producer only needs to know about the
> Coordinator (and monitor its lifecycle), and similar for the Workers. I
> wonder what the practical limits are for the maximum size of an akka
> cluster, assuming there are no JVM-related limits, when the only limit
> becomes the communication protocol/medium...
>

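On practical cluster size, see this write-up of running a 2400-node Akka
cluster on Google Compute Engine:
http://typesafe.com/blog/running-a-2400-akka-nodes-cluster-on-google-compute-engine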


>
> I'll be collecting some stats for values of "workCount" that do not cause
> errors for future comparison...
>
> It seems to me that the next thing I should try is to lessen the stress on
> the coordinator. I'm thinking a better solution would be to plan to have
> one Coordinator instance on each physical compute node, and have the
> workers that are started on that physical node register with it... then on
> the physical node where the Producer is started, also start a router where
> the routees are created on each of the physical nodes as Coordinators.
>  Then the Producer would send the work to the router which could use
> RoundRobin (or other policies) to route the work to coordinators.
>
> Finally, as part of the mix of solutions I'll be experimenting with I'd
> like to also simulate the same basic setup in an MPI-based application, and
> do some timing comparison just for the heck of it.  It's very likely that
> the MPI solution will perform better (especially when using the highly
> optimized Cray MPI library over the Gemini router ASIC) but it would be
> educational to see the magnitude of the difference.  I would be interested
> to explore the possibility of adding MPI-based communication to Akka as an
> alternative to TCP, if the performance difference is significant.
>
> I'll be posting updates on this thread as I have more results.
>

Thanks for an interesting post.

Regards,
Patrik



> I'm looking for comments and suggestions based on what I wrote above...
>
> Thank you in advance!  Again, sorry for the long post.
>
> -Boris
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw
