thinkharderdev commented on issue #132:
URL: https://github.com/apache/arrow-ballista/issues/132#issuecomment-1216491354

   > > How would you support all-at-once scheduling in this setup for streaming execution?
   > 
   > Do you mean long-running tasks for the streaming case? Would you submit hundreds of SQL queries or jobs to the scheduler for the streaming case? If not, I think a single active scheduler will satisfy your needs. Then you don't need multiple active schedulers, and with a single active scheduler you don't need to save every update to the backend.
   > 
   > For task scheduling, first we should refine the scheduling policy for each executor by employing round-robin assignment as the previous code did. Then a single scheduler will spread the tasks it is responsible for evenly across the executors. Here, we can treat each executor's task slots as a hint to achieve even task scheduling. That also keeps things fair when multiple schedulers assign tasks to the executors, for both interactive tasks and long-running tasks. And it's the executor that knows its own resources and task running situation best, so it's better for the executor to decide which task to run and which to leave pending.
   > 
   > For #59, I think it will work well when there are few tasks and few task updates. However, when there are thousands of tasks in a job, or even in a single stage, the locking, serialization, and deserialization cost would be huge. That's why we previously added an in-memory cache for the scheduler state. However, that cache layer was removed by #59.
   
   By streaming I just meant scheduling the entire DAG at once so batches are 
streamed between stages instead of using a disk-based shuffle and full 
materialization of each stage. Infinite streaming is not really something we 
need or something Ballista plans to support as far as I know. 
   
   For all-at-once scheduling I don't think round-robin scheduling would work. The individual tasks would need to synchronize their execution: if, for example, a task from stage 2 is scheduled on one executor, it would occupy its executor slot until its input partitions start executing on their executors. Under certain conditions I'm fairly certain this could deadlock the system. To avoid that you would have to implement some fairly complicated synchronization logic on the executor side.
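   To make the deadlock concern concrete, here is a minimal, self-contained sketch (purely illustrative slot arithmetic, not Ballista code): with two executors of one slot each, round-robin can hand both slots to stage-2 tasks that then sit waiting for stage-1 output which has nowhere left to run.

```rust
// Hypothetical illustration only; not Ballista code.
fn main() {
    // Two executors with one task slot each.
    let total_slots: usize = 2;

    // Stage 1 has two tasks producing the partitions that two stage-2 tasks
    // consume as a stream, so all of them must be running concurrently.
    let stage1_tasks: usize = 2;
    let stage2_tasks: usize = 2;

    // Suppose round-robin happens to hand out the slots to the stage-2 tasks first.
    let slots_held_waiting = stage2_tasks.min(total_slots);
    let free_slots = total_slots - slots_held_waiting;

    // The stage-2 tasks keep their slots while waiting for stage-1 input,
    // but there is nowhere left to run stage 1, so the job can never progress.
    if free_slots == 0 && stage1_tasks > 0 {
        println!("deadlock: all {total_slots} slots held by tasks waiting on unscheduled inputs");
    }
}
```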
   
   I agree that the current implementation involves too much overhead, but I think there is a better way to iterate on the current design without radically changing things (which I believe would also limit future improvements). Broadly, the way we were planning to address this is to break the scheduler into two separate workloads:
   
   1. A single leader (the coordinator) responsible for task scheduling on the executors, which can leverage in-memory state to alleviate the distributed locking and IO concerns. Executors would report task updates to the coordinator, which would handle all scheduling and asynchronously replicate scheduling state to the persistent backend.
   2. N followers which would serve gRPC requests (job status, execute query, etc.) and handle physical planning.
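   As a rough illustration of that split (a sketch only; these types and method names are hypothetical, not existing Ballista APIs):

```rust
// Hypothetical sketch of the proposed leader/follower split.
use std::collections::HashMap;

struct TaskUpdate { /* task id, status, executor id, ... */ }
struct JobState { /* stages, pending and running tasks, ... */ }

/// The single elected leader: owns scheduling state in memory and
/// replicates it to the persistent backend asynchronously.
struct Coordinator {
    active_jobs: HashMap<String, JobState>, // in-memory, no distributed locks
}

impl Coordinator {
    fn handle_task_update(&mut self, _update: TaskUpdate) {
        // 1. apply the update to the in-memory state
        // 2. schedule newly runnable tasks onto free executor slots
        // 3. queue an asynchronous write of the changed state to the backend
    }
}

/// Any number of followers: serve gRPC requests and do planning,
/// then hand the job off to the coordinator for scheduling.
struct Follower;

impl Follower {
    fn execute_query(&self, _sql: &str) {
        // parse + plan, persist the job definition, notify the coordinator
    }

    fn job_status(&self, _job_id: &str) {
        // read job status from the replicated backend state
    }
}
```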
   
   We would then of course use etcd for leader election in the distributed case (or have the single scheduler handle both workloads in the standalone case). If the coordinator dies, we elect a new leader, which would load the coordinator state from the persistent backend.
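   Failover could then look roughly like the loop below (again only a sketch; `LeaderElection` and `StateBackend` are hypothetical traits standing in for etcd and the configured persistent store, not real APIs):

```rust
// Hypothetical sketch of coordinator failover; `LeaderElection` and
// `StateBackend` stand in for etcd and the persistent backend.
struct CoordinatorState { /* active jobs, executor slots, ... */ }

trait LeaderElection {
    /// Blocks until this scheduler instance wins the election.
    fn campaign(&self) -> Result<(), String>;
}

trait StateBackend {
    /// Load the replicated scheduling state written by the previous leader.
    fn load_coordinator_state(&self) -> Result<CoordinatorState, String>;
}

fn run_scheduler(
    election: &dyn LeaderElection,
    backend: &dyn StateBackend,
) -> Result<(), String> {
    loop {
        // Followers keep serving gRPC/planning while waiting here; only the
        // election winner takes over task scheduling.
        election.campaign()?;

        // On becoming leader, rebuild in-memory state from the backend and
        // start scheduling. If this node fails, another follower re-enters
        // the loop, wins the election, and recovers the same way.
        let _state = backend.load_coordinator_state()?;
        // ... run the coordinator loop until leadership is lost ...
    }
}
```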
   
   As an incremental step I think we could refine the locking. For simplicity's sake we currently take a global lock on the active jobs, but we could certainly narrow that to a lock on only the particular job being updated (which is all that is really required for scheduling purposes).
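   Concretely, that refinement might look something like this (a sketch with made-up type names, not the actual scheduler state structs):

```rust
// Sketch of per-job locking instead of one global lock; the type and
// field names here are hypothetical.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct JobState { /* stages, task statuses, ... */ }

// Before: every task update serializes on one global lock.
struct GlobalLocked {
    active_jobs: RwLock<HashMap<String, JobState>>,
}

// After: the map is only locked briefly to look up the job, and updates
// for different jobs proceed concurrently under their own lock.
struct PerJobLocked {
    active_jobs: RwLock<HashMap<String, Arc<RwLock<JobState>>>>,
}

impl PerJobLocked {
    fn update_job(&self, job_id: &str, f: impl FnOnce(&mut JobState)) {
        // Short read lock on the map just to clone the Arc for this job.
        let job = self.active_jobs.read().unwrap().get(job_id).cloned();
        if let Some(job) = job {
            // Only this job is write-locked for the duration of the update.
            f(&mut *job.write().unwrap());
        }
    }
}
```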

