Mridul, sure, I totally agree that SPARK-25299 is a much better solution, as long as we can get it from the Spark community. (Btw, the private[spark] scope of RDD.outputDeterministicLevel is no big deal; Celeborn already has Spark-integration code living in the [spark] package scope.)
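As a quick illustration only (the helper below is hypothetical, not existing Celeborn code), anything compiled under the org.apache.spark package hierarchy can read that private[spark] field directly:

// Hypothetical helper, shown only to illustrate the package-scope trick;
// the object name is made up and this is not actual Celeborn code.
package org.apache.spark.shuffle.celeborn

import org.apache.spark.rdd.{DeterministicLevel, RDD}

object DeterminismUtils {
  // outputDeterministicLevel is private[spark], but this file lives under
  // the org.apache.spark package hierarchy, so the access compiles.
  def isIndeterminate(rdd: RDD[_]): Boolean =
    rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}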
I also have a question about INDETERMINATE stage recompute, and may need your
help. The rule for rerunning an INDETERMINATE ShuffleMapStage is reasonable;
however, I don't find the corresponding logic for rerunning an INDETERMINATE
ResultStage in DAGScheduler. If an INDETERMINATE ShuffleMapStage gets entirely
recomputed, the corresponding ResultStage should be entirely recomputed as
well, per my understanding. I found
https://issues.apache.org/jira/browse/SPARK-25342, which rolls back a
ResultStage, but it was not merged. Do you know of any context or a related
ticket for INDETERMINATE ResultStage rerun? Thanks in advance!

Regards,
Erik

On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <[email protected]> wrote:

>
> On Mon, Oct 16, 2023 at 11:31 AM Erik fang <[email protected]> wrote:
>
>> Hi Mridul,
>>
>> For a),
>> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> to decide whether the whole stage needs to be recomputed.
>> I think we can pass the same information to Celeborn in
>> ShuffleManager.registerShuffle()
>> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> since ShuffleDependency contains the RDD object.
>> It seems Stage.isIndeterminate() is not reachable from ShuffleDependency,
>> but luckily it only uses the rdd internally:
>>
>> def isIndeterminate: Boolean = {
>>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> }
>>
>> Relying on the internal implementation is not great, but doable.
>> I don't expect the Spark RDD/Stage implementation to change frequently,
>> and we can discuss an RDD isIndeterminate API with the Spark community if
>> they change it in the future.
>>
>
> Only RDD.getOutputDeterministicLevel is publicly exposed;
> RDD.outputDeterministicLevel is not, and it is private[spark].
> While I don't expect changes to this, it is inherently unstable to depend
> on it.
>
> Btw, please see the discussion with Sungwoo Park: if Celeborn is
> maintaining a reducer-oriented view, you will need to recompute all the
> mappers anyway - what you might save is the subset of reducer partitions
> which can be skipped if it is DETERMINATE.
>
>
>> For c),
>> I also considered a similar solution in Celeborn.
>> Celeborn (LifecycleManager) can get the full picture of remaining shuffle
>> data from the previous stage attempt and reuse it in stage recompute,
>> and the whole process would be transparent to Spark/DAGScheduler.
>>
>
> Celeborn does not have visibility into this - and this is potentially
> subject to invasive changes in Apache Spark as it evolves.
> For example, I recently merged a couple of changes which would make this
> different in master compared to previous versions.
> Until the remote shuffle service SPIP is implemented and these are
> abstracted out & made pluggable, it will continue to be quite volatile.
>
> Note that the behavior for 3.5 and older is known - since those Spark
> versions have already been released - it is the behavior in master and
> future versions of Spark which is subject to change.
> So delivering on SPARK-25299 would future-proof all remote shuffle
> implementations.
>
> Regards,
> Mridul
>
>
>> From my perspective, leveraging partial stage recompute and the
>> remaining shuffle data needs a lot of work in Celeborn.
>> I prefer to implement a simple whole-stage recompute first, with the
>> interface defined with a recomputeAll = true flag, and to explore partial
>> stage recompute in a separate ticket as a future optimization.
>> What do you think?
>>
>> Regards,
>> Erik
>>
>>
>> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <[email protected]>
>> wrote:
>>
>>>
>>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <[email protected]>
>>> wrote:
>>>
>>>> A reducer-oriented view of shuffle, especially without replication,
>>>> could indeed be susceptible to this issue you described (a single fetch
>>>> failure would require all mappers to be recomputed) - note, not
>>>> necessarily all reducers to be recomputed though.
>>>>
>>>> Note that I have not looked much into Celeborn specifically on this
>>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>>>
>>>> Regards,
>>>> Mridul
>>>>
>>>>
>>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> (Sorry for sending the same message again.)
>>>>>
>>>>> From my understanding, the current implementation of Celeborn makes it
>>>>> hard to find out which mapper should be re-executed when a partition
>>>>> cannot be read, and we should re-execute all the mappers in the
>>>>> upstream stage. If we can find out which mapper/partition should be
>>>>> re-executed, the current logic of stage recomputation could be
>>>>> (partially or totally) reused.
>>>>>
>>>>> Regards,
>>>>>
>>>>> --- Sungwoo
>>>>>
>>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Spark will try to minimize the recomputation cost as much as possible.
>>>>>> For example, if the parent stage was DETERMINATE, it simply needs to
>>>>>> recompute the missing (mapper) partitions (which resulted in the fetch
>>>>>> failure). Note, this by itself could require further recomputation in
>>>>>> the DAG if the inputs required to compute the parent partitions are
>>>>>> missing, and so on - so it is dynamic.
>>>>>>
>>>>>> Regards,
>>>>>> Mridul
>>>>>>
>>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
>>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
>>>>>>> > shuffle output will be discarded and it will be entirely
>>>>>>> > recomputed (see here
>>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>
>>>>>>> > ).
>>>>>>>
>>>>>>> If a reducer (in a downstream stage) fails to read data, can we find
>>>>>>> out which tasks should recompute their output? From the previous
>>>>>>> discussion, I thought this was hard (in the current implementation),
>>>>>>> and we should re-execute all tasks in the upstream stage.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> --- Sungwoo
>>>>>>
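P.S. To make the registerShuffle() idea from my earlier mail a bit more
concrete, here is a rough sketch of what I have in mind; the handle class name
and the recomputeAll flag are hypothetical, not existing Celeborn code:

package org.apache.spark.shuffle.celeborn

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.DeterministicLevel // used by the sketch below
import org.apache.spark.shuffle.ShuffleHandle

// Hypothetical handle that records whether losing any map output should
// force the whole map stage (and its shuffle data) to be recomputed.
class IndeterminateAwareShuffleHandle[K, V, C](
    shuffleId: Int,
    val dependency: ShuffleDependency[K, V, C],
    val recomputeAll: Boolean)
  extends ShuffleHandle(shuffleId)

// Inside the ShuffleManager implementation, registerShuffle() could capture
// the map-side determinism from the RDD carried by the ShuffleDependency:
//
//   override def registerShuffle[K, V, C](
//       shuffleId: Int,
//       dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
//     val indeterminate =
//       dependency.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
//     new IndeterminateAwareShuffleHandle(shuffleId, dependency,
//       recomputeAll = indeterminate)
//   }

The boolean recomputeAll is just the simple whole-stage signal I mentioned;
partial stage recompute could later replace it with richer per-partition
information.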
