Good question. ResultStage is special-cased in Spark because its
output could already have been consumed (for example, by a collect() to
the driver) - so if it is one of the stages that needs to be rolled back,
the job is aborted.

To illustrate, see the following:
-- snip --

package org.apache.spark


import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.rdd.{DeterministicLevel, RDD}

class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {

  override def compute(split: Partition, context: TaskContext): Iterator[E] = {
    delegate.compute(split, context)
  }

  override protected def getPartitions: Array[Partition] =
    delegate.partitions
}

class IndeterminateRDD[E: ClassTag](delegate: RDD[E])
  extends DelegatingRDD[E](delegate) {
  override def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}

class FailingRDD[E: ClassTag](delegate: RDD[E])
  extends DelegatingRDD[E](delegate) {
  override def compute(split: Partition, context: TaskContext): Iterator[E] = {
    val tc = TaskContext.get
    if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
        tc.attemptNumber() == 0) {
      // Wait for all tasks to be done, then call exit
      Thread.sleep(5000)
      System.exit(-1)
    }
    delegate.compute(split, context)
  }
}

// Make sure the test_output directory is deleted before running this.
object Test {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
    val sc = new SparkContext(conf)

    val mapperRdd = new IndeterminateRDD(
      sc.parallelize(0 until 10000, 20).map(v => (v, v)))
    val resultRdd = new FailingRDD(mapperRdd.groupByKey())
    resultRdd.saveAsTextFile("test_output")
  }
}

-- snip --



Here, the mapper stage has been forced to be INDETERMINATE.
In the reducer stage, the first attempt to compute partition 0 waits
for a bit and then calls System.exit - since the master is a local-cluster,
this kills that executor process, which results in a FetchFailure when the
second attempt of partition 0 tries to fetch shuffle data.
When Spark tries to regenerate the parent shuffle output, it sees that the
parent is INDETERMINATE - and so fails the entire job with the
message:
"
org.apache.spark.SparkException: Job aborted due to stage failure: A
shuffle map stage with indeterminate output was failed and retried.
However, Spark cannot rollback the ResultStage 1 to re-process the
input data, and has to fail this job. Please eliminate the
indeterminacy by checkpointing the RDD before repartition and try
again.
"

This comes from here
<https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
- when rolling back stages, if Spark determines that a ResultStage
needs to be rolled back due to loss of INDETERMINATE output, it will
fail the job.
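
As a rough illustration of that rule, here is a small self-contained Scala
model of the decision. It is only a simplified sketch and not the
DAGScheduler code: RollbackModel, onFetchFailure, the Decision types and the
field names are all made up for illustration, and it assumes that a
downstream stage only needs a rollback when some of its partitions have
already finished.

```scala
// Simplified, hypothetical model of the rollback decision after a fetch failure.
// None of these types are Spark classes; they only mirror the idea.
object RollbackModel {
  sealed trait Stage {
    def id: Int
    def numTasks: Int
    def missingPartitions: Int // partitions that have not produced output yet
  }
  final case class ShuffleMapStage(
      id: Int, numTasks: Int, missingPartitions: Int, indeterminate: Boolean) extends Stage
  final case class ResultStage(id: Int, numTasks: Int, missingPartitions: Int) extends Stage

  sealed trait Decision
  // Parent is deterministic: only the lost map outputs are recomputed.
  case object RecomputeMissingOnly extends Decision
  // These stages discard their output and are recomputed in full.
  final case class RollBack(stageIds: Set[Int]) extends Decision
  // A result stage with finished partitions cannot be rolled back.
  final case class AbortJob(resultStageId: Int) extends Decision

  def onFetchFailure(failedParent: ShuffleMapStage, downstream: Seq[Stage]): Decision =
    if (!failedParent.indeterminate) {
      RecomputeMissingOnly
    } else {
      // Assumption: only stages with at least one finished partition need a rollback.
      val partiallyDone = downstream.filter(s => s.missingPartitions < s.numTasks)
      val abort: Option[Decision] =
        partiallyDone.collectFirst { case r: ResultStage => AbortJob(r.id) }
      abort.getOrElse(RollBack((failedParent.id +: partiallyDone.map(_.id)).toSet))
    }

  def main(args: Array[String]): Unit = {
    // Loosely mirrors the example above: 20 reducer partitions, 1 still missing.
    val mapper = ShuffleMapStage(id = 0, numTasks = 20, missingPartitions = 0, indeterminate = true)
    val reducer = ResultStage(id = 1, numTasks = 20, missingPartitions = 1)
    println(onFetchFailure(mapper, Seq(reducer))) // prints AbortJob(1)
  }
}
```

With a DETERMINATE parent the same call returns RecomputeMissingOnly, and
with a result stage that has not finished any partition yet it returns
RollBack(Set(0)), i.e. only the parent is recomputed in full.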

Hope this clarifies.
Regards,
Mridul


On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:

> In fact, I'm wondering if Spark will rerun the whole reduce ShuffleMapStage
> if its upstream ShuffleMapStage is INDETERMINATE and is rerun.
>
> On Thu, Oct 19, 2023 at 23:00 Keyong Zhou <zho...@apache.org> wrote:
>
> > Thanks Erik for bringing up this question, I'm also curious about the
> > answer, any feedback is appreciated.
> >
> > Thanks,
> > Keyong Zhou
> >
> > On Thu, Oct 19, 2023 at 22:16 Erik fang <fme...@gmail.com> wrote:
> >
> >> Mridul,
> >>
> >> sure, I totally agree SPARK-25299 is a much better solution, as long as we
> >> can get it from spark community
> >> (btw, private[spark] of RDD.outputDeterministicLevel is no big deal,
> >> celeborn already has spark-integration code with  [spark] scope)
> >>
> >> I also have a question about INDETERMINATE stage recompute, and may need
> >> your help
> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable, however,
> >> I don't find related logic for INDETERMINATE ResultStage rerun in
> >> DAGScheduler
> >> If INDETERMINATE ShuffleMapStage got entirely recomputed, the
> >> corresponding ResultStage should be entirely recomputed as well, per my
> >> understanding
> >>
> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to rollback a
> >> ResultStage but it was not merged
> >> Do you know any context or related ticket for INDETERMINATE ResultStage
> >> rerun?
> >>
> >> Thanks in advance!
> >>
> >> Regards,
> >> Erik
> >>
> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
> >> wrote:
> >>
> >> >
> >> >
> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> >> >
> >> >> Hi Mridul,
> >> >>
> >> >> For a),
> >> >> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> >> >> to decide whether the whole stage needs to be recomputed
> >> >> I think we can pass the same information to Celeborn in
> >> >> ShuffleManager.registerShuffle()
> >> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> >> >> since RDD in ShuffleDependency contains the RDD object
> >> >> It seems Stage.isIndeterminate() is unreadable from ShuffleDependency,
> >> >> but luckily rdd is used internally
> >> >>
> >> >> def isIndeterminate: Boolean = {
> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> >> }
> >> >>
> >> >> Relying on the internal implementation is not good, but doable.
> >> >> I don't expect spark RDD/Stage implementation changes frequently, and we
> >> >> can discuss with Spark community for a RDD isIndeterminate API if they
> >> >> change it in the future
> >> >>
> >> >
> >> >
> >> > Only RDD.getOutputDeterministicLevel is publicly exposed,
> >> > RDD.outputDeterministicLevel is not and it is private[spark].
> >> > While I don't expect changes to this, it is inherently unstable to depend
> >> > on it.
> >> >
> >> > Btw, please see the discussion with Sungwoo Park, if Celeborn is
> >> > maintaining a reducer oriented view, you will need to recompute all the
> >> > mappers anyway - what you might save is the subset of reducer partitions
> >> > which can be skipped if it is DETERMINATE.
> >> >
> >> >
> >> >
> >> >
> >> >>
> >> >> for c)
> >> >> I also considered a similar solution in celeborn
> >> >> Celeborn (LifecycleManager) can get the full picture of remaining shuffle
> >> >> data from previous stage attempt and reuse it in stage recompute,
> >> >> and the whole process will be transparent to Spark/DagScheduler
> >> >>
> >> >
> >> > Celeborn does not have visibility into this - and this is potentially
> >> > subject to invasive changes in Apache Spark as it evolves.
> >> > For example, I recently merged a couple of changes which would make this
> >> > different in master compared to previous versions.
> >> > Until the remote shuffle service SPIP is implemented and these are
> >> > abstracted out & made pluggable, it will continue to be quite volatile.
> >> >
> >> > Note that the behavior for 3.5 and older is known - since Spark versions
> >> > have been released - it is the behavior in master and future versions of
> >> > Spark which is subject to change.
> >> > So delivering on SPARK-25299 would future proof all remote shuffle
> >> > implementations.
> >> >
> >> >
> >> > Regards,
> >> > Mridul
> >> >
> >> >
> >> >
> >> >>
> >> >> From my perspective, leveraging partial stage recompute and
> >> >> remaining shuffle data needs a lot of work to do in Celeborn
> >> >> I prefer to implement a simple whole stage recompute first with interface
> >> >> defined with recomputeAll = true flag, and explore partial stage recompute
> >> >> in separate ticket as future optimization
> >> >> What do you think about it?
> >> >>
> >> >> Regards,
> >> >> Erik
> >> >>
> >> >>
> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> >> >> wrote:
> >> >>
> >> >>>
> >> >>>
> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>>>
> >> >>>> A reducer oriented view of shuffle, especially without replication,
> >> >>>> could indeed be susceptible to this issue you described (a single fetch
> >> >>>> failure would require all mappers to need to be recomputed) - note, not
> >> >>>> necessarily all reducers to be recomputed though.
> >> >>>>
> >> >>>> Note that I have not looked much into Celeborn specifically on this
> >> >>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
> >> >>>>
> >> >>>> Regards,
> >> >>>> Mridul
> >> >>>>
> >> >>>>
> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
> >> >>>>
> >> >>>>> Hello,
> >> >>>>>
> >> >>>>> (Sorry for sending the same message again.)
> >> >>>>>
> >> >>>>> From my understanding, the current implementation of Celeborn makes it
> >> >>>>> hard to find out which mapper should be re-executed when a partition cannot
> >> >>>>> be read, and we should re-execute all the mappers in the upstream stage. If
> >> >>>>> we can find out which mapper/partition should be re-executed, the current
> >> >>>>> logic of stage recomputation could be (partially or totally) reused.
> >> >>>>>
> >> >>>>> Regards,
> >> >>>>>
> >> >>>>> --- Sungwoo
> >> >>>>>
> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
> >> >>>>> wrote:
> >> >>>>>
> >> >>>>>>
> >> >>>>>> Hi,
> >> >>>>>>
> >> >>>>>>   Spark will try to minimize the recomputation cost as much as
> >> >>>>>> possible.
> >> >>>>>> For example, if parent stage was DETERMINATE, it simply needs to
> >> >>>>>> recompute the missing (mapper) partitions (which resulted in fetch
> >> >>>>>> failure). Note, this by itself could require further recomputation in the
> >> >>>>>> DAG if the inputs required to compute the parent partitions are missing, and
> >> >>>>>> so on - so it is dynamic.
> >> >>>>>>
> >> >>>>>> Regards,
> >> >>>>>> Mridul
> >> >>>>>>
> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
> >> >>>>>> wrote:
> >> >>>>>>
> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going to be
> >> >>>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle output will be
> >> >>>>>>> > discarded and it will be entirely recomputed (see here
> >> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>
> >> >>>>>>> > ).
> >> >>>>>>>
> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we find out
> >> >>>>>>> which tasks should recompute their output? From the previous discussion, I
> >> >>>>>>> thought this was hard (in the current implementation), and we should
> >> >>>>>>> re-execute all tasks in the upstream stage.
> >> >>>>>>>
> >> >>>>>>> Thanks,
> >> >>>>>>>
> >> >>>>>>> --- Sungwoo
> >> >>>>>>>
> >> >>>>>>
> >>
> >
>
