In fact, I'm wondering whether Spark will rerun the whole reduce ShuffleMapStage if its upstream ShuffleMapStage is INDETERMINATE and is rerun.
Keyong Zhou <zho...@apache.org> wrote on Thu, Oct 19, 2023 at 23:00:

> Thanks Erik for bringing up this question, I'm also curious about the
> answer, any feedback is appreciated.
>
> Thanks,
> Keyong Zhou
>
> Erik fang <fme...@gmail.com> wrote on Thu, Oct 19, 2023 at 22:16:
>
>> Mridul,
>>
>> Sure, I totally agree that SPARK-25299 is a much better solution, as long
>> as we can get it from the Spark community.
>> (By the way, the private[spark] scope of RDD.outputDeterministicLevel is
>> no big deal; Celeborn already has Spark integration code with [spark]
>> scope.)
>>
>> I also have a question about INDETERMINATE stage recompute, and may need
>> your help.
>> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however,
>> I can't find the corresponding logic for INDETERMINATE ResultStage rerun
>> in DAGScheduler.
>> If an INDETERMINATE ShuffleMapStage is entirely recomputed, the
>> corresponding ResultStage should be entirely recomputed as well, per my
>> understanding.
>>
>> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll back a
>> ResultStage, but it was not merged.
>> Do you know any context or related ticket for INDETERMINATE ResultStage
>> rerun?
>>
>> Thanks in advance!
>>
>> Regards,
>> Erik
>>
>> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
>> >
>> >> Hi Mridul,
>> >>
>> >> For a),
>> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> >> to decide whether the whole stage needs to be recomputed.
>> >> I think we can pass the same information to Celeborn in
>> >> ShuffleManager.registerShuffle()
>> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> >> since the ShuffleDependency contains the RDD object.
>> >> It seems Stage.isIndeterminate() is not reachable from
>> >> ShuffleDependency, but luckily the rdd is used internally:
>> >>
>> >> def isIndeterminate: Boolean = {
>> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> >> }
>> >>
>> >> Relying on the internal implementation is not good, but doable.
>> >> I don't expect the Spark RDD/Stage implementation to change frequently,
>> >> and we can discuss an RDD isIndeterminate API with the Spark community
>> >> if they change it in the future.
>> >
>> > Only RDD.getOutputDeterministicLevel is publicly exposed;
>> > RDD.outputDeterministicLevel is not, and it is private[spark].
>> > While I don't expect changes to this, it is inherently unstable to
>> > depend on it.
>> >
>> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
>> > maintaining a reducer-oriented view, you will need to recompute all the
>> > mappers anyway - what you might save is the subset of reducer partitions
>> > which can be skipped if it is DETERMINATE.
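The approach Erik describes above can be modeled with a small self-contained sketch: a shuffle manager records the map-side RDD's determinism level at registration time, so that on a fetch failure it knows whether every map partition must be rerun. This is illustrative only; `ShuffleMeta` and `ShuffleRegistry` are hypothetical names, not actual Spark or Celeborn classes (only `DeterministicLevel` mirrors a real Spark concept).

```scala
// Mirrors Spark's DeterministicLevel for this standalone sketch.
object DeterministicLevel extends Enumeration {
  val DETERMINATE, UNORDERED, INDETERMINATE = Value
}

// Stand-in for the information a ShuffleManager could capture from the
// ShuffleDependency's RDD inside registerShuffle().
final case class ShuffleMeta(shuffleId: Int, level: DeterministicLevel.Value) {
  def isIndeterminate: Boolean = level == DeterministicLevel.INDETERMINATE
}

object ShuffleRegistry {
  private val shuffles = scala.collection.mutable.Map.empty[Int, ShuffleMeta]

  def register(id: Int, level: DeterministicLevel.Value): Unit =
    shuffles(id) = ShuffleMeta(id, level)

  // On a fetch failure: INDETERMINATE map output means all map partitions
  // must be recomputed; DETERMINATE output means only the missing ones.
  def mustRecomputeAll(id: Int): Boolean =
    shuffles.get(id).exists(_.isIndeterminate)
}
```

This captures the determinism level once, at shuffle registration, instead of trying to read `Stage.isIndeterminate()` after the fact.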
>> >
>> >> for c)
>> >> I also considered a similar solution in Celeborn.
>> >> Celeborn (LifecycleManager) can get the full picture of remaining
>> >> shuffle data from the previous stage attempt and reuse it in stage
>> >> recompute, and the whole process will be transparent to
>> >> Spark/DAGScheduler.
>> >
>> > Celeborn does not have visibility into this - and this is potentially
>> > subject to invasive changes in Apache Spark as it evolves.
>> > For example, I recently merged a couple of changes which would make this
>> > different in master compared to previous versions.
>> > Until the remote shuffle service SPIP is implemented and these are
>> > abstracted out & made pluggable, it will continue to be quite volatile.
>> >
>> > Note that the behavior for 3.5 and older is known - since those Spark
>> > versions have been released - it is the behavior in master and future
>> > versions of Spark which is subject to change.
>> > So delivering on SPARK-25299 would future-proof all remote shuffle
>> > implementations.
>> >
>> > Regards,
>> > Mridul
>> >
>> >> From my perspective, leveraging partial stage recompute and the
>> >> remaining shuffle data needs a lot of work in Celeborn.
>> >> I prefer to implement a simple whole-stage recompute first, with the
>> >> interface defined with a recomputeAll = true flag, and explore partial
>> >> stage recompute in a separate ticket as a future optimization.
>> >> What do you think?
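The "simple whole-stage recompute first" idea Erik proposes could look roughly like the sketch below. All names here (`StageRecomputeHandler`, `onShuffleDataLost`, `WholeStageRecompute`) are hypothetical, chosen for illustration; only the `recomputeAll = true` flag comes from the discussion above.

```scala
// Hypothetical interface for reacting to lost shuffle data.
trait StageRecomputeHandler {
  /** Returns the set of map partition ids that must be recomputed for
    * `shuffleId`. With recomputeAll = true, the entire upstream stage
    * is rerun; partial recompute is a future optimization. */
  def onShuffleDataLost(shuffleId: Int, recomputeAll: Boolean): Set[Int]
}

// The trivial whole-stage strategy: every map partition is reported as
// needing recomputation, regardless of which partition was actually lost
// (so the recomputeAll flag is effectively always honored as true here).
final class WholeStageRecompute(numMappers: Int) extends StageRecomputeHandler {
  def onShuffleDataLost(shuffleId: Int, recomputeAll: Boolean): Set[Int] =
    (0 until numMappers).toSet
}
```

A partial-recompute implementation would instead return only the mapper ids whose output backed the unreadable partition, which is exactly the bookkeeping the thread says is hard with a reducer-oriented layout.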
>> >>
>> >> Regards,
>> >> Erik
>> >>
>> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
>> >> wrote:
>> >>
>> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan
>> >>> <mri...@gmail.com> wrote:
>> >>>
>> >>>> A reducer-oriented view of shuffle, especially without replication,
>> >>>> could indeed be susceptible to the issue you described (a single
>> >>>> fetch failure would require all mappers to be recomputed) - note, not
>> >>>> necessarily all reducers to be recomputed though.
>> >>>>
>> >>>> Note that I have not looked much into Celeborn specifically on this
>> >>>> aspect yet, so my comments are *fairly* centric to Spark internals
>> >>>> :-)
>> >>>>
>> >>>> Regards,
>> >>>> Mridul
>> >>>>
>> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> (Sorry for sending the same message again.)
>> >>>>>
>> >>>>> From my understanding, the current implementation of Celeborn makes
>> >>>>> it hard to find out which mapper should be re-executed when a
>> >>>>> partition cannot be read, and we should re-execute all the mappers
>> >>>>> in the upstream stage. If we can find out which mapper/partition
>> >>>>> should be re-executed, the current logic of stage recomputation
>> >>>>> could be (partially or totally) reused.
>> >>>>>
>> >>>>> Regards,
>> >>>>>
>> >>>>> --- Sungwoo
>> >>>>>
>> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan
>> >>>>> <mri...@gmail.com> wrote:
>> >>>>>
>> >>>>>> Hi,
>> >>>>>>
>> >>>>>> Spark will try to minimize the recomputation cost as much as
>> >>>>>> possible.
>> >>>>>> For example, if the parent stage was DETERMINATE, it simply needs
>> >>>>>> to recompute the missing (mapper) partitions (which resulted in
>> >>>>>> the fetch failure).
>> >>>>>> Note, this by itself could require further recomputation in the
>> >>>>>> DAG if the inputs required to compute the parent partitions are
>> >>>>>> missing, and so on - so it is dynamic.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Mridul
>> >>>>>>
>> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park
>> >>>>>> <o...@pl.postech.ac.kr> wrote:
>> >>>>>>
>> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
>> >>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
>> >>>>>>> > shuffle output will be discarded and it will be entirely
>> >>>>>>> > recomputed (see here
>> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>> >>>>>>>
>> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we
>> >>>>>>> find out which tasks should recompute their output? From the
>> >>>>>>> previous discussion, I thought this was hard (in the current
>> >>>>>>> implementation), and we should re-execute all tasks in the
>> >>>>>>> upstream stage.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> --- Sungwoo
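The dynamic, cascading recomputation Mridul describes (recompute a missing partition, which may in turn require recomputing its parents, and so on up the DAG) can be sketched as a simple walk over stage dependencies. This is a standalone model, not DAGScheduler code; the stage ids and dependency map are illustrative.

```scala
// Given a stage whose output is missing, a map from stage id to its parent
// stage ids, and the set of stages whose output is still available, return
// every stage that must be rerun. A DETERMINATE parent with surviving
// output is not rerun; a parent with missing output triggers a further
// walk up the DAG - this is what makes the recomputation dynamic.
def stagesToRerun(missing: Int,
                  parents: Map[Int, List[Int]],
                  hasOutput: Set[Int]): Set[Int] = {
  def walk(stage: Int): Set[Int] =
    parents.getOrElse(stage, Nil)
      .filterNot(hasOutput)            // parents whose output is also gone
      .flatMap(p => walk(p) + p)       // recurse before rerunning them
      .toSet + stage                   // the failed stage itself always reruns
  walk(missing)
}
```

For an INDETERMINATE stage the real scheduler additionally discards all of that stage's shuffle output and reruns it entirely, rather than only the missing partitions, as the quoted DAGScheduler link above describes.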