Good question - and ResultStage is actually special-cased in Spark, since its output could already have been consumed (for example, via collect() to the driver) - so if it is one of the stages that needs to be rolled back, the job is aborted.
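At a high level, the rollback decision has this shape (a minimal, self-contained sketch of the idea only - RollbackModel, MapStage and ResStage are illustrative names and deliberately do not match Spark's actual DAGScheduler internals): every succeeding stage that has partially run must be rolled back, and if one of those is a result stage, the job is aborted.

-- snip --
// A toy model of the rollback decision, NOT Spark code: when an
// INDETERMINATE parent is re-run, any stage that has partially run must be
// rolled back, and a partially run result stage forces the job to abort.
object RollbackModel {

  sealed trait Stage {
    def id: Int
    def numTasks: Int
    def numMissingPartitions: Int
    // Partially run: some, but not all, partitions have completed.
    def partiallyRun: Boolean = numMissingPartitions < numTasks
  }
  case class MapStage(id: Int, numTasks: Int, numMissingPartitions: Int)
    extends Stage
  case class ResStage(id: Int, numTasks: Int, numMissingPartitions: Int)
    extends Stage

  def rollbackOrAbort(stagesToRollback: Seq[Stage]): Either[String, Seq[Stage]] = {
    val toRollback = stagesToRollback.filter(_.partiallyRun)
    toRollback.collectFirst { case r: ResStage => r } match {
      case Some(r) =>
        // A result stage's output may already have been consumed (collect()
        // to the driver, files committed, ...), so it cannot be rolled back.
        Left(s"Job aborted: cannot roll back result stage ${r.id}")
      case None =>
        // Shuffle map stages can simply be discarded and recomputed in full.
        Right(toRollback)
    }
  }

  def main(args: Array[String]): Unit = {
    // Only a map stage partially ran: roll it back and recompute.
    println(rollbackOrAbort(Seq(MapStage(0, 20, 10))))  // Right(...)
    // A result stage already completed some partitions: abort the job.
    println(rollbackOrAbort(Seq(ResStage(1, 20, 19))))  // Left(...)
  }
}
-- snip --

The actual check in DAGScheduler (linked further down) is richer - it walks the succeeding stages of the failed map stage and checks for an active job on the result stage - but the "a ResultStage that must be rolled back aborts the job" shape is the same.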
To illustrate, see the following:

-- snip --
package org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.rdd.{DeterministicLevel, RDD}

class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {

  override def compute(split: Partition, context: TaskContext): Iterator[E] = {
    delegate.compute(split, context)
  }

  override protected def getPartitions: Array[Partition] = delegate.partitions
}

class IndeterminateRDD[E: ClassTag](delegate: RDD[E])
    extends DelegatingRDD[E](delegate) {
  // Force the stage computing this RDD to be treated as INDETERMINATE.
  override def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}

class FailingRDD[E: ClassTag](delegate: RDD[E])
    extends DelegatingRDD[E](delegate) {
  override def compute(split: Partition, context: TaskContext): Iterator[E] = {
    val tc = TaskContext.get
    if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
        tc.attemptNumber() == 0) {
      // Wait for all tasks to be done, then exit the executor JVM.
      Thread.sleep(5000)
      System.exit(-1)
    }
    delegate.compute(split, context)
  }
}

// Make sure the test_output directory is deleted before running this.
object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
    val sc = new SparkContext(conf)
    val mapperRdd = new IndeterminateRDD(
      sc.parallelize(0 until 10000, 20).map(v => (v, v)))
    val resultRdd = new FailingRDD(mapperRdd.groupByKey())
    resultRdd.saveAsTextFile("test_output")
  }
}
-- snip --

Here, the mapper stage has been forced to be INDETERMINATE. In the reducer stage, the first attempt to compute partition 0 waits for a bit and then exits - since the master is a local-cluster, this kills an executor JVM and loses the mapper shuffle output it was hosting, which results in a FetchFailure when the second attempt of partition 0 tries to fetch shuffle data. When Spark tries to regenerate the parent shuffle output, it sees that the parent is INDETERMINATE - and so fails the entire job, with the message:

"
org.apache.spark.SparkException: Job aborted due to stage failure: A shuffle map stage with indeterminate output was failed and retried. However, Spark cannot rollback the ResultStage 1 to re-process the input data, and has to fail this job. Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again.
"

This is coming from here
<https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
- when rolling back stages, if Spark determines that a ResultStage needs to be rolled back due to loss of INDETERMINATE output, it will fail the job.

Hope this clarifies.

Regards,
Mridul


On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:

> In fact, I'm wondering if Spark will rerun the whole reduce ShuffleMapStage
> if its upstream ShuffleMapStage is INDETERMINATE and is rerun.
>
> Keyong Zhou <zho...@apache.org> wrote on Thu, Oct 19, 2023 at 23:00:
>
> > Thanks Erik for bringing up this question, I'm also curious about the
> > answer, any feedback is appreciated.
> >
> > Thanks,
> > Keyong Zhou
> >
> > Erik fang <fme...@gmail.com> wrote on Thu, Oct 19, 2023 at 22:16:
> >
> >> Mridul,
> >>
> >> Sure, I totally agree SPARK-25299 is a much better solution, as long
> >> as we can get it from the Spark community.
> >> (btw, private[spark] on RDD.outputDeterministicLevel is no big deal;
> >> Celeborn already has Spark-integration code with [spark] scope)
> >>
> >> I also have a question about INDETERMINATE stage recompute, and may
> >> need your help.
> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable;
> >> however, I don't find related logic for INDETERMINATE ResultStage
> >> rerun in DAGScheduler.
> >> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
> >> corresponding ResultStage should be entirely recomputed as well, per
> >> my understanding.
> >>
> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll back
> >> a ResultStage, but it was not merged.
> >> Do you know any context or related ticket for INDETERMINATE ResultStage
> >> rerun?
> >>
> >> Thanks in advance!
> >>
> >> Regards,
> >> Erik
> >>
> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
> >> wrote:
> >>
> >> >
> >> >
> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> >> >
> >> >> Hi Mridul,
> >> >>
> >> >> For a),
> >> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> >> >> to decide whether the whole stage needs to be recomputed.
> >> >> I think we can pass the same information to Celeborn in
> >> >> ShuffleManager.registerShuffle()
> >> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> >> >> since ShuffleDependency contains the RDD object.
> >> >> It seems Stage.isIndeterminate() is not reachable from
> >> >> ShuffleDependency, but luckily only the rdd is used internally:
> >> >>
> >> >> def isIndeterminate: Boolean = {
> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> >> }
> >> >>
> >> >> Relying on the internal implementation is not good, but doable.
> >> >> I don't expect the Spark RDD/Stage implementation to change
> >> >> frequently, and we can discuss with the Spark community about an RDD
> >> >> isIndeterminate API if they change it in the future.
> >> >>
> >> >
> >> >
> >> > Only RDD.getOutputDeterministicLevel is publicly exposed;
> >> > RDD.outputDeterministicLevel is not, and it is private[spark].
> >> > While I don't expect changes to this, it is inherently unstable to
> >> > depend on it.
> >> >
> >> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
> >> > maintaining a reducer-oriented view, you will need to recompute all
> >> > the mappers anyway - what you might save is the subset of reducer
> >> > partitions which can be skipped if it is DETERMINATE.
> >> >
> >> >
> >> >
> >> >
> >> >>
> >> >> For c),
> >> >> I also considered a similar solution in Celeborn.
> >> >> Celeborn (LifecycleManager) can get the full picture of remaining
> >> >> shuffle data from the previous stage attempt and reuse it in stage
> >> >> recompute, and the whole process will be transparent to
> >> >> Spark/DAGScheduler.
> >> >>
> >> >
> >> > Celeborn does not have visibility into this - and this is potentially
> >> > subject to invasive changes in Apache Spark as it evolves.
> >> > For example, I recently merged a couple of changes which would make
> >> > this different in master compared to previous versions.
> >> > Until the remote shuffle service SPIP is implemented and these are
> >> > abstracted out & made pluggable, it will continue to be quite
> >> > volatile.
> >> >
> >> > Note that the behavior for 3.5 and older is known - since those Spark
> >> > versions have been released - it is the behavior in master and future
> >> > versions of Spark which is subject to change.
> >> > So delivering on SPARK-25299 would future-proof all remote shuffle
> >> > implementations.
> >> >
> >> >
> >> > Regards,
> >> > Mridul
> >> >
> >> >
> >> >
> >> >>
> >> >> From my perspective, leveraging partial stage recompute and
> >> >> remaining shuffle data needs a lot of work in Celeborn.
> >> >> I prefer to implement a simple whole-stage recompute first, with the
> >> >> interface defined with a recomputeAll = true flag, and explore
> >> >> partial stage recompute in a separate ticket as a future
> >> >> optimization.
> >> >> What do you think?
> >> >>
> >> >> Regards,
> >> >> Erik
> >> >>
> >> >>
> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>>
> >> >>>
> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>>
> >> >>>> A reducer-oriented view of shuffle, especially without replication,
> >> >>>> could indeed be susceptible to the issue you described (a single
> >> >>>> fetch failure would require all mappers to be recomputed) - note,
> >> >>>> not necessarily all reducers to be recomputed though.
> >> >>>>
> >> >>>> Note that I have not looked much into Celeborn specifically on this
> >> >>>> aspect yet, so my comments are *fairly* centric to Spark internals
> >> >>>> :-)
> >> >>>>
> >> >>>> Regards,
> >> >>>> Mridul
> >> >>>>
> >> >>>>
> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> Hello,
> >> >>>>>
> >> >>>>> (Sorry for sending the same message again.)
> >> >>>>>
> >> >>>>> From my understanding, the current implementation of Celeborn
> >> >>>>> makes it hard to find out which mapper should be re-executed when
> >> >>>>> a partition cannot be read, and we should re-execute all the
> >> >>>>> mappers in the upstream stage. If we can find out which
> >> >>>>> mapper/partition should be re-executed, the current logic of stage
> >> >>>>> recomputation could be (partially or totally) reused.
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>>
> >> >>>>> --- Sungwoo
> >> >>>>>
> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan
> >> >>>>> <mri...@gmail.com> wrote:
> >> >>>>>
> >> >>>>>>
> >> >>>>>> Hi,
> >> >>>>>>
> >> >>>>>> Spark will try to minimize the recomputation cost as much as
> >> >>>>>> possible.
> >> >>>>>> For example, if the parent stage was DETERMINATE, it simply needs
> >> >>>>>> to recompute the missing (mapper) partitions (which resulted in
> >> >>>>>> the fetch failure). Note, this by itself could require further
> >> >>>>>> recomputation in the DAG if the inputs required to compute the
> >> >>>>>> parent partitions are missing, and so on - so it is dynamic.
> >> >>>>>>
> >> >>>>>> Regards,
> >> >>>>>> Mridul
> >> >>>>>>
> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
> >> >>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
> >> >>>>>>> > shuffle output will be discarded and it will be entirely
> >> >>>>>>> > recomputed (see here
> >> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
> >> >>>>>>>
> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we
> >> >>>>>>> find out which tasks should recompute their output? From the
> >> >>>>>>> previous discussion, I thought this was hard (in the current
> >> >>>>>>> implementation), and we should re-execute all tasks in the
> >> >>>>>>> upstream stage.
> >> >>>>>>>
> >> >>>>>>> Thanks,
> >> >>>>>>>
> >> >>>>>>> --- Sungwoo
> >> >>>>>>>
> >> >>>>>>
> >> >
> >