On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> Hi Mridul,
>
> For a),
> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> to decide whether the whole stage needs to be recomputed.
> I think we can pass the same information to Celeborn in
> ShuffleManager.registerShuffle()
> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> since ShuffleDependency contains the RDD object.
> It seems Stage.isIndeterminate() is not reachable from ShuffleDependency,
> but luckily rdd is used internally:
>
> def isIndeterminate: Boolean = {
>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> }
>
> Relying on an internal implementation is not good, but doable.
> I don't expect the Spark RDD/Stage implementation to change frequently,
> and we can discuss an RDD isIndeterminate API with the Spark community if
> they change it in the future.

Only RDD.getOutputDeterministicLevel is publicly exposed; RDD.outputDeterministicLevel is private[spark]. While I don't expect changes to this, it is inherently unstable to depend on it.

Btw, please see the discussion with Sungwoo Park: if Celeborn is maintaining a reducer-oriented view, you will need to recompute all the mappers anyway - what you might save is the subset of reducer partitions which can be skipped if the stage is DETERMINATE.

> for c)
> I also considered a similar solution in Celeborn.
> Celeborn (LifecycleManager) can get the full picture of remaining shuffle
> data from the previous stage attempt and reuse it in stage recompute,
> and the whole process will be transparent to Spark/DagScheduler.

Celeborn does not have visibility into this - and this is potentially subject to invasive changes in Apache Spark as it evolves.
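To make the determinism check concrete, here is a minimal self-contained sketch of deriving an "is indeterminate" flag through a public accessor only, as discussed above. Note the `SketchRdd` and `DeterminismSketch` names are illustrative stand-ins, not real Spark or Celeborn APIs:

```scala
// Minimal sketch (assumed names, not actual Spark classes): models how a
// shuffle manager could compute an indeterminate flag at registration time
// using only a publicly exposed determinism level, avoiding the
// private[spark] rdd.outputDeterministicLevel field.
object DeterminismSketch {
  // Stand-in for org.apache.spark.rdd.DeterministicLevel
  object DeterministicLevel extends Enumeration {
    val DETERMINATE, UNORDERED, INDETERMINATE = Value
  }

  // Stand-in for an RDD that exposes only the public accessor
  final case class SketchRdd(level: DeterministicLevel.Value) {
    def getOutputDeterministicLevel: DeterministicLevel.Value = level
  }

  // The check quoted above, rewritten against the public accessor only
  def isIndeterminate(rdd: SketchRdd): Boolean =
    rdd.getOutputDeterministicLevel == DeterministicLevel.INDETERMINATE
}
```

A shuffle manager could evaluate this once in registerShuffle() and forward the flag to the shuffle service, so the service never needs to reach into Spark internals itself.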
For example, I recently merged a couple of changes which would make this behavior different in master compared to previous versions. Until the remote shuffle service SPIP is implemented and these internals are abstracted out & made pluggable, it will continue to be quite volatile. Note that the behavior for 3.5 and older is known - since those Spark versions have been released - it is the behavior in master and future versions of Spark which is subject to change. So delivering on SPARK-25299 would future-proof all remote shuffle implementations.

Regards,
Mridul

> From my perspective, leveraging partial stage recompute and remaining
> shuffle data needs a lot of work in Celeborn.
> I prefer to implement a simple whole-stage recompute first, with the
> interface defined with a recomputeAll = true flag, and to explore partial
> stage recompute in a separate ticket as a future optimization.
> What do you think?
>
> Regards,
> Erik
>
> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>>> A reducer-oriented view of shuffle, especially without replication,
>>> could indeed be susceptible to the issue you described (a single fetch
>>> failure would require all mappers to be recomputed) - note, not
>>> necessarily all reducers need to be recomputed though.
>>>
>>> Note that I have not looked much into Celeborn specifically on this
>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>>
>>> Regards,
>>> Mridul
>>>
>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> (Sorry for sending the same message again.)
>>>>
>>>> From my understanding, the current implementation of Celeborn makes
>>>> it hard to find out which mapper should be re-executed when a
>>>> partition cannot be read, so we should re-execute all the mappers in
>>>> the upstream stage.
>>>> If we can find out which mapper/partition should be re-executed, the
>>>> current logic of stage recomputation could be (partially or totally)
>>>> reused.
>>>>
>>>> Regards,
>>>>
>>>> --- Sungwoo
>>>>
>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan
>>>> <mri...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Spark will try to minimize the recomputation cost as much as
>>>>> possible.
>>>>> For example, if the parent stage was DETERMINATE, it simply needs to
>>>>> recompute the missing (mapper) partitions (which resulted in the
>>>>> fetch failure). Note, this by itself could require further
>>>>> recomputation in the DAG if the inputs required to compute the
>>>>> parent partitions are missing, and so on - so it is dynamic.
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>>>>> wrote:
>>>>>
>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
>>>>>> > shuffle output will be discarded and it will be entirely
>>>>>> > recomputed (see here
>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>>>>>>
>>>>>> If a reducer (in a downstream stage) fails to read data, can we
>>>>>> find out which tasks should recompute their output? From the
>>>>>> previous discussion, I thought this was hard (in the current
>>>>>> implementation), and we should re-execute all tasks in the
>>>>>> upstream stage.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> --- Sungwoo
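The recompute semantics discussed in this thread - a DETERMINATE parent stage recomputes only the missing mapper partitions, while an INDETERMINATE stage discards all shuffle output and recomputes everything - can be sketched in a few lines. This is a self-contained illustration with assumed names, not the actual DAGScheduler code:

```scala
// Sketch (assumed names): the partition-selection rule on fetch failure.
// DETERMINATE/UNORDERED stages recompute only the missing mapper
// partitions; INDETERMINATE stages recompute every partition, because
// their re-executed output may differ from the original run.
object RecomputeSketch {
  sealed trait DeterministicLevel
  case object Determinate extends DeterministicLevel
  case object Unordered extends DeterministicLevel
  case object Indeterminate extends DeterministicLevel

  def partitionsToRecompute(
      level: DeterministicLevel,
      allPartitions: Set[Int],
      missingPartitions: Set[Int]): Set[Int] =
    level match {
      case Indeterminate => allPartitions     // discard everything, rerun all
      case _             => missingPartitions // rerun only what was lost
    }
}
```

This is also why a reducer-oriented store without replication is costly here: one lost reducer partition aggregates output from every mapper, so even in the DETERMINATE case the set of "missing" mapper partitions can be the whole stage.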