Hi Mridul,

For a), the DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
<https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
to decide whether the whole stage needs to be recomputed. I think we can pass
the same information to Celeborn in ShuffleManager.registerShuffle()
<https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
since the ShuffleDependency it receives carries the RDD object.
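For illustration, here is a minimal sketch of how those two flags could be
derived at registerShuffle() time (the object and method names are just for
illustration, and it assumes the code lives in a subpackage of
org.apache.spark, as Celeborn's Spark shuffle manager already does, so that
the private[spark] members are reachable):

    package org.apache.spark.shuffle.celeborn

    import org.apache.spark.ShuffleDependency
    import org.apache.spark.rdd.DeterministicLevel

    // Hypothetical helper: derives the flags the DAGScheduler consults,
    // using only the ShuffleDependency passed to registerShuffle().
    object ShuffleDeterminismInfo {

      // Mirrors Stage.isIndeterminate, which delegates to the stage's RDD.
      def isIndeterminate(dep: ShuffleDependency[_, _, _]): Boolean =
        dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE

      // Same flag the DAGScheduler reads via RDD.isBarrier().
      def isBarrier(dep: ShuffleDependency[_, _, _]): Boolean =
        dep.rdd.isBarrier()
    }

A Celeborn ShuffleManager implementation could then forward these two booleans
to the LifecycleManager when the shuffle is registered.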
Note that Stage.isIndeterminate() itself is not accessible from a
ShuffleDependency, but luckily its implementation relies only on the rdd:

    def isIndeterminate: Boolean = {
      rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
    }

Relying on an internal implementation detail is not ideal, but it is doable. I
don't expect the Spark RDD/Stage implementation to change frequently, and if
it does change in the future, we can discuss adding an RDD isIndeterminate API
with the Spark community.

For c), I also considered a similar solution on the Celeborn side: Celeborn
(the LifecycleManager) has the full picture of the shuffle data remaining from
the previous stage attempt, so it could reuse that data during the stage
recompute, keeping the whole process transparent to Spark/DAGScheduler. From
my perspective, though, leveraging partial stage recompute and the remaining
shuffle data requires a lot of work in Celeborn. I would prefer to implement a
simple whole-stage recompute first, with the interface defined with a
recomputeAll = true flag, and explore partial stage recompute in a separate
ticket as a future optimization.
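To make the proposal concrete, the interface could look roughly like this
(naming entirely hypothetical, just to pin down the recomputeAll semantics):

    // Hypothetical sketch for discussion; not an existing Celeborn API.
    trait StageRecomputeListener {

      /**
       * Invoked when shuffle data for `shuffleId` is lost to a fetch failure.
       *
       * @param shuffleId    the shuffle whose data can no longer be served
       * @param recomputeAll true  => drop all map output for this shuffle and
       *                              rerun every mapper (whole-stage
       *                              recompute, the simple first version)
       *                     false => rerun only the mappers whose output was
       *                              lost (future partial-recompute
       *                              optimization)
       */
      def onRecompute(shuffleId: Int, recomputeAll: Boolean): Unit
    }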
What do you think?

Regards,
Erik

On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com> wrote:

> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>> A reducer oriented view of shuffle, especially without replication, could
>> indeed be susceptible to this issue you described (a single fetch failure
>> would require all mappers to need to be recomputed) - note, not
>> necessarily all reducers to be recomputed though.
>>
>> Note that I have not looked much into Celeborn specifically on this
>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>
>> Regards,
>> Mridul
>>
>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> (Sorry for sending the same message again.)
>>>
>>> From my understanding, the current implementation of Celeborn makes it
>>> hard to find out which mapper should be re-executed when a partition
>>> cannot be read, and we should re-execute all the mappers in the upstream
>>> stage. If we can find out which mapper/partition should be re-executed,
>>> the current logic of stage recomputation could be (partially or totally)
>>> reused.
>>>
>>> Regards,
>>>
>>> --- Sungwoo
>>>
>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Spark will try to minimize the recomputation cost as much as possible.
>>>> For example, if the parent stage was DETERMINATE, it simply needs to
>>>> recompute the missing (mapper) partitions (which resulted in fetch
>>>> failure). Note, this by itself could require further recomputation in
>>>> the DAG if the inputs required to compute the parent partitions are
>>>> missing, and so on - so it is dynamic.
>>>>
>>>> Regards,
>>>> Mridul
>>>>
>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>>>> wrote:
>>>>
>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
>>>>> > to be recomputed, if it is an INDETERMINATE stage, all shuffle
>>>>> > output will be discarded and it will be entirely recomputed (see here
>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>>>>>
>>>>> If a reducer (in a downstream stage) fails to read data, can we find
>>>>> out which tasks should recompute their output? From the previous
>>>>> discussion, I thought this was hard (in the current implementation),
>>>>> and we should re-execute all tasks in the upstream stage.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> --- Sungwoo