Just a quick top post. This is an informative discussion, please continue :) I agree that Airavata does not do work stealing but rather implements "work queues". Conceptually they are similar to the OS kernel-level work queues, but in a distributed context - https://www.kernel.org/doc/Documentation/workqueue.txt
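A broker-free sketch of the work-queue pattern being discussed, using Python's standard queue module as a stand-in for the AMQP queue (the worker count and `exp-*` task names are purely illustrative, not Airavata code):

```python
# Work-queue pattern: producers enqueue experiment requests, a pool of
# workers competes for them. In Airavata the queue is a RabbitMQ (AMQP)
# queue; queue.Queue stands in for it here.
import queue
import threading

work_queue = queue.Queue()
results = []

def worker(worker_id):
    while True:
        task = work_queue.get()
        if task is None:              # sentinel: shut down
            work_queue.task_done()
            break
        results.append((worker_id, task))
        work_queue.task_done()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for t in threads:
    t.start()

for task in ["exp-1", "exp-2", "exp-3", "exp-4"]:
    work_queue.put(task)
work_queue.join()                     # wait until every task is processed

for _ in threads:                     # one sentinel per worker
    work_queue.put(None)
for t in threads:
    t.join()

print(sorted(task for _, task in results))
```

Whichever worker is free pulls the next task, which is the "distributed work queue" semantic the post refers to.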
Suresh

> On Oct 6, 2016, at 3:52 PM, Amila Jayasekara <thejaka.am...@gmail.com> wrote:
>
> On Thu, Oct 6, 2016 at 3:17 PM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> On Thu, Oct 6, 2016 at 2:50 PM Amila Jayasekara <thejaka.am...@gmail.com> wrote:
> On Thu, Oct 6, 2016 at 11:07 AM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> Hi Amila,
>
> -- Please explain how you used "work stealing" in a distributed system. That would be interesting.
>
> Airavata depends on work stealing + AMQP for the following:
>
> Fault tolerance - This is a major distributed-system problem that is critical in Airavata. Whatever the reason, experiment request processing shouldn't be affected by an internal node failure. Even with node failures, Airavata should be able to continue experiment request processing, or hold it until at least one node appears and then continue. The way this is handled in Airavata is that a worker acks a message only after it has completely processed it. If a node goes down without sending acks for the messages it was processing, RabbitMQ puts all these un-acked messages back into the queue, where they are available to consume again.
>
> Resource utilization - Another important goal of a distributed system is to use the resources available in the system effectively, namely the memory and processors of its components. In Airavata this decides the throughput and response time of experiments. Currently, at a given time workers only get messages up to a preconfigured limit (the prefetch count). But most of these jobs are async jobs. That means after a worker gets its fixed amount of jobs, it won't get any more even if it is capable of handling them, which is a waste of worker resources.
>
> You still did not answer my question. I want to know how you used "work stealing" in your implementation.
> In other words, how does distributed work stealing work in your implementation? The details you gave above are unrelated and do not answer my question.
>
> I think I have explained how we use work stealing (work queues). If you are looking for a closer analogue of parallel-computing work stealing, then that is hard to explain.
>
> No, you have not. :-)
> Work stealing != work queues. In a distributed setting I would imagine the following kind of work-stealing implementation: every worker (orchestrator) maintains a request queue locally and serves requests coming to its local queue. Whenever one worker runs out of requests to serve, it queries the other distributed workers' local queues to see whether there are requests it can serve. If there are, it can steal requests from the other workers' local queues and process them. However, this model of computation is inefficient in a distributed environment. I guess that is the same reason we don't find many distributed work-stealing implementations.
>
> Anyhow, let's stop the discussion about work stealing now. :-)
>
> -- I don't see AMQP in the architecture diagram you attached above, and I don't understand why Airavata has to depend on it. One way to figure this out is to think about the architecture without AMQP, work out what actually should happen, and then look for a way to do that using AMQP.
>
> The worker queue is an AMQP queue.
>
> Does the worker queue need to be an AMQP queue? Sorry, I don't know much about AMQP, but it sounds like the limitations you are explaining are because of AMQP.
>
> It is not, but it is good to use a well-defined protocol instead of a custom one. Almost all messaging systems have implemented the AMQP protocol.
>
> Can we figure out whether others have encountered the same or similar problems, and how they tackled them with AMQP? Because the design we have is pretty straightforward, and I believe there are systems analogous to our design that use AMQP.
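The hypothetical distributed work-stealing scheme described in the quote above (local queue per worker, idle workers steal from peers) can be sketched in a single process. The `StealingWorker` class and task names are illustrative, not Airavata code:

```python
# Work stealing: each worker owns a local queue and serves it from the head;
# an idle worker steals from the TAIL of a victim's queue, the classic
# owner-head/thief-tail split that reduces contention.
import collections

class StealingWorker:
    def __init__(self, name, peers):
        self.name = name
        self.local = collections.deque()
        self.peers = peers                    # shared list of all workers
        self.done = []

    def next_task(self):
        if self.local:
            return self.local.popleft()       # serve own queue from the head
        for peer in self.peers:               # out of work: try to steal
            if peer is not self and peer.local:
                return peer.local.pop()       # steal from the victim's tail
        return None

    def run(self):
        task = self.next_task()
        while task is not None:
            self.done.append(task)
            task = self.next_task()

workers = []
w1 = StealingWorker("w1", workers)
w2 = StealingWorker("w2", workers)
workers.extend([w1, w2])
w1.local.extend(["exp-1", "exp-2", "exp-3", "exp-4"])  # all work lands on w1

w2.run()   # w2 has nothing local, so it drains w1 by stealing
print(w2.done)
```

Note the contrast with a work queue: here load balancing is initiated by the idle consumer polling its peers, whereas with an AMQP work queue the broker pushes work to whichever consumer has ack'd; the polling is part of why this model is costly across a network.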
> -- Does this mean that you have a waiting thread or process within Airavata after submitting the job (for each work item)?
>
> No, once the job is submitted to the remote resource, the thread goes back to the thread pool.
>
> Then your previous explanation (i.e., "The time a worker needs to finish the work depends on the application run time (applications run on an HPC machine). Theoretically, this can be from a few seconds to days or even more.") is invalidated. Correct?
>
> No, it is still valid. The thread going back to the thread pool doesn't mean the worker has completed that request; it is waiting until the actual HPC job runs on the target compute resources. After the HPC job completes, output data staging happens. After the output is staged to storage, the worker acks the work-queue message.
>
> This is confusing to me.
> Does this mean that once you return a thread to the thread pool, it is not reusable for another request? Also, how do you wait on a thread after returning it to the thread pool?
> Also, why do you have to wait for the HPC job to complete? I was under the impression the communication is asynchronous, i.e., after the job completes you get an email confirmation and then you start output data staging in a separate thread.
>
> We should probably meet and discuss this verbally.
>
> -AJ
>
> Thanks,
> Shameera.
>
> It takes more time for me to digest the following right now. I will try to give more feedback when I properly understand it.
>
> Thanks
> -Amila
>
> That means, if a worker can read N (= prefetch count) messages from a queue without sending acknowledgments, then that is the limit one worker can handle at a given time. But most of these long-running jobs are asynchronous: worker resources are free to handle more work than N. Hence Airavata underutilizes worker resources. In the case of small jobs (small in runtime), this won't be a big problem.
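The prefetch-count and ack-only-after-completion semantics discussed above can be illustrated without a broker. `FakeBroker` below is a toy stand-in for RabbitMQ behavior, not the pika API; names and numbers are illustrative:

```python
# AMQP prefetch + manual ack semantics: a worker may hold at most `prefetch`
# unacked messages; if its connection drops without acking, the broker
# requeues them, which is the fault-tolerance story described in the thread.
import collections

class FakeBroker:
    def __init__(self):
        self.queue = collections.deque()
        self.unacked = {}                 # delivery tag -> message
        self.next_tag = 0

    def publish(self, msg):
        self.queue.append(msg)

    def deliver(self, prefetch):
        """Hand out messages until the unacked window is full."""
        out = []
        while self.queue and len(self.unacked) < prefetch:
            self.next_tag += 1
            msg = self.queue.popleft()
            self.unacked[self.next_tag] = msg
            out.append((self.next_tag, msg))
        return out

    def ack(self, tag):
        del self.unacked[tag]

    def requeue_unacked(self):
        """Roughly what RabbitMQ does when a consumer's connection drops."""
        for msg in self.unacked.values():
            self.queue.appendleft(msg)    # sketch: ignores ordering of >1 msg
        self.unacked.clear()

broker = FakeBroker()
for i in range(5):
    broker.publish(f"exp-{i}")

batch = broker.deliver(prefetch=2)        # worker gets only 2 of the 5 jobs
assert [m for _, m in batch] == ["exp-0", "exp-1"]
assert broker.deliver(prefetch=2) == []   # window full: no more deliveries

broker.ack(batch[0][0])                   # exp-0 finished end to end: ack it
broker.requeue_unacked()                  # worker crashes still holding exp-1
print(len(broker.queue))                  # exp-1 is back, plus exp-2..exp-4
```

The second `deliver` returning nothing is exactly the underutilization complaint: with long-running async jobs, the worker could take on more than N, but the unacked window blocks it until an ack frees a slot.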
> Apache Zookeeper [2] provides a way to manage distributed system components, and most of the latest distributed systems have used Zookeeper to address all the common distributed-system problems like HA, FT, leader election, etc. But Airavata is trying to replace RabbitMQ with Zookeeper to achieve the same outcomes; I haven't seen any framework do that. The way Airavata tries to do it is using work-stealing queues. Anyway, Airavata hasn't moved Zookeeper out of its architecture yet, as it uses Zookeeper to handle cancel requests.
>
> Regarding 3 => Well... email monitoring was always problematic. More precisely, monitoring is problematic, IMO. To handle monitoring, I think we should be able to get better feedback from job schedulers, but in my experience even the job schedulers are unreliable. Until we have a better way to get feedback from the job scheduler, monitoring is going to be challenging. However, I don't understand why you have "serious scalability" issues in GFac because of this.
>
> Let me explain more about this; I have used some terms that come with the latest AMQP spec here. Let's say we have two GFac workers in the system, and both submit jobs to compute resources and wait for status updates via emails (we finally decided to depend on emails for a while until we come up with a more robust monitoring solution). When emails come, there is an email monitoring server which reads these emails and puts them on a RabbitMQ exchange [3]. Each GFac worker then subscribes to this exchange to get all email updates. Because the framework doesn't know which worker handles a particular job, it sends the email content to all workers subscribed to that exchange. There are a few issues with this approach.
>
> 1. What if one of the workers goes offline for a moment? It should receive the messages from where it left the queue. To do that, we need to use a persistent queue which is not removed when the consumer disconnects.
And this queue should also receive all the email update messages during the consumer's downtime. To create a persistent queue and reconnect to the same queue again, the consumer needs to know the queue name. OK, let's say the worker creates this queue with a name the very first time it joins the email monitoring exchange. Now this problem is solved; see the second issue.
>
> 2. What if one worker node goes down and we start a different node (running on a different machine/VM)? Now there is no way this new worker knows the queue name created by the previous worker unless we configure it, which is not a very good solution when we have a pool of workers and this pool changes from time to time. Now the real problem is that all the jobs handled by the down node go to this new node, but there is no way it gets the previous email monitoring messages, which leaves these jobs hanging in their previous state forever. Even if the previously down worker comes back up, it might not get its previous jobs; instead it retrieves a new set of jobs. This means we can't scale GFac workers independently. Hope this explains the issue.
>
> In summary, to me, none of these are concerns for the architecture at the moment. Also, you cannot just naively complain that an architecture is "not good". An architecture has to be compared with another design, evaluating the pros and cons of both. I suggest we first try to improve the existing design to handle the issues you are pointing out.
>
> It would be great to get some solutions for the above issues. IMHO, we have overcomplicated the Airavata design with the work-stealing approach, which is not suitable for Airavata's use cases and requirements.
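Issue 1 above (a named, persistent queue bound to a fanout exchange so a reconnecting worker catches up on missed email updates) can be sketched as follows. `FanoutExchange`, the queue name, and the messages are a toy model for illustration, not RabbitMQ's API:

```python
# Fanout-exchange sketch: every bound queue gets a copy of each message, and
# a named durable queue keeps buffering while its worker is offline.
import collections

class FanoutExchange:
    def __init__(self):
        self.queues = {}                  # queue name -> buffered messages

    def declare_queue(self, name):
        # Re-declaring an existing durable queue is a no-op: the worker
        # reconnects to the same buffer and sees what it missed.
        self.queues.setdefault(name, collections.deque())

    def publish(self, msg):
        for q in self.queues.values():
            q.append(msg)                 # fanout: one copy per bound queue

    def consume(self, name):
        q = self.queues[name]
        out = list(q)
        q.clear()
        return out

exchange = FanoutExchange()
exchange.declare_queue("gfac-worker-1")   # worker 1 binds its named queue

exchange.publish("JOB 17: RUNNING")
exchange.publish("JOB 17: COMPLETE")      # worker 1 happens to be offline...
exchange.declare_queue("gfac-worker-1")   # ...then reconnects to the SAME queue

updates = exchange.consume("gfac-worker-1")
print(updates)                            # both updates survived the downtime
```

This also makes issue 2 visible: a replacement worker that binds under a new name gets an empty queue, so every status message published before it joined is lost to it, which is exactly why jobs inherited from a dead node hang in their old state.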
> Thank you for your feedback; I hope I answered all of your concerns.
>
> [1] http://www.rabbitmq.com/consumer-prefetch.html
> [2] https://zookeeper.apache.org
> [3] https://www.rabbitmq.com/tutorials/tutorial-two-python.html
>
> Thanks,
> Shameera.
>
> [2] https://en.wikipedia.org/wiki/Work_stealing
>
> Thanks
> -Amila
>
> On Tue, Oct 4, 2016 at 1:07 PM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> Hi Devs,
>
> Airavata has lately adopted the work-stealing design pattern and uses a work-queue approach to distribute work among consumers. There are two work queues in the current Airavata architecture: one between the API Server and the Orchestrator, and the second between the Orchestrator and GFac. The following is a very high-level Airavata architecture.
>
> <Highleve Arachitecture- Airavata.png>
>
> Here are the issues we have with the above architecture.
>
> 1. Low resource utilization in workers (GFac/Orchestrator).
> We have used the AMQP prefetch count to limit the number of requests served by a worker, which is not a good way to load balance in a heterogeneous environment where different workers have different levels of resources. It is recommended to keep this prefetch count to a minimum [1], and this is valid for work stealing too. If we only have one worker and we have M (> N) long-running jobs, and our prefetch count is N, then only N jobs will be in active mode. As we run these long-running jobs asynchronously, we could handle more long-running jobs than N. Therefore worker resources are underutilized.
>
> 2. Even though we can easily deal with the recovery requirement with work stealing, it is not easy to handle cancelling experiments.
When this cancel request comes, the worker who is working on that experiment should act immediately. To add this behavior we would need to introduce priority queues, and needless to say this adds an extra layer of complexity. Currently, we use Zookeeper to trigger cancel requests. (Another downside: we are using both Zookeeper and RabbitMQ to solve different parts of our distributed-systems issues. Almost all of the latest distributed-system frameworks have been using Zookeeper to manage distributed-system problems; we should strongly consider using Zookeeper as a way of managing our components and sharing the load according to the resources available in the workers.)
>
> 3. Putting emails on a queue is not a good solution with commodity servers where system failures are expected. This email queue is critical: if we miss one of the statuses of a job, then that job can go into an unknown state or hang in its old status forever. Due to this, we have a serious scalability issue with GFac at the moment, because email monitoring is a bottleneck.
>
> I think we need to re-evaluate the Airavata architecture and find a good yet simple solution based on requirements. The new architecture should handle all existing issues and be able to accommodate future requirements.
>
> [1] http://www.mariuszwojcik.com/blog/How-to-choose-prefetch-count-value-for-RabbitMQ
>
> --
> Shameera Rathnayaka
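The priority-queue idea raised above for cancel requests can be sketched with Python's heapq. The priority levels and message names are illustrative, not part of any Airavata or AMQP API (AMQP brokers such as RabbitMQ do offer per-message priorities, but this is only a minimal model of the behavior wanted):

```python
# Priority queue for control messages: cancel requests carry a lower priority
# number, so a worker sees the cancel before continuing with normal work.
import heapq
import itertools

CANCEL, NORMAL = 0, 1                 # lower number = served first
counter = itertools.count()           # tie-breaker keeps FIFO order per class

def push(pq, priority, msg):
    heapq.heappush(pq, (priority, next(counter), msg))

def pop(pq):
    return heapq.heappop(pq)[2]

pq = []
push(pq, NORMAL, "launch exp-1")
push(pq, NORMAL, "launch exp-2")
push(pq, CANCEL, "cancel exp-1")      # arrives last, but jumps the queue

first, second = pop(pq), pop(pq)
print(first, second)                  # cancel is delivered ahead of launches
```

The tie-breaking counter matters: without it, two messages of equal priority would be compared by their string bodies, which would break the FIFO ordering within a priority class.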