[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569363#comment-16569363 ]

Mridul Muralidharan commented on SPARK-24375:
---------------------------------------------

{quote}
We've thought hard on the issue and don't feel we can make it unless we force users to explicitly set a number in a barrier() call (actually it's not a good idea because it brings more burden to manage the code).
{quote}

I am not sure where the additional burden exists. Make it an optional param to barrier().
* If not defined, it would be analogous to what exists right now.
* If specified, fail the stage if different tasks in the stage end up waiting on different barrier names (or some have a name and others don't).

In the example use cases I have seen, there are usually partition-specific code paths (if partition 0, do some initialization/teardown, etc.), which result in divergent code paths and so increase the potential for this issue. It will be very difficult to reason about what state results.

> Design sketch: support barrier scheduling in Apache Spark
> ---------------------------------------------------------
>
> Key: SPARK-24375
> URL: https://issues.apache.org/jira/browse/SPARK-24375
> Project: Spark
> Issue Type: Story
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Xiangrui Meng
> Assignee: Jiang Xingbo
> Priority: Major
>
> This task is to outline a design sketch for the barrier scheduling SPIP
> discussion. It doesn't need to be a complete design before the vote. But it
> should at least cover both Scala/Java and PySpark.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
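The optional-name check proposed in this comment could be sketched roughly as follows. This is only an illustration in plain Python, not Spark code: `check_barrier_names` and `BarrierMismatch` are hypothetical names, and the sketch assumes the driver can see the name (or absence of one) that every task passed to barrier() before it releases anyone.

```python
from typing import Optional, Sequence


class BarrierMismatch(Exception):
    """Hypothetical error: tasks in one stage wait on different barriers."""


def check_barrier_names(names: Sequence[Optional[str]]) -> Optional[str]:
    """names[i] is the name task i passed to barrier(), or None if it passed none.

    Returns the common name (None when no task named the barrier, which
    matches the unnamed behavior). Raises BarrierMismatch when tasks disagree,
    including the case where only some tasks supplied a name.
    """
    distinct = set(names)
    if len(distinct) > 1:
        shown = sorted("<unnamed>" if n is None else n for n in distinct)
        raise BarrierMismatch(f"tasks wait on different barriers: {shown}")
    # Zero or one distinct value: everyone agrees.
    return next(iter(distinct)) if distinct else None
```

With such a check, the divergent-code-path case fails fast instead of hanging: a stage where one task reports `"init"` and another reports `"teardown"` raises `BarrierMismatch`, while a stage where no task supplies a name behaves as before.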
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569202#comment-16569202 ]

Jiang Xingbo commented on SPARK-24375:
--------------------------------------

[~mridulm80] You are right that we currently cannot identify which barrier it is until the barrier() function has actually been executed. We've thought hard on the issue and don't feel we can make it unless we force users to explicitly set a number in a barrier() call (actually it's not a good idea because it brings more burden to manage the code).

The current decision is that we don't distinguish barrier() calls from the same task. Users are responsible for ensuring that the same number of barrier() calls happens in all possible code branches; otherwise the job may hang or fail with a SparkException after a timeout.

We've added the following message to the description of `BarrierTaskContext.barrier()`; I hope it is useful:
{quote}
 * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
 * possible code branches. Otherwise, you may get the job hanging or a SparkException after
 * timeout.
{quote}
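The rule that every task must make the same number of barrier() calls can be illustrated without Spark. The sketch below is an assumption-laden stand-in: it uses Python's standard `threading.Barrier` in place of `BarrierTaskContext.barrier()`, one thread per task, and a hypothetical helper `run_stage`. It shows only the failure mode, not how Spark implements the barrier.

```python
import threading


def run_stage(barrier_calls_per_task, timeout=0.5):
    """Run one thread per task; task i calls the barrier barrier_calls_per_task[i] times.

    Returns one outcome per task: "ok" if all of its barrier calls returned,
    "timeout" if a call gave up waiting for the other tasks.
    """
    n = len(barrier_calls_per_task)
    barrier = threading.Barrier(n)
    outcomes = [None] * n

    def task(idx, calls):
        try:
            for _ in range(calls):
                barrier.wait(timeout=timeout)
            outcomes[idx] = "ok"
        except threading.BrokenBarrierError:
            # Raised when the wait times out (or another waiter timed out).
            outcomes[idx] = "timeout"

    threads = [
        threading.Thread(target=task, args=(i, c))
        for i, c in enumerate(barrier_calls_per_task)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return outcomes
```

Calling `run_stage([2, 2])` lets both tasks finish, whereas `run_stage([2, 1])` leaves the first task waiting alone at its second barrier until the timeout fires, which corresponds to the "job hanging or a SparkException after timeout" case described in the caution text.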
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568664#comment-16568664 ]

Mridul Muralidharan commented on SPARK-24375:
---------------------------------------------

{quote}
It's not desired behavior to silently catch exceptions thrown by TaskContext.barrier(). However, in case this really happens, we can detect it because we have an `epoch` on both the driver side and the executor side; more details will go to the design doc of BarrierTaskContext.barrier(), SPARK-24581.
{quote}

The current 'barrier' function does not identify 'which' barrier it is from a user's point of view.

Here, due to exceptions being raised (not necessarily from barrier(); they could come from user code as well), different tasks are waiting on different barriers.

{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()
{code}

T1 waits on barrier 1; T2 could have raised an exception in snippet A and ends up waiting on barrier 2 (having never seen barrier 1). In this scenario, how does Spark make progress? (And, of course, what happens when T1 reaches barrier 2 after T2 has already moved past it?)

I did not see this clarified in the design or in the implementation.
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16548784#comment-16548784 ]

Jiang Xingbo commented on SPARK-24375:
--------------------------------------

{quote}Is the 'barrier' logic pluggable? Instead of only being a global sync point.{quote}
The barrier() function is much like the [MPI_Barrier|https://www.mpich.org/static/docs/v3.2.1/www/www3/MPI_Barrier.html] function in MPI; its main purpose is to provide a way to do a global sync between barrier tasks. I'm not sure whether we plan to support pluggable logic for now. Do you have a case in hand that requires a pluggable barrier()?

{quote}Dynamic resource allocation (DRA) triggers allocation of additional resources based on pending tasks, hence the comment We may add a check of total available slots before scheduling tasks from a barrier stage taskset. does not necessarily work in that context.{quote}
Supporting barrier stages with dynamic resource allocation is a non-goal here; however, we can improve the behavior to integrate better with DRA in Spark 3.0.

{quote}Currently DRA in Spark allocates resources uniformly. Are we envisioning changes as part of this effort to allocate heterogeneous executor resources based on pending tasks (at least initially, for barrier support for GPUs)?{quote}
There is another ongoing SPIP, SPARK-24615, to add accelerator-aware task scheduling to Spark; I think we should deal with the above issue within that topic.

{quote}In the face of exceptions, some tasks will wait on barrier 2 and others on barrier 1, causing issues.{quote}
It's not desired behavior to silently catch exceptions thrown by TaskContext.barrier(). However, in case this really happens, we can detect it because we have an `epoch` on both the driver side and the executor side; more details will go to the design doc of BarrierTaskContext.barrier(), SPARK-24581.

{quote}Can you elaborate more on leveraging TaskContext.localProperties? Is it expected to be synced after 'barrier' returns? What guarantees are we expecting to provide?{quote}
We update the localProperties on the driver, and on executors you should be able to fetch the updated values through TaskContext; it should not be coupled with the `barrier()` function.
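One way the `epoch` comparison mentioned in this comment might work is sketched below. Everything here is an assumption made for illustration: the class `EpochCoordinator`, its `request` method, and the rule "reject any request whose epoch differs from the coordinator's" are hypothetical and are not taken from the design doc in SPARK-24581.

```python
class EpochCoordinator:
    """Hypothetical driver-side bookkeeping for one barrier stage."""

    def __init__(self, num_tasks):
        self.num_tasks = num_tasks
        self.epoch = 0        # number of barriers the whole stage has completed
        self.arrived = set()  # task ids waiting at the current barrier

    def request(self, task_id, task_epoch):
        """A task reports how many barriers it has completed locally.

        Returns True when this request completes the current barrier,
        False when the task has to keep waiting.
        """
        if task_epoch != self.epoch:
            raise RuntimeError(
                f"task {task_id} is at epoch {task_epoch}, stage is at {self.epoch}"
            )
        self.arrived.add(task_id)
        if len(self.arrived) == self.num_tasks:
            # Everyone arrived: release the barrier and advance the stage epoch.
            self.arrived.clear()
            self.epoch += 1
            return True
        return False
```

A check of this kind catches a task whose count of completed barriers has drifted from the rest of the stage. It does not distinguish call sites: in the try/catch example from this thread, a task that skipped barrier 1 and a task waiting at barrier 1 would both report epoch 0 and be released together.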
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516411#comment-16516411 ]

Mridul Muralidharan commented on SPARK-24375:
---------------------------------------------

[~jiangxb1987] A couple of comments based on the document and your elaboration above:

* Is the 'barrier' logic pluggable? Instead of only being a global sync point.
* Dynamic resource allocation (DRA) triggers allocation of additional resources based on pending tasks, hence *We may add a check of total available slots before scheduling tasks from a barrier stage taskset.* does not necessarily work in that context.
* Currently DRA in Spark allocates resources uniformly. Are we envisioning changes as part of this effort to allocate heterogeneous executor resources based on pending tasks (at least initially, for barrier support for GPUs)?
* How is fault tolerance handled w.r.t. waiting on incorrect barriers? Is there any way to identify the barrier? Example:
{code}
try {
  ... snippet A ...
  // Barrier 1
  context.barrier()
  ... snippet B ...
} catch { ... }
... snippet C ...
// Barrier 2
context.barrier()
{code}
** In the face of exceptions, some tasks will wait on barrier 2 and others on barrier 1, causing issues.
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515913#comment-16515913 ]

Xiangrui Meng commented on SPARK-24375:
---------------------------------------

I'm closing this Jira in favor of formal design discussions at SPARK-24581 and SPARK-24582. Please watch those tickets and provide your inputs there. Thanks!
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16504171#comment-16504171 ]

Li Yuanjian commented on SPARK-24375:
-------------------------------------

Got it, great thanks for your detailed explanation.
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503647#comment-16503647 ]

Jiang Xingbo commented on SPARK-24375:
--------------------------------------

The major problem is that tasks in the same stage of an MPI workload may rely on the internal results of other tasks running in parallel to compute the final results. Thus, when a task fails, other tasks in the same stage may generate incorrect results or even hang, so it seems most straightforward to just retry the whole stage on task failure.
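The whole-stage retry policy described in this comment can be sketched as a small loop. This is a sequential toy model, not Spark's scheduler: `run_barrier_stage`, `task_a`, `task_b`, and the `calls` counter are hypothetical names, and real barrier tasks run concurrently rather than one after another.

```python
def run_barrier_stage(tasks, max_attempts=3):
    """Run every task of a stage; if any task fails, rerun all of them.

    Each task is a callable that receives the attempt number. Results from a
    failed attempt are discarded, since sibling tasks may have depended on the
    failed task's intermediate state.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            results = [task(attempt) for task in tasks]
        except Exception:
            continue  # no per-task retry: start the whole stage over
        return attempt, results
    raise RuntimeError(f"barrier stage failed after {max_attempts} attempts")


calls = {"a": 0, "b": 0}


def task_a(attempt):
    calls["a"] += 1
    return "a-done"


def task_b(attempt):
    calls["b"] += 1
    if attempt == 1:
        raise ValueError("simulated task failure")
    return "b-done"


attempt, results = run_barrier_stage([task_a, task_b])
```

In this run `task_b` fails only on the first attempt, yet `task_a` is executed again as well, which is the difference from ordinary task-level retry.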
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503486#comment-16503486 ]

Li Yuanjian commented on SPARK-24375:
-------------------------------------

Hi [~cloud_fan] and [~jiangxb1987], just a tiny question here: I noticed that the discussion in SPARK-20928 mentioned barrier scheduling.
{quote}
A barrier stage doesn’t launch any of its tasks until the available slots (free CPU cores that can be used to launch pending tasks) are enough to launch all the tasks at the same time, and it always retries the whole stage when any task fails.
{quote}
Why is task-level retry forbidden here? Is there any possibility of achieving it? Thanks.
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501049#comment-16501049 ]

Apache Spark commented on SPARK-24375:
--------------------------------------

User 'jiangxb1987' has created a pull request for this issue:
https://github.com/apache/spark/pull/21494
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492745#comment-16492745 ]

Wenchen Fan commented on SPARK-24375:
-------------------------------------

For the PySpark side, we don't need to care about the scheduler part, because the PySpark driver connects to a JVM driver and all the scheduling is done in the JVM driver.

For the task barrier, one problem is that we launch a Python worker per task, and the Python workers talk to the JVM executor via a socket. It's hard to change the protocol to allow the Python worker to send a signal to the JVM executor to request a sync. Instead, we can set up a Py4J server per task, and the Python worker can send the barrier sync request via Py4J.
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16489764#comment-16489764 ]

Jiang Xingbo commented on SPARK-24375:
--------------------------------------

We propose adding new RDDBarrier and BarrierTaskContext classes to support barrier scheduling in Spark. This also requires modifying how job scheduling works a bit to accommodate the new feature.

*Barrier Stage*: A barrier stage doesn’t launch any of its tasks until the available slots (free CPU cores that can be used to launch pending tasks) are enough to launch all the tasks at the same time, and it always retries the whole stage when any task fails. One way to identify whether a stage is a barrier stage is to trace the RDD that the stage runs on: if the stage contains an RDDBarrier, or at least one of its ancestor RDDs is an RDDBarrier, then the stage is a barrier stage. The tracing stops at ShuffleRDD(s).

*Schedule Barrier Tasks*: Currently TaskScheduler schedules pending tasks on available slots on a best-effort basis, so normally the tasks in a stage don’t all get launched at the same time. We may add a check of total available slots before scheduling tasks from a barrier stage taskset. It is still possible that only some of the tasks of a barrier stage taskset get launched due to task locality issues, so we have to check again before launch to ensure that all tasks in the same barrier stage get launched at the same time.

If we consider scheduling several jobs at the same time (both barrier and regular jobs), barrier tasks may be blocked by regular tasks (when the available slots are always fewer than those required by a barrier stage taskset), or one barrier stage taskset may block another (when a barrier stage taskset that requires fewer slots is prone to being scheduled earlier). Currently we don’t have a perfect solution for all these scenarios, but we can at least avoid the worst case, in which a huge barrier stage taskset is blocked forever on a busy cluster, by using a time-based weight approach (conceptually, a taskset that has been pending for a longer time is assigned a greater priority weight for scheduling).

*Task Barrier*: Barrier tasks shall allow users to insert a sync in the middle of task execution. This can be achieved by introducing a global barrier operation in TaskContext, which makes the current task wait until all tasks in the same stage hit this barrier.

*Task Failure*: To ensure correctness, a barrier stage always retries the whole stage when any task fails. Thus, it’s quite straightforward that we need to kill all the running tasks of a failed stage, which also guarantees that at most one taskset is running for each stage (no zombie tasks).

*Speculative Task*: Since we require all tasks in a barrier stage to be launched at the same time, there is no need to launch speculative tasks for a barrier stage taskset.

*Share TaskInfo*: To share information between tasks in a barrier stage, we may update it in `TaskContext.localProperties`.

*Python Support*: Expose RDDBarrier and BarrierTaskContext to PySpark.

[~cloud_fan] maybe you want to give additional information I didn't cover above? (esp. PySpark)
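The slot check and the time-based weight from the *Schedule Barrier Tasks* item can be sketched together. This is a toy model under stated assumptions, not the TaskScheduler: `TaskSet`, `effective_priority`, `next_taskset`, the linear weight of one priority point per pending second, and the choice to hold free slots for a barrier taskset that does not fit yet are all hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TaskSet:
    name: str
    slots_needed: int
    submitted_at: float        # seconds, on the same clock as `now`
    is_barrier: bool = False
    base_priority: float = 0.0


def effective_priority(ts: TaskSet, now: float, weight_per_second: float = 1.0) -> float:
    """Priority grows with pending time, so old tasksets are not starved."""
    return ts.base_priority + weight_per_second * (now - ts.submitted_at)


def next_taskset(pending: List[TaskSet], free_slots: int, now: float) -> Optional[TaskSet]:
    """Pick the taskset to launch from, or None to leave the free slots idle."""
    if not pending or free_slots <= 0:
        return None
    candidate = max(pending, key=lambda ts: effective_priority(ts, now))
    if candidate.is_barrier and candidate.slots_needed > free_slots:
        # A barrier stage launches all of its tasks at once or none of them.
        # Holding the slots keeps newer, smaller tasksets from starving it.
        return None
    # Regular tasksets may launch partially on whatever slots are free.
    return candidate
```

For example, with an 8-slot barrier taskset that has been pending for 100 seconds and a 2-slot regular taskset pending for 10 seconds, `next_taskset` returns None while only 4 slots are free and returns the barrier taskset once 8 slots are free. Holding slots idle costs utilization, which is one reason the comment above says there is no perfect solution.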
[jira] [Commented] (SPARK-24375) Design sketch: support barrier scheduling in Apache Spark
[ https://issues.apache.org/jira/browse/SPARK-24375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488432#comment-16488432 ]

Xiangrui Meng commented on SPARK-24375:
---------------------------------------

[~jiangxb1987] Could you summarize the design sketch based on our offline discussion? Thanks!