Just a quick top post. This is an informative discussion, please continue :)

I agree that Airavata does not do Work Stealing, but it implements "Work 
Queues". Conceptually they are similar to the OS kernel-level work queues, but 
in a more distributed context - 
https://www.kernel.org/doc/Documentation/workqueue.txt

Suresh
 

> On Oct 6, 2016, at 3:52 PM, Amila Jayasekara <thejaka.am...@gmail.com> wrote:
> 
> 
> 
> On Thu, Oct 6, 2016 at 3:17 PM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> 
> 
> On Thu, Oct 6, 2016 at 2:50 PM Amila Jayasekara <thejaka.am...@gmail.com> wrote:
> On Thu, Oct 6, 2016 at 11:07 AM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> Hi Amila, 
> 
> -- Please explain how you used "work stealing" in a distributed system. That 
> would be interesting.
> 
> Airavata depends on work stealing + AMQP for the following:
> Fault Tolerance - This is one of the major distributed system problems, and 
> it is critical in Airavata. Whatever the reason, experiment request 
> processing shouldn't be affected by an internal node failure. Even with node 
> failures, Airavata should be able to continue processing experiment 
> requests, or hold them until at least one node appears and then continue. The 
> way this is handled in Airavata is that a worker acks a message only after it 
> has completely processed it. If the node goes down without sending acks for 
> the messages it was processing, then rabbitmq puts all these un-acked 
> messages back on the queue, where they are available to consume again.
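
A minimal sketch of the ack-after-processing behavior described above, using an in-memory stand-in rather than a real RabbitMQ client. The ToyBroker class, its method names, and the worker and experiment names are all hypothetical, made up for illustration; a real broker also tries to requeue a message near its original position, which this toy does not model.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker queue with manual acknowledgements."""

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting for a consumer
        self.unacked = {}              # worker name -> delivered but not yet acked

    def deliver(self, worker):
        """Hand the next ready message to a worker and track it as un-acked."""
        msg = self.ready.popleft()
        self.unacked.setdefault(worker, []).append(msg)
        return msg

    def ack(self, worker, msg):
        """The worker finished this message, so the broker can forget it."""
        self.unacked[worker].remove(msg)

    def worker_died(self, worker):
        """Requeue everything the dead worker never acknowledged."""
        for msg in self.unacked.pop(worker, []):
            self.ready.append(msg)     # simplification: requeued at the back


broker = ToyBroker(["exp-1", "exp-2", "exp-3"])
first = broker.deliver("worker-a")
second = broker.deliver("worker-a")
broker.ack("worker-a", first)      # exp-1 was fully processed
broker.worker_died("worker-a")     # exp-2 was in progress and never acked
remaining = list(broker.ready)     # exp-2 is available to another worker again
```

The point is only that an un-acked message survives the worker that was holding it, which is what gives the recovery behavior without extra bookkeeping in the workers.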
> 
> Resource Utilization - Another important goal of a distributed system is to 
> effectively use the available resources in the system, namely the memory and 
> processors of its components. In Airavata this decides the throughput and 
> response time of experiments. Currently, at a given time workers only get 
> messages up to a preconfigured limit (the limit is the prefetch count). But 
> most of these jobs are async jobs. That means that after a worker gets a 
> fixed number of jobs, it won't get any other jobs even if it is capable of 
> handling more, which wastes worker resources.
> 
> 
> You still did not answer my question. I want to know how you used "work 
> stealing" in your implementation. In other words, how does distributed work 
> stealing work in your implementation? The details you gave above are 
> unrelated and do not answer my question. 
> 
> I think I have explained how we use work stealing (work queues). If you are 
> looking for something more analogous to parallel computing work stealing, 
> then that is hard to explain.  
> 
> No, you have not. :-). 
> Work stealing != work queues. In a distributed setting I would imagine the 
> following kind of work stealing implementation: every worker (orchestrator) 
> maintains a request queue locally and serves requests coming to the local 
> queue. Whenever a worker runs out of requests to serve, it queries the other 
> distributed workers' local queues to see whether there are requests that it 
> can serve. If there are, it can steal requests from the other workers' local 
> queues and process them. However, this model of computation is inefficient 
> in a distributed environment. I guess that is the same reason we don't find 
> many distributed work stealing implementations. 
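
The work stealing model described above could be sketched as follows. This is a single-process toy, not Airavata code: the Worker class, the choice of the busiest peer as the victim, and stealing from the tail of its queue are assumptions made for illustration. In a real distributed setting every steal_from call would be a network round trip to other nodes, which is where the inefficiency mentioned above comes from.

```python
from collections import deque


class Worker:
    def __init__(self, name, requests=()):
        self.name = name
        self.local = deque(requests)   # this worker's own local request queue
        self.done = []                 # requests this worker has served

    def steal_from(self, others):
        """Take one request from the tail of the busiest peer's queue, if any."""
        victim = max(others, key=lambda w: len(w.local), default=None)
        if victim is None or not victim.local:
            return None
        return victim.local.pop()

    def step(self, others):
        """Serve one local request, or steal one when the local queue is empty."""
        if self.local:
            request = self.local.popleft()
        else:
            request = self.steal_from(others)
        if request is not None:
            self.done.append(request)
        return request


a = Worker("a", ["r1", "r2", "r3", "r4"])
b = Worker("b")                        # starts with nothing to do
workers = [a, b]
while any(w.local for w in workers):
    for w in workers:
        w.step([o for o in workers if o is not w])
```

With a shared work queue, by contrast, there are no per-worker queues to steal from: idle workers simply pull the next message from the one common queue.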
> 
> Anyhow, let's stop the discussion about work stealing now. :-)
>  
> 
>  
>  
> 
> -- I don't see AMQP in the architecture diagram you attached above, and I 
> don't understand why Airavata has to depend on it. One way to figure this 
> out is to think about the architecture without AMQP, figure out what should 
> actually happen, and then look for a way to do that using AMQP. 
> 
> The worker queue is an AMQP queue. 
> 
> Does the worker queue need to be an AMQP queue? Sorry, I don't know much 
> about AMQP, but it sounds like the limitations you are explaining are 
> because of AMQP.
> 
> It is not, but it is good to use a well-defined protocol instead of a custom 
> one. Almost all messaging systems have implemented the AMQP protocol.
> 
> Can we figure out whether others have also encountered the same/similar 
> problem and how they tackled it with AMQP? Cos the design we have is 
> pretty straightforward, and I believe there are systems analogous to our 
> design that use AMQP. 
>  
>  
>  
> -- Does this mean that you have a waiting thread or process within Airavata 
> after submitting the job (for each work) ? 
> 
> No, once the job is submitted to the remote resource, the thread goes back 
> to the thread pool. 
> 
> Then, your previous explanation (i.e., "The time needs for a worker to 
> finish the work is depend on the application run time (applications runs on 
> HPC machine). Theoretically, this can be from few sec to days or even 
> more.") is invalidated. Correct?
> 
> No, it is still valid. The thread going back to the thread pool doesn't mean 
> the worker has completed that request; the worker is still waiting for the 
> actual HPC job to run on the target computer resources. After the HPC job 
> completes, output data staging happens. Only after the output is staged to 
> storage does the worker ack the work queue message.
> 
> This is confusing to me.
> Does this mean that once you return a thread to the thread pool, it is not 
> reusable for another request? Also, how do you wait on a thread after 
> returning it to the thread pool? 
> Also, why do you have to wait for the HPC job to complete? I was under the 
> impression that the communication is asynchronous, i.e. after the job 
> completes you get an email confirmation and then you start output data 
> staging in a separate thread.
> 
> We should probably meet and verbally discuss this.
> 
> -AJ
>  
> 
> Thanks, 
> Shameera.
>  
>  
> 
> Thanks, 
> Shameera.
>  
> 
> It will take me more time to digest the following. I will try to give 
> more feedback when I properly understand it.
> 
> Thanks
> -Amila 
> 
>  
> That means that if a worker can read N (= prefetch count) messages from a 
> queue without sending an acknowledgment, then N is the limit one worker can 
> handle at a given time. But most of these long-running jobs are 
> asynchronous, so worker resources are free to handle more work than N. Hence 
> Airavata underutilizes worker resources. In the case of small jobs (small in 
> runtime), this won't be a big problem. 
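
As a rough worked example of the cap described above: the number of jobs in flight on one worker is bounded by the prefetch count N, no matter how many jobs are queued or how many the worker could actually track. The capacity parameter and the specific numbers below are hypothetical, chosen only to make the arithmetic concrete.

```python
def active_jobs(queued, prefetch, capacity):
    """Jobs a single worker holds at once: limited by whichever bound is smallest."""
    return min(queued, prefetch, capacity)


def idle_slots(queued, prefetch, capacity):
    """Capacity the worker could have used if the prefetch count did not cap it."""
    return capacity - active_jobs(queued, prefetch, capacity)


# Hypothetical numbers: M = 50 queued long-running jobs, prefetch count N = 10,
# and a worker that could track 40 asynchronous jobs at a time.
in_flight = active_jobs(queued=50, prefetch=10, capacity=40)
wasted = idle_slots(queued=50, prefetch=10, capacity=40)
```

Here the worker holds 10 jobs and leaves 30 slots unused, even though 40 more jobs are waiting in the queue.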
> 
> Apache Zookeeper[2] provides a way to manage distributed system components, 
> and most of the latest distributed systems have used Zookeeper to address 
> common distributed system problems like HA, FT, Leader Election, etc. But 
> Airavata is trying to replace Rabbitmq with Zookeeper to achieve the same 
> outcomes, and I haven't seen any framework that has done that. The way 
> Airavata tries to do it is by using Work Stealing Queues. Anyway, Airavata 
> hasn't moved Zookeeper out of its architecture yet, as it uses Zookeeper to 
> handle cancel requests. 
>  
> 
> Regarding 3 => Well, email monitoring was always problematic. More 
> precisely, monitoring is problematic IMO. To handle monitoring I think we 
> should be able to get better feedback from job schedulers, but in my 
> experience even those job schedulers are unreliable. Until we have a better 
> way to get feedback from the job scheduler, monitoring is going to be 
> challenging. However, I don't understand why you have "serious scalability" 
> issues in GFac because of this. 
> 
> Let me explain more about this. I have used some terms that come from the 
> latest AMQP spec here. Let's say we have two GFac workers in the system, and 
> both submit jobs to Computer Resources and wait for status updates via email 
> (we finally decided to depend on emails for a while, until we come up with a 
> more robust monitoring solution). When emails come in, there is an email 
> monitoring server which reads these emails and puts them on a rabbitmq 
> exchange[3]. Each GFac worker has subscribed to this exchange to get all 
> email updates. Because the framework doesn't know which worker handles a 
> particular job, it sends the email content to all workers that subscribe to 
> that exchange. There are a few issues with this approach. 
> 
> 1. What if one of the workers goes offline for a moment? It should receive 
> the messages from where it left the queue. To do that we need to use a 
> persistent queue which is not removed when the consumer disconnects. This 
> queue should also receive all email update messages during the consumer's 
> downtime. To create a persistent queue and reconnect to the same queue, the 
> consumer needs to know the queue name. OK, let's say the worker creates this 
> queue with a name the very first time it joins the email monitoring 
> exchange. Now this problem is solved; see the second issue.
> 
> 2. What if one worker node goes down and we start a different node (running 
> on a different machine/VM)? Now there is no way for this new worker to know 
> the queue name created by the previous worker unless we configure it, which 
> is not a very good solution when we have a pool of workers and this pool 
> changes from time to time. Now the real problem is that all the jobs handled 
> by the down node go to this new node, but there is no way for it to get the 
> previous email monitoring messages, which leaves these jobs hanging in their 
> previous state forever. Even if the previously down worker comes back up, it 
> might not get its previous jobs; instead it retrieves a new set of jobs. 
> This means we can't scale GFac workers independently. Hope this explains the 
> issue.
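
The two issues above can be illustrated with a toy broadcast exchange. This is an in-memory sketch only: ToyFanoutExchange, the queue names, and the status strings are hypothetical and do not correspond to a real RabbitMQ client API. It assumes the named queues are persistent, so they keep buffering while their consumer is away.

```python
class ToyFanoutExchange:
    """Copies every published message into every declared (persistent) queue."""

    def __init__(self):
        self.queues = {}   # queue name -> list of buffered messages

    def declare_queue(self, name):
        # Re-declaring an existing name keeps whatever was buffered there.
        self.queues.setdefault(name, [])

    def publish(self, message):
        for buffered in self.queues.values():
            buffered.append(message)

    def drain(self, name):
        """Return and clear the messages buffered in one queue."""
        messages, self.queues[name] = self.queues[name], []
        return messages


exchange = ToyFanoutExchange()
exchange.declare_queue("gfac-worker-1")
exchange.publish("job-7 QUEUED")
exchange.publish("job-7 RUNNING")       # worker 1 is down, the queue still buffers

# A replacement node does not know the old queue name, so it declares its own.
exchange.declare_queue("gfac-worker-2")
exchange.publish("job-7 COMPLETE")

seen_by_replacement = exchange.drain("gfac-worker-2")
left_in_old_queue = exchange.drain("gfac-worker-1")
```

The replacement worker sees only the update published after it joined, while the earlier updates for the same job sit in a queue that nobody reads.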
>  
> 
> In summary, to me, none of these are concerns for the architecture at the 
> moment. Also, you cannot just naively complain that an architecture is "not 
> good". An architecture has to be compared with another design, with the 
> pros/cons of both evaluated. I suggest we first try to improve the existing 
> design to handle the issues you are pointing out.
> 
> 
> It would be great to get some solution for the above issues. IMHO, we have 
> overcomplicated the Airavata design with the Work Stealing approach, which 
> is not suitable for Airavata's use cases and requirements. 
> 
> Thank you for your feedback; I hope I have answered all of your concerns. 
> 
> 
> [1] http://www.rabbitmq.com/consumer-prefetch.html 
> [2] https://zookeeper.apache.org 
> [3] https://www.rabbitmq.com/tutorials/tutorial-two-python.html 
> 
> Thanks, 
> Shameera.
> 
> [2] https://en.wikipedia.org/wiki/Work_stealing 
> 
> Thanks
> -Amila
> 
> 
> On Tue, Oct 4, 2016 at 1:07 PM, Shameera Rathnayaka <shameerai...@gmail.com> wrote:
> Hi Devs, 
> 
> Airavata has lately adopted the work stealing design pattern and uses a work 
> queue approach to distribute work among consumers. There are two work queues 
> in the current Airavata architecture: one between the API Server and the 
> Orchestrator, and a second one between the Orchestrator and Gfac. The 
> following is a very high-level view of the Airavata architecture. 
> 
> 
> <Highleve Arachitecture- Airavata.png>
> Here are the issues we have with the above architecture. 
> 
> 1. Low resource utilization in Workers (Gfac/Orchestrator).
> We have used the AMQP prefetch count to limit the number of requests served 
> by a Worker, which is not a good way to load balance in a heterogeneous 
> environment where different workers have different levels of resources. It 
> is also recommended to keep this prefetch count to a minimum [1], and this 
> is valid for work stealing too. If we only have one worker, M (> N) 
> long-running jobs, and a prefetch count of N, then only N jobs will be 
> active. As we run these long-running jobs asynchronously, we could handle 
> more than N of them. Therefore worker resources are underutilized. 
> 
> 2. Even though we can easily deal with the recovery requirement with work 
> stealing, it is not easy to handle experiment cancellation. When a cancel 
> request comes in, the worker that is working on that experiment should act 
> immediately. To add this behavior we need to introduce priority queues, and 
> needless to say this will add an extra layer of complexity. Currently, we 
> use zookeeper to trigger cancel requests. (Another downside: we are using 
> both zookeeper and rabbitmq to solve different parts of our distributed 
> systems issues. Almost all of the latest distributed system frameworks use 
> zookeeper to manage distributed system problems; we need to strongly 
> consider using zookeeper as a way of managing our components and sharing the 
> load according to the resources available in workers.)
> 
> 3. Putting emails on a queue is not a good solution with commodity servers, 
> where system failures are expected. This email queue is critical: if we miss 
> one of the status updates for a job, the job can go to an unknown state or 
> hang in its old status forever. Because of this email monitoring bottleneck, 
> we have a serious scalability issue with GFac at the moment. 
> 
> I think we need to re-evaluate the Airavata architecture and find a good yet 
> simple solution based on requirements. The new architecture should handle 
> all existing issues and be extensible for future requirements.  
> 
> 
> [1] http://www.mariuszwojcik.com/blog/How-to-choose-prefetch-count-value-for-RabbitMQ
>  
> -- 
> Shameera Rathnayaka
> 
> 
