Yes, DAGScheduler is dealing with it at a stage level - and so individual
RDD’s DeterministicLevel  would be handled in order to determine the
stage’s level.

Regards,
Mridul


On Fri, Nov 3, 2023 at 9:45 AM Keyong Zhou <zho...@apache.org> wrote:

> I checked RDD#getOutputDeterministicLevel and find that if an RDD's
> upstream is INDETERMINATE,
> then it's also INDETERMINATE.
>
> Thanks,
> Keyong Zhou
>
> Keyong Zhou <zho...@apache.org> 于2023年11月3日周五 19:57写道:
>
> > Hi Mridul,
> >
> > I still have a question. DAGScheduler#submitMissingTasks will
> > only unregisterAllMapAndMergeOutput
> > if the current ShuffleMapStage is Indeterminate. What if the current
> stage
> > is determinate, but its
> > upstream stage is Indeterminate, and its upstream stage is rerun?
> >
> > Thanks,
> > Keyong Zhou
> >
> > Mridul Muralidharan <mri...@gmail.com> 于2023年10月20日周五 11:15写道:
> >
> >> To add my response - what I described (w.r.t failing job) applies only
> to
> >> ResultStage.
> >> It walks the lineage DAG to identify all indeterminate parents to
> >> rollback.
> >> If there are only ShuffleMapStages in the set of stages to rollback, it
> >> will simply discard their output, rollback all of them, and then retry
> >> these stages (same shuffle-id, a new stage attempt)
> >>
> >>
> >> Regards,
> >> Mridul
> >>
> >>
> >>
> >> On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
> >> wrote:
> >>
> >> >
> >> > Good question, and ResultStage is actually special cased in spark as
> its
> >> > output could have already been consumed (for example collect() to
> >> driver,
> >> > etc) - and so if it is one of the stages which needs to be rolled
> back,
> >> the
> >> > job is aborted.
> >> >
> >> > To illustrate, see the following:
> >> > -- snip --
> >> >
> >> > package org.apache.spark
> >> >
> >> >
> >> > import scala.reflect.ClassTag
> >> >
> >> > import org.apache.spark._
> >> > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> >> >
> >> > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends
> >> RDD[E](delegate) {
> >> >
> >> >   override def compute(split: Partition, context: TaskContext):
> >> Iterator[E] = {
> >> >     delegate.compute(split, context)
> >> >   }
> >> >
> >> >   override protected def getPartitions: Array[Partition] =
> >> >     delegate.partitions
> >> > }
> >> >
> >> > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends
> >> DelegatingRDD[E](delegate) {
> >> >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> >> DeterministicLevel.INDETERMINATE
> >> > }
> >> >
> >> > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends
> >> DelegatingRDD[E](delegate) {
> >> >   override def compute(split: Partition, context: TaskContext):
> >> Iterator[E] = {
> >> >     val tc = TaskContext.get
> >> >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
> >> tc.attemptNumber() == 0) {
> >> >       // Wait for all tasks to be done, then call exit
> >> >       Thread.sleep(5000)
> >> >       System.exit(-1)
> >> >     }
> >> >     delegate.compute(split, context)
> >> >   }
> >> > }
> >> >
> >> > // Make sure test_output directory is deleted before running this.
> >> > //
> >> > object Test {
> >> >
> >> >   def main(args: Array[String]): Unit = {
> >> >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> >> >     val sc = new SparkContext(conf)
> >> >
> >> >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000,
> >> 20).map(v => (v, v)))
> >> >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> >> >     resultRdd.saveAsTextFile("test_output")
> >> >   }
> >> > }
> >> >
> >> > -- snip --
> >> >
> >> >
> >> >
> >> > Here, the mapper stage has been forced to be INDETERMINATE.
> >> > In the reducer stage, the first attempt to compute partition 0 will
> >> wait for a bit and then exit - since the master is a local-cluster, this
> >> results in FetchFailure when the second attempt of partition 0 tries to
> >> fetch shuffle data.
> >> > When spark tries to regenerate parent shuffle output, it sees that the
> >> parent is INDETERMINATE - and so fails the entire job.with the message:
> >> > "
> >> > org.apache.spark.SparkException: Job aborted due to stage failure: A
> >> shuffle map stage with indeterminate output was failed and retried.
> >> However, Spark cannot rollback the ResultStage 1 to re-process the input
> >> data, and has to fail this job. Please eliminate the indeterminacy by
> >> checkpointing the RDD before repartition and try again.
> >> > "
> >> >
> >> > This is coming from here <
> >>
> https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090
> >
> >> - when rolling back stages, if spark determines that a ResultStage
> needs to
> >> be rolled back due to loss of INDETERMINATE output, it will fail the
> job.
> >> >
> >> > Hope this clarifies.
> >> > Regards,
> >> > Mridul
> >> >
> >> >
> >> > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org>
> wrote:
> >> >
> >> >> In fact, I'm wondering if Spark will rerun the whole reduce
> >> >> ShuffleMapStage
> >> >> if its upstream ShuffleMapStage is INDETERMINATE and rerun.
> >> >>
> >> >> Keyong Zhou <zho...@apache.org> 于2023年10月19日周四 23:00写道:
> >> >>
> >> >> > Thanks Erik for bringing up this question, I'm also curious about
> the
> >> >> > answer, any feedback is appreciated.
> >> >> >
> >> >> > Thanks,
> >> >> > Keyong Zhou
> >> >> >
> >> >> > Erik fang <fme...@gmail.com> 于2023年10月19日周四 22:16写道:
> >> >> >
> >> >> >> Mridul,
> >> >> >>
> >> >> >> sure, I totally agree SPARK-25299 is a much better solution, as
> long
> >> >> as we
> >> >> >> can get it from spark community
> >> >> >> (btw, private[spark] of RDD.outputDeterministicLevel is no big
> deal,
> >> >> >> celeborn already has spark-integration code with  [spark] scope)
> >> >> >>
> >> >> >> I also have a question about INDETERMINATE stage recompute, and
> may
> >> >> need
> >> >> >> your help
> >> >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable,
> >> >> however, I
> >> >> >> don't find related logic for INDETERMINATE ResultStage rerun in
> >> >> >> DAGScheduler
> >> >> >> If INDETERMINATE ShuffleMapStage got entirely recomputed, the
> >> >> >> corresponding ResultStage should be entirely recomputed as well,
> >> per my
> >> >> >> understanding
> >> >> >>
> >> >> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to
> >> rollback
> >> >> a
> >> >> >> ResultStage but it was not merged
> >> >> >> Do you know any context or related ticket for INDETERMINATE
> >> ResultStage
> >> >> >> rerun?
> >> >> >>
> >> >> >> Thanks in advance!
> >> >> >>
> >> >> >> Regards,
> >> >> >> Erik
> >> >> >>
> >> >> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <
> >> mri...@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >> >
> >> >> >> >
> >> >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com>
> >> wrote:
> >> >> >> >
> >> >> >> >> Hi Mridul,
> >> >> >> >>
> >> >> >> >> For a),
> >> >> >> >> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> >> >> >> <
> >> >> >>
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975
> >> >> >> >
> >> >> >> >> to decide whether the whole stage needs to be recomputed
> >> >> >> >> I think we can pass the same information to Celeborn in
> >> >> >> >> ShuffleManager.registerShuffle()
> >> >> >> >> <
> >> >> >>
> >> >>
> >>
> https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39
> >> >> >,
> >> >> >> since
> >> >> >> >> RDD in ShuffleDependency contains the RDD object
> >> >> >> >> It seems Stage.isIndeterminate() is unreadable from
> >> >> ShuffleDependency,
> >> >> >> >> but luckily rdd is used internally
> >> >> >> >>
> >> >> >> >> def isIndeterminate: Boolean = {
> >> >> >> >>   rdd.outputDeterministicLevel ==
> >> DeterministicLevel.INDETERMINATE
> >> >> >> >> }
> >> >> >> >>
> >> >> >> >> Relies on internal implementation is not good, but doable.
> >> >> >> >> I don't expect spark RDD/Stage implementation changes
> frequently,
> >> >> and
> >> >> >> we
> >> >> >> >> can discuss with Spark community for a RDD isIndeterminate API
> if
> >> >> they
> >> >> >> >> change it in the future
> >> >> >> >>
> >> >> >> >
> >> >> >> >
> >> >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed,
> >> >> >> > RDD.outputDeterministicLevel is not and it is private[spark].
> >> >> >> > While I dont expect changes to this, it is inherently unstable
> to
> >> >> depend
> >> >> >> > on it.
> >> >> >> >
> >> >> >> > Btw, please see the discussion with Sungwoo Park, if Celeborn is
> >> >> >> > maintaining a reducer oriented view, you will need to recompute
> >> all
> >> >> the
> >> >> >> > mappers anyway - what you might save is the subset of reducer
> >> >> partitions
> >> >> >> > which can be skipped if it is DETERMINATE.
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >>
> >> >> >> >> for c)
> >> >> >> >> I also considered a similar solution in celeborn
> >> >> >> >> Celeborn (LifecycleManager) can get the full picture of
> remaining
> >> >> >> shuffle
> >> >> >> >> data from previous stage attempt and reuse it in stage
> recompute
> >> >> >> >> , and the whole process will be transparent to
> Spark/DagScheduler
> >> >> >> >>
> >> >> >> >
> >> >> >> > Celeborn does not have visibility into this - and this is
> >> potentially
> >> >> >> > subject to invasive changes in Apache Spark as it evolves.
> >> >> >> > For example, I recently merged a couple of changes which would
> >> make
> >> >> this
> >> >> >> > different in master compared to previous versions.
> >> >> >> > Until the remote shuffle service SPIP is implemented and these
> are
> >> >> >> > abstracted out & made pluggable, it will continue to be quite
> >> >> volatile.
> >> >> >> >
> >> >> >> > Note that the behavior for 3.5 and older is known - since Spark
> >> >> versions
> >> >> >> > have been released - it is the behavior in master and future
> >> >> versions of
> >> >> >> > Spark which is subject to change.
> >> >> >> > So delivering on SPARK-25299 would future proof all remote
> shuffle
> >> >> >> > implementations.
> >> >> >> >
> >> >> >> >
> >> >> >> > Regards,
> >> >> >> > Mridul
> >> >> >> >
> >> >> >> >
> >> >> >> >
> >> >> >> >>
> >> >> >> >> Per my perspective, leveraging partial stage recompute and
> >> >> >> >> remaining shuffle data needs a lot of work to do in Celeborn
> >> >> >> >> I prefer to implement a simple whole stage recompute first with
> >> >> >> interface
> >> >> >> >> defined with recomputeAll = true flag, and explore partial
> stage
> >> >> >> recompute
> >> >> >> >> in seperate ticket as future optimization
> >> >> >> >> How do you think about it?
> >> >> >> >>
> >> >> >> >> Regards,
> >> >> >> >> Erik
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <
> >> >> mri...@gmail.com>
> >> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <
> >> >> mri...@gmail.com
> >> >> >> >
> >> >> >> >>> wrote:
> >> >> >> >>>
> >> >> >> >>>>
> >> >> >> >>>> A reducer oriented view of shuffle, especially without
> >> >> replication,
> >> >> >> >>>> could indeed be susceptible to this issue you described (a
> >> single
> >> >> >> fetch
> >> >> >> >>>> failure would require all mappers to need to be recomputed) -
> >> >> note,
> >> >> >> not
> >> >> >> >>>> necessarily all reducers to be recomputed though.
> >> >> >> >>>>
> >> >> >> >>>> Note that I have not looked much into Celeborn specifically
> on
> >> >> this
> >> >> >> >>>> aspect yet, so my comments are *fairly* centric to Spark
> >> internals
> >> >> >> :-)
> >> >> >> >>>>
> >> >> >> >>>> Regards,
> >> >> >> >>>> Mridul
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <
> >> glap...@gmail.com>
> >> >> >> wrote:
> >> >> >> >>>>
> >> >> >> >>>>> Hello,
> >> >> >> >>>>>
> >> >> >> >>>>> (Sorry for sending the same message again.)
> >> >> >> >>>>>
> >> >> >> >>>>> From my understanding, the current implementation of
> Celeborn
> >> >> makes
> >> >> >> it
> >> >> >> >>>>> hard to find out which mapper should be re-executed when a
> >> >> >> partition cannot
> >> >> >> >>>>> be read, and we should re-execute all the mappers in the
> >> upstream
> >> >> >> stage. If
> >> >> >> >>>>> we can find out which mapper/partition should be
> re-executed,
> >> the
> >> >> >> current
> >> >> >> >>>>> logic of stage recomputation could be (partially or totally)
> >> >> reused.
> >> >> >> >>>>>
> >> >> >> >>>>> Regards,
> >> >> >> >>>>>
> >> >> >> >>>>> --- Sungwoo
> >> >> >> >>>>>
> >> >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <
> >> >> >> mri...@gmail.com>
> >> >> >> >>>>> wrote:
> >> >> >> >>>>>
> >> >> >> >>>>>>
> >> >> >> >>>>>> Hi,
> >> >> >> >>>>>>
> >> >> >> >>>>>>   Spark will try to minimize the recomputation cost as much
> >> as
> >> >> >> >>>>>> possible.
> >> >> >> >>>>>> For example, if parent stage was DETERMINATE, it simply
> >> needs to
> >> >> >> >>>>>> recompute the missing (mapper) partitions (which resulted
> in
> >> >> fetch
> >> >> >> >>>>>> failure). Note, this by itself could require further
> >> >> recomputation
> >> >> >> in the
> >> >> >> >>>>>> DAG if the inputs required to comput the parent partitions
> >> are
> >> >> >> missing, and
> >> >> >> >>>>>> so on - so it is dynamic.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Regards,
> >> >> >> >>>>>> Mridul
> >> >> >> >>>>>>
> >> >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <
> >> >> >> o...@pl.postech.ac.kr>
> >> >> >> >>>>>> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle
> >> id)
> >> >> is
> >> >> >> >>>>>>> going to be
> >> >> >> >>>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle
> >> >> output
> >> >> >> >>>>>>> will be
> >> >> >> >>>>>>> > discarded and it will be entirely recomputed (see here
> >> >> >> >>>>>>> > <
> >> >> >> >>>>>>>
> >> >> >>
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
> >> >> >> >>>>>>> >
> >> >> >> >>>>>>> > ).
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data,
> >> can we
> >> >> >> find
> >> >> >> >>>>>>> out
> >> >> >> >>>>>>> which tasks should recompute their output? From the
> previous
> >> >> >> >>>>>>> discussion, I
> >> >> >> >>>>>>> thought this was hard (in the current implementation), and
> >> we
> >> >> >> should
> >> >> >> >>>>>>> re-execute all tasks in the upstream stage.
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> Thanks,
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> --- Sungwoo
> >> >> >> >>>>>>>
> >> >> >> >>>>>>
> >> >> >>
> >> >> >
> >> >>
> >> >
> >>
> >
>

Reply via email to