Hi Mridul, thanks for the explanation; it's clear to me now. Thanks!
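
For reference, the rollback rule described in the quoted reply below can be
sketched as a tiny standalone model. This is only an illustration, not Spark's
DAGScheduler code: the types Stage, ShuffleMapStage, ResultStage and Decision
and the function decide are hypothetical names made up for this sketch, and
the set of stages to roll back is assumed to have been computed already.

```scala
// Toy model only: none of these types are Spark classes.
sealed trait Stage { def id: Int }
final case class ShuffleMapStage(id: Int) extends Stage
final case class ResultStage(id: Int) extends Stage

sealed trait Decision
final case class AbortJob(reason: String) extends Decision
final case class RetryStages(stageIds: Seq[Int]) extends Decision

object RollbackModel {
  // stagesToRollback: stages affected by the loss of INDETERMINATE shuffle
  // output (assumed to be found by walking the lineage DAG beforehand).
  def decide(stagesToRollback: Seq[Stage]): Decision =
    stagesToRollback.collectFirst { case r: ResultStage => r } match {
      // A ResultStage may already have exposed output (e.g. collect()),
      // so it cannot be rolled back and the job is aborted.
      case Some(r) => AbortJob(s"cannot roll back ResultStage ${r.id}")
      // Only ShuffleMapStages: discard their output and retry all of them.
      case None => RetryStages(stagesToRollback.map(_.id).sorted)
    }
}
```

With this model, a rollback set containing any ResultStage yields AbortJob,
while a set of only ShuffleMapStages yields RetryStages with their ids.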

On Fri, Oct 20, 2023 at 11:15, Mridul Muralidharan <mri...@gmail.com> wrote:

> To add my response - what I described (w.r.t failing job) applies only to
> ResultStage.
> It walks the lineage DAG to identify all indeterminate parents to rollback.
> If there are only ShuffleMapStages in the set of stages to rollback, it
> will simply discard their output, rollback all of them, and then retry
> these stages (same shuffle-id, a new stage attempt)
>
>
> Regards,
> Mridul
>
>
>
> On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
> >
> > Good question, and ResultStage is actually special cased in spark as its
> > output could have already been consumed (for example collect() to driver,
> > etc) - and so if it is one of the stages which needs to be rolled back, the
> > job is aborted.
> >
> > To illustrate, see the following:
> > -- snip --
> >
> > package org.apache.spark
> >
> > import scala.reflect.ClassTag
> >
> > import org.apache.spark._
> > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> >
> > // Forwards computation and partitioning to the wrapped RDD.
> > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
> >
> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> >     delegate.compute(split, context)
> >   }
> >
> >   override protected def getPartitions: Array[Partition] =
> >     delegate.partitions
> > }
> >
> > // Forces the output of the wrapped RDD to be treated as INDETERMINATE.
> > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> >     DeterministicLevel.INDETERMINATE
> > }
> >
> > // Kills the executor on the very first attempt of partition 0.
> > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> >     val tc = TaskContext.get
> >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
> >         tc.attemptNumber() == 0) {
> >       // Wait for all tasks to be done, then call exit
> >       Thread.sleep(5000)
> >       System.exit(-1)
> >     }
> >     delegate.compute(split, context)
> >   }
> > }
> >
> > // Make sure test_output directory is deleted before running this.
> > //
> > object Test {
> >
> >   def main(args: Array[String]): Unit = {
> >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> >     val sc = new SparkContext(conf)
> >
> >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000, 20).map(v => (v, v)))
> >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> >     resultRdd.saveAsTextFile("test_output")
> >   }
> > }
> >
> > -- snip --
> >
> >
> >
> > Here, the mapper stage has been forced to be INDETERMINATE.
> > In the reducer stage, the first attempt to compute partition 0 will wait
> > for a bit and then exit - since the master is a local-cluster, this results
> > in FetchFailure when the second attempt of partition 0 tries to fetch
> > shuffle data.
> > When spark tries to regenerate parent shuffle output, it sees that the
> > parent is INDETERMINATE - and so fails the entire job with the message:
> > "
> > org.apache.spark.SparkException: Job aborted due to stage failure: A
> > shuffle map stage with indeterminate output was failed and retried.
> > However, Spark cannot rollback the ResultStage 1 to re-process the input
> > data, and has to fail this job. Please eliminate the indeterminacy by
> > checkpointing the RDD before repartition and try again.
> > "
> >
> > This is coming from here <
> > https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
> > - when rolling back stages, if spark determines that a ResultStage needs to
> > be rolled back due to loss of INDETERMINATE output, it will fail the job.
> >
> > Hope this clarifies.
> > Regards,
> > Mridul
> >
> >
> > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
> >
> >> In fact, I'm wondering whether Spark will rerun the whole reduce
> >> ShuffleMapStage if its upstream ShuffleMapStage is INDETERMINATE and is
> >> rerun.
> >>
> >> On Thu, Oct 19, 2023 at 23:00, Keyong Zhou <zho...@apache.org> wrote:
> >>
> >> > Thanks Erik for bringing up this question, I'm also curious about the
> >> > answer, any feedback is appreciated.
> >> >
> >> > Thanks,
> >> > Keyong Zhou
> >> >
> >> > On Thu, Oct 19, 2023 at 22:16, Erik fang <fme...@gmail.com> wrote:
> >> >
> >> >> Mridul,
> >> >>
> >> >> sure, I totally agree SPARK-25299 is a much better solution, as long as we
> >> >> can get it from spark community
> >> >> (btw, private[spark] of RDD.outputDeterministicLevel is no big deal,
> >> >> celeborn already has spark-integration code with [spark] scope)
> >> >>
> >> >> I also have a question about INDETERMINATE stage recompute, and may need
> >> >> your help.
> >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however, I
> >> >> can't find the related logic for INDETERMINATE ResultStage rerun in
> >> >> DAGScheduler.
> >> >> If an INDETERMINATE ShuffleMapStage is entirely recomputed, the
> >> >> corresponding ResultStage should be entirely recomputed as well, as I
> >> >> understand it.
> >> >>
> >> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll back a
> >> >> ResultStage, but it was not merged.
> >> >> Do you know of any context or related ticket for INDETERMINATE ResultStage
> >> >> rerun?
> >> >>
> >> >> Thanks in advance!
> >> >>
> >> >> Regards,
> >> >> Erik
> >> >>
> >> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <
> mri...@gmail.com>
> >> >> wrote:
> >> >>
> >> >> >
> >> >> >
> >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com>
> wrote:
> >> >> >
> >> >> >> Hi Mridul,
> >> >> >>
> >> >> >> For a),
> >> >> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> >> >> >> <
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975
> >> >> >
> >> >> >> to decide whether the whole stage needs to be recomputed
> >> >> >> I think we can pass the same information to Celeborn in
> >> >> >> ShuffleManager.registerShuffle()
> >> >> >> <
> >> >>
> >>
> https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39
> >> >,
> >> >> since
> >> >> >> RDD in ShuffleDependency contains the RDD object
> >> >> >> It seems Stage.isIndeterminate() is unreadable from
> >> ShuffleDependency,
> >> >> >> but luckily rdd is used internally
> >> >> >>
> >> >> >> def isIndeterminate: Boolean = {
> >> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> >> >> >> }
> >> >> >>
> >> >> >> Relying on internal implementation details is not good, but it is doable.
> >> >> >> I don't expect the Spark RDD/Stage implementation to change frequently, and
> >> >> >> we can discuss an RDD isIndeterminate API with the Spark community if they
> >> >> >> change it in the future
> >> >> >>
> >> >> >
> >> >> >
> >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed,
> >> >> > RDD.outputDeterministicLevel is not and it is private[spark].
> >> >> > While I don't expect changes to this, it is inherently unstable to depend
> >> >> > on it.
> >> >> >
> >> >> > Btw, please see the discussion with Sungwoo Park, if Celeborn is
> >> >> > maintaining a reducer oriented view, you will need to recompute all
> >> the
> >> >> > mappers anyway - what you might save is the subset of reducer
> >> partitions
> >> >> > which can be skipped if it is DETERMINATE.
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >>
> >> >> >> for c)
> >> >> >> I also considered a similar solution in celeborn
> >> >> >> Celeborn (LifecycleManager) can get the full picture of remaining shuffle
> >> >> >> data from the previous stage attempt and reuse it in stage recompute,
> >> >> >> and the whole process will be transparent to Spark/DAGScheduler
> >> >> >>
> >> >> >
> >> >> > Celeborn does not have visibility into this - and this is
> potentially
> >> >> > subject to invasive changes in Apache Spark as it evolves.
> >> >> > For example, I recently merged a couple of changes which would make
> >> this
> >> >> > different in master compared to previous versions.
> >> >> > Until the remote shuffle service SPIP is implemented and these are
> >> >> > abstracted out & made pluggable, it will continue to be quite
> >> volatile.
> >> >> >
> >> >> > Note that the behavior for 3.5 and older is known - since Spark
> >> versions
> >> >> > have been released - it is the behavior in master and future
> >> versions of
> >> >> > Spark which is subject to change.
> >> >> > So delivering on SPARK-25299 would future proof all remote shuffle
> >> >> > implementations.
> >> >> >
> >> >> >
> >> >> > Regards,
> >> >> > Mridul
> >> >> >
> >> >> >
> >> >> >
> >> >> >>
> >> >> >> From my perspective, leveraging partial stage recompute and the
> >> >> >> remaining shuffle data needs a lot of work in Celeborn.
> >> >> >> I prefer to implement a simple whole stage recompute first, with the
> >> >> >> interface defined with a recomputeAll = true flag, and explore partial stage
> >> >> >> recompute in a separate ticket as a future optimization.
> >> >> >> What do you think?
> >> >> >>
> >> >> >> Regards,
> >> >> >> Erik
> >> >> >>
> >> >> >>
> >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <
> >> mri...@gmail.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >>>
> >> >> >>>
> >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <
> >> mri...@gmail.com
> >> >> >
> >> >> >>> wrote:
> >> >> >>>
> >> >> >>>>
> >> >> >>>> A reducer oriented view of shuffle, especially without
> >> replication,
> >> >> >>>> could indeed be susceptible to this issue you described (a
> single
> >> >> fetch
> >> >> >>>> failure would require all mappers to need to be recomputed) -
> >> note,
> >> >> not
> >> >> >>>> necessarily all reducers to be recomputed though.
> >> >> >>>>
> >> >> >>>> Note that I have not looked much into Celeborn specifically on
> >> this
> >> >> >>>> aspect yet, so my comments are *fairly* centric to Spark
> internals
> >> >> :-)
> >> >> >>>>
> >> >> >>>> Regards,
> >> >> >>>> Mridul
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com
> >
> >> >> wrote:
> >> >> >>>>
> >> >> >>>>> Hello,
> >> >> >>>>>
> >> >> >>>>> (Sorry for sending the same message again.)
> >> >> >>>>>
> >> >> >>>>> From my understanding, the current implementation of Celeborn
> >> makes
> >> >> it
> >> >> >>>>> hard to find out which mapper should be re-executed when a
> >> >> partition cannot
> >> >> >>>>> be read, and we should re-execute all the mappers in the
> upstream
> >> >> stage. If
> >> >> >>>>> we can find out which mapper/partition should be re-executed,
> the
> >> >> current
> >> >> >>>>> logic of stage recomputation could be (partially or totally)
> >> reused.
> >> >> >>>>>
> >> >> >>>>> Regards,
> >> >> >>>>>
> >> >> >>>>> --- Sungwoo
> >> >> >>>>>
> >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <
> >> >> mri...@gmail.com>
> >> >> >>>>> wrote:
> >> >> >>>>>
> >> >> >>>>>>
> >> >> >>>>>> Hi,
> >> >> >>>>>>
> >> >> >>>>>>   Spark will try to minimize the recomputation cost as much as
> >> >> >>>>>> possible.
> >> >> >>>>>> For example, if parent stage was DETERMINATE, it simply needs
> to
> >> >> >>>>>> recompute the missing (mapper) partitions (which resulted in
> >> fetch
> >> >> >>>>>> failure). Note, this by itself could require further recomputation in the
> >> >> >>>>>> DAG if the inputs required to compute the parent partitions are missing, and
> >> >> >>>>>> so on - so it is dynamic.
> >> >> >>>>>>
> >> >> >>>>>> Regards,
> >> >> >>>>>> Mridul
> >> >> >>>>>>
> >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <
> >> >> o...@pl.postech.ac.kr>
> >> >> >>>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id)
> >> is
> >> >> >>>>>>> going to be
> >> >> >>>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle
> >> output
> >> >> >>>>>>> will be
> >> >> >>>>>>> > discarded and it will be entirely recomputed (see here
> >> >> >>>>>>> > <
> >> >> >>>>>>>
> >> >>
> >>
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
> >> >> >>>>>>> >
> >> >> >>>>>>> > ).
> >> >> >>>>>>>
> >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can
> we
> >> >> find
> >> >> >>>>>>> out
> >> >> >>>>>>> which tasks should recompute their output? From the previous
> >> >> >>>>>>> discussion, I
> >> >> >>>>>>> thought this was hard (in the current implementation), and we
> >> >> should
> >> >> >>>>>>> re-execute all tasks in the upstream stage.
> >> >> >>>>>>>
> >> >> >>>>>>> Thanks,
> >> >> >>>>>>>
> >> >> >>>>>>> --- Sungwoo
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >>
> >> >
> >>
> >
>
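
As a rough illustration of the recompute rule discussed earlier in this thread
(DETERMINATE parent: rerun only the missing mapper partitions; INDETERMINATE
parent: discard all shuffle output and rerun every partition), here is a toy
sketch. It is not Spark code: RecomputeModel and partitionsToRecompute are
hypothetical names, and the set of missing partitions is assumed to be known.
It also ignores the case, mentioned in the thread, where a reducer-oriented
shuffle needs all mappers to be recomputed anyway.

```scala
// Toy model only: not part of Spark or Celeborn.
object RecomputeModel {
  // numPartitions: number of map partitions in the parent stage.
  // missing: map partitions whose shuffle output was lost (fetch failure).
  // indeterminate: whether the parent stage output is INDETERMINATE.
  def partitionsToRecompute(
      numPartitions: Int,
      missing: Set[Int],
      indeterminate: Boolean): Seq[Int] =
    if (indeterminate) {
      // All existing shuffle output is discarded, so every partition reruns.
      0 until numPartitions
    } else {
      // Deterministic output can be reused; only lost partitions rerun.
      missing.toSeq.filter(p => p >= 0 && p < numPartitions).sorted
    }
}
```

For example, with 4 map partitions and partition 2 missing, the DETERMINATE
case reruns only partition 2, while the INDETERMINATE case reruns 0 to 3.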
