thinkharderdev commented on issue #132: URL: https://github.com/apache/arrow-ballista/issues/132#issuecomment-1216491354
> > How would you support all-at-once scheduling in this setup for streaming execution?
>
> Do you mean long-running tasks for the streaming case? Would you submit hundreds of SQLs or jobs to the scheduler for the streaming case? If not, I think a single active scheduler will satisfy your needs. Then you don't need multiple active schedulers, and with a single active scheduler you don't need to save every update to the backend.
>
> For task scheduling, we should first refine the scheduling policy for each executor by employing a round-robin policy as the previous code did. Then each scheduler will evenly distribute the tasks it is responsible for across the executors. Here, we can regard every executor's task slots as a hint to achieve even task scheduling. It will then also be fair for multiple schedulers to assign tasks to the executors, for both interactive tasks and long-running tasks. The executor knows its own resources and running tasks best, so it is better placed to decide which task to run and which to hold.
>
> For #59, I think it will work well when there are few tasks and few task updates. However, when there are thousands of tasks for a job, or even for a single stage, the cost of locking, serialization, and deserialization would be huge. That's why we previously added an in-memory cache for the scheduler state; however, that cache layer was removed by #59.

By streaming I just meant scheduling the entire DAG at once so that batches are streamed between stages instead of using a disk-based shuffle with full materialization of each stage. Infinite streaming is not really something we need, or something Ballista plans to support as far as I know.

For all-at-once scheduling I don't think round-robin scheduling would work. The individual tasks would need to synchronize execution. If, for example, a task from stage 2 is scheduled on one executor, it would block its executor slot until its input partitions start executing on their executors. Under certain conditions I'm fairly certain this could deadlock the system, and avoiding that would require some complicated synchronization logic on the executor side.

I agree that the current implementation involves too much overhead, but I think there is a better way to iterate on the current design without radically changing things (and, I believe, limiting future improvements). Broadly, the way we were planning to address this is to break the scheduler into two separate workloads:

1. A single leader, responsible for task scheduling on the executors, which can leverage in-memory state to alleviate distributed locking and IO concerns. Executors would report task updates to this coordinator, which would handle all scheduling and asynchronously replicate scheduling state to the persistent backend.
2. N followers, which would serve gRPC requests (job status, execute query, etc.) and handle physical planning.

We would then use etcd for leader election in the distributed case (or have the single scheduler do both workloads in the standalone case). If the coordinator dies, we elect a new leader, which would load the coordinator state from the persistent backend. A rough sketch of this split is included at the end of this comment.

As an incremental step I think we could refine the locking. For simplicity's sake we currently use a global lock on the active jobs, but we could certainly refine that to lock only a particular job, which is all that is really required for scheduling purposes (see the second sketch below).
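To make the leader/follower split a bit more concrete, here is a minimal Rust sketch of what the coordinator workload could look like. This is only an illustration of the idea, not existing Ballista code: `CoordinatorState`, `LeaderElection`, and `StateBackend` are hypothetical placeholders, and a real implementation would replicate to the backend asynchronously and in batches rather than on every update as shown here.

```rust
use std::collections::HashMap;

/// In-memory scheduling state owned exclusively by the elected leader.
/// (Placeholder: real state would track stages, executors, task slots, etc.)
#[derive(Default)]
struct CoordinatorState {
    /// job_id -> number of tasks still pending.
    pending_tasks: HashMap<String, usize>,
}

/// Leader election, e.g. backed by etcd in the distributed case or a no-op
/// that always wins in standalone mode.
trait LeaderElection {
    /// Block until this scheduler instance becomes the leader.
    fn campaign(&self) -> Result<(), String>;
}

/// Persistent backend used for replication and for recovering coordinator
/// state after a leader failover.
trait StateBackend {
    fn replicate(&self, state: &CoordinatorState) -> Result<(), String>;
    fn load(&self) -> Result<CoordinatorState, String>;
}

/// The single-leader workload: owns in-memory state, applies task updates
/// reported by executors, and replicates to the backend off the hot path.
struct Coordinator<B: StateBackend> {
    state: CoordinatorState,
    backend: B,
}

impl<B: StateBackend> Coordinator<B> {
    /// Called after winning the election: recover state from the persistent
    /// backend before accepting task updates (covers coordinator failover).
    fn on_elected(backend: B) -> Result<Self, String> {
        let state = backend.load()?;
        Ok(Self { state, backend })
    }

    /// Apply a task completion entirely in memory; no distributed lock and no
    /// synchronous backend round-trip are needed for the scheduling decision.
    fn task_completed(&mut self, job_id: &str) -> Result<(), String> {
        if let Some(pending) = self.state.pending_tasks.get_mut(job_id) {
            *pending = pending.saturating_sub(1);
        }
        // In a real implementation this would be asynchronous and batched.
        self.backend.replicate(&self.state)
    }
}

/// Entry point for the leader workload: every scheduler instance runs the
/// follower (gRPC/planning) workload, but only the election winner builds
/// the Coordinator and starts making scheduling decisions.
fn run_leader<E: LeaderElection, B: StateBackend>(
    election: E,
    backend: B,
) -> Result<Coordinator<B>, String> {
    election.campaign()?;
    Coordinator::on_elected(backend)
}
```

Followers would never touch `CoordinatorState`; they would only serve gRPC requests and hand accepted jobs to the leader, so all scheduling decisions stay on one node while request serving and planning scale horizontally.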
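And for the incremental locking refinement, a minimal sketch of per-job locking, assuming active jobs can be keyed by job id (`JobState` and the method names are placeholders, not the current scheduler API):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Placeholder for whatever per-job state the scheduler needs to mutate.
#[derive(Default)]
struct JobState {
    completed_tasks: usize,
}

/// Active jobs keyed by job id. The outer RwLock is held only briefly to look
/// up (or insert) a job entry; scheduling work for a job then takes only that
/// job's Mutex, so updates for different jobs no longer contend on one global lock.
#[derive(Default)]
struct ActiveJobs {
    jobs: RwLock<HashMap<String, Arc<Mutex<JobState>>>>,
}

impl ActiveJobs {
    fn get_or_create(&self, job_id: &str) -> Arc<Mutex<JobState>> {
        // Fast path: shared read lock, taken for the duration of the lookup only.
        if let Some(job) = self.jobs.read().unwrap().get(job_id) {
            return Arc::clone(job);
        }
        // Slow path: take the write lock only to insert a new entry
        // (entry/or_default makes a concurrent double-insert harmless).
        Arc::clone(
            self.jobs
                .write()
                .unwrap()
                .entry(job_id.to_string())
                .or_default(),
        )
    }

    fn record_task_completion(&self, job_id: &str) {
        let job = self.get_or_create(job_id);
        // Only this job is locked while its state is updated.
        job.lock().unwrap().completed_tasks += 1;
    }
}
```

The write lock on the map is only taken when a job is first registered, so thousands of task updates for different jobs can proceed in parallel instead of serializing behind a single lock on the whole active-jobs set.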
