Hi Mridul, your explanation is great and very clear.

Thank you so much!
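
For anyone reading the archive later, here is a minimal sketch of the two
rules discussed in the quoted messages below: (1) after a fetch failure, a
DETERMINATE parent stage only recomputes the missing map partitions, while an
INDETERMINATE one discards its output and recomputes everything; (2) when
stages are rolled back, a ResultStage in the rollback set aborts the job,
otherwise the ShuffleMapStages are simply retried. This is a toy model and not
Spark code: ToyStage, StageKind, Decision and RollbackSketch are hypothetical
names made up for illustration, and the real logic lives in DAGScheduler.

```scala
// Toy model only: hypothetical types, NOT Spark's DAGScheduler classes.
sealed trait StageKind
case object ShuffleMapKind extends StageKind
case object ResultKind extends StageKind

final case class ToyStage(
    id: Int,
    kind: StageKind,
    indeterminate: Boolean,
    numPartitions: Int)

sealed trait Decision
final case class AbortJob(reason: String) extends Decision
final case class RetryStages(stageIds: Seq[Int]) extends Decision

object RollbackSketch {

  // Rule 1: which map partitions of a parent stage get recomputed after a
  // fetch failure. DETERMINATE: only the missing ones.
  // INDETERMINATE: every partition, since the old output is discarded.
  def partitionsToRecompute(stage: ToyStage, missing: Set[Int]): Set[Int] =
    if (stage.indeterminate) (0 until stage.numPartitions).toSet else missing

  // Rule 2: given the stages that have to be rolled back, abort the job if
  // any of them is a ResultStage (its output may already have been consumed);
  // otherwise retry all of them as new stage attempts.
  def decide(toRollback: Seq[ToyStage]): Decision =
    toRollback.find(_.kind == ResultKind) match {
      case Some(rs) => AbortJob(s"cannot roll back ResultStage ${rs.id}")
      case None     => RetryStages(toRollback.map(_.id))
    }
}
```

In this model, the test program in the quoted mail corresponds to calling
decide with a rollback set that contains the ResultStage, which gives AbortJob.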

On Fri, Oct 20, 2023 at 11:59 AM Keyong Zhou <zho...@apache.org> wrote:

> Hi Mridul, thanks for the explanation, it's clear to me now, Thanks!
>
> Mridul Muralidharan <mri...@gmail.com> wrote on Fri, Oct 20, 2023 at 11:15:
>
> > To add to my response: what I described (w.r.t. failing the job) applies
> > only to ResultStage.
> > The DAGScheduler walks the lineage DAG to identify all indeterminate
> > parents to roll back.
> > If there are only ShuffleMapStages in the set of stages to roll back, it
> > will simply discard their output, roll back all of them, and then retry
> > these stages (same shuffle-id, a new stage attempt).
> >
> >
> > Regards,
> > Mridul
> >
> >
> >
> > On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com>
> > wrote:
> >
> > >
> > > Good question. ResultStage is actually special-cased in Spark, as its
> > > output could have already been consumed (for example, collect() to the
> > > driver) - and so if it is one of the stages which needs to be rolled
> > > back, the job is aborted.
> > >
> > > To illustrate, see the following:
> > > -- snip --
> > >
> > > package org.apache.spark
> > >
> > >
> > > import scala.reflect.ClassTag
> > >
> > > import org.apache.spark._
> > > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> > >
> > > // Pass-through wrapper: same partitions and data as the wrapped RDD.
> > > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
> > >
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     delegate.compute(split, context)
> > >   }
> > >
> > >   override protected def getPartitions: Array[Partition] =
> > >     delegate.partitions
> > > }
> > >
> > > // Forces the output of this RDD to be treated as INDETERMINATE.
> > > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> > >     DeterministicLevel.INDETERMINATE
> > > }
> > >
> > > // Exits the JVM on the first attempt of partition 0 in the first stage attempt.
> > > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     val tc = TaskContext.get
> > >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 &&
> > >         tc.attemptNumber() == 0) {
> > >       // Wait for all tasks to be done, then call exit
> > >       Thread.sleep(5000)
> > >       System.exit(-1)
> > >     }
> > >     delegate.compute(split, context)
> > >   }
> > > }
> > >
> > > // Make sure test_output directory is deleted before running this.
> > > //
> > > object Test {
> > >
> > >   def main(args: Array[String]): Unit = {
> > >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> > >     val sc = new SparkContext(conf)
> > >
> > >     val mapperRdd =
> > >       new IndeterminateRDD(sc.parallelize(0 until 10000, 20).map(v => (v, v)))
> > >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> > >     resultRdd.saveAsTextFile("test_output")
> > >   }
> > > }
> > >
> > > -- snip --
> > >
> > >
> > >
> > > Here, the mapper stage has been forced to be INDETERMINATE.
> > > In the reducer stage, the first attempt to compute partition 0 will wait
> > > for a bit and then exit - since the master is a local-cluster, this
> > > results in a FetchFailure when the second attempt of partition 0 tries to
> > > fetch shuffle data.
> > > When Spark tries to regenerate the parent shuffle output, it sees that the
> > > parent is INDETERMINATE - and so fails the entire job with the message:
> > > "
> > > org.apache.spark.SparkException: Job aborted due to stage failure: A
> > > shuffle map stage with indeterminate output was failed and retried.
> > > However, Spark cannot rollback the ResultStage 1 to re-process the input
> > > data, and has to fail this job. Please eliminate the indeterminacy by
> > > checkpointing the RDD before repartition and try again.
> > > "
> > >
> > > This is coming from here
> > > <https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
> > > - when rolling back stages, if Spark determines that a ResultStage needs
> > > to be rolled back due to loss of INDETERMINATE output, it will fail the job.
> > >
> > > Hope this clarifies.
> > > Regards,
> > > Mridul
> > >
> > >
> > > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
> > >
> > >> In fact, I'm wondering if Spark will rerun the whole reduce
> > >> ShuffleMapStage
> > > >> if its upstream ShuffleMapStage is INDETERMINATE and is rerun.
> > >>
> > > >> Keyong Zhou <zho...@apache.org> wrote on Thu, Oct 19, 2023 at 23:00:
> > >>
> > > >> > Thanks Erik for bringing up this question. I'm also curious about the
> > > >> > answer; any feedback is appreciated.
> > >> >
> > >> > Thanks,
> > >> > Keyong Zhou
> > >> >
> > > >> > Erik fang <fme...@gmail.com> wrote on Thu, Oct 19, 2023 at 22:16:
> > >> >
> > >> >> Mridul,
> > >> >>
> > > >> >> sure, I totally agree SPARK-25299 is a much better solution, as long as
> > > >> >> we can get it from the Spark community
> > > >> >> (btw, private[spark] of RDD.outputDeterministicLevel is no big deal,
> > > >> >> Celeborn already has spark-integration code with [spark] scope)
> > > >> >>
> > > >> >> I also have a question about INDETERMINATE stage recompute, and may need
> > > >> >> your help.
> > > >> >> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable; however,
> > > >> >> I can't find the related logic for INDETERMINATE ResultStage rerun in
> > > >> >> DAGScheduler.
> > > >> >> If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
> > > >> >> corresponding ResultStage should be entirely recomputed as well, per my
> > > >> >> understanding.
> > > >> >>
> > > >> >> I found https://issues.apache.org/jira/browse/SPARK-25342 to roll back a
> > > >> >> ResultStage, but it was not merged.
> > > >> >> Do you know any context or related ticket for INDETERMINATE ResultStage
> > > >> >> rerun?
> > >> >>
> > >> >> Thanks in advance!
> > >> >>
> > >> >> Regards,
> > >> >> Erik
> > >> >>
> > > >> >> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
> > > >> >> wrote:
> > > >> >>
> > > >> >> >
> > > >> >> >
> > > >> >> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> > >> >> >
> > >> >> >> Hi Mridul,
> > >> >> >>
> > > >> >> >> For a),
> > > >> >> >> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> > > >> >> >> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> > > >> >> >> to decide whether the whole stage needs to be recomputed.
> > > >> >> >> I think we can pass the same information to Celeborn in
> > > >> >> >> ShuffleManager.registerShuffle()
> > > >> >> >> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> > > >> >> >> since ShuffleDependency contains the RDD object.
> > > >> >> >> It seems Stage.isIndeterminate() is not accessible from
> > > >> >> >> ShuffleDependency, but luckily rdd is used internally:
> > > >> >> >>
> > > >> >> >> def isIndeterminate: Boolean = {
> > > >> >> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> > > >> >> >> }
> > > >> >> >>
> > > >> >> >> Relying on internal implementation is not good, but doable.
> > > >> >> >> I don't expect the Spark RDD/Stage implementation to change
> > > >> >> >> frequently, and we can discuss an RDD isIndeterminate API with the
> > > >> >> >> Spark community if they change it in the future.
> > >> >> >>
> > >> >> >
> > >> >> >
> > > >> >> > Only RDD.getOutputDeterministicLevel is publicly exposed;
> > > >> >> > RDD.outputDeterministicLevel is not, as it is private[spark].
> > > >> >> > While I don't expect changes to this, it is inherently unstable to
> > > >> >> > depend on it.
> > > >> >> >
> > > >> >> > Btw, please see the discussion with Sungwoo Park: if Celeborn is
> > > >> >> > maintaining a reducer oriented view, you will need to recompute all
> > > >> >> > the mappers anyway - what you might save is the subset of reducer
> > > >> >> > partitions which can be skipped if it is DETERMINATE.
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >>
> > > >> >> >> for c)
> > > >> >> >> I also considered a similar solution in Celeborn.
> > > >> >> >> Celeborn (LifecycleManager) can get the full picture of the remaining
> > > >> >> >> shuffle data from the previous stage attempt and reuse it in stage
> > > >> >> >> recompute, and the whole process will be transparent to
> > > >> >> >> Spark/DAGScheduler.
> > >> >> >>
> > >> >> >
> > > >> >> > Celeborn does not have visibility into this - and this is potentially
> > > >> >> > subject to invasive changes in Apache Spark as it evolves.
> > > >> >> > For example, I recently merged a couple of changes which would make
> > > >> >> > this different in master compared to previous versions.
> > > >> >> > Until the remote shuffle service SPIP is implemented and these are
> > > >> >> > abstracted out & made pluggable, it will continue to be quite volatile.
> > > >> >> >
> > > >> >> > Note that the behavior for 3.5 and older is known - since those Spark
> > > >> >> > versions have been released - it is the behavior in master and future
> > > >> >> > versions of Spark which is subject to change.
> > > >> >> > So delivering on SPARK-25299 would future-proof all remote shuffle
> > > >> >> > implementations.
> > >> >> >
> > >> >> >
> > >> >> > Regards,
> > >> >> > Mridul
> > >> >> >
> > >> >> >
> > >> >> >
> > >> >> >>
> > > >> >> >> From my perspective, leveraging partial stage recompute and the
> > > >> >> >> remaining shuffle data needs a lot of work in Celeborn.
> > > >> >> >> I prefer to implement a simple whole-stage recompute first, with the
> > > >> >> >> interface defined with a recomputeAll = true flag, and explore partial
> > > >> >> >> stage recompute in a separate ticket as a future optimization.
> > > >> >> >> What do you think?
> > >> >> >>
> > >> >> >> Regards,
> > >> >> >> Erik
> > >> >> >>
> > >> >> >>
> > > >> >> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> > > >> >> >> wrote:
> > > >> >> >>
> > > >> >> >>>
> > > >> >> >>>
> > > >> >> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
> > > >> >> >>> wrote:
> > >> >> >>>
> > >> >> >>>>
> > > >> >> >>>> A reducer oriented view of shuffle, especially without replication,
> > > >> >> >>>> could indeed be susceptible to the issue you described (a single
> > > >> >> >>>> fetch failure would require all mappers to be recomputed) - note,
> > > >> >> >>>> not necessarily all reducers to be recomputed though.
> > > >> >> >>>>
> > > >> >> >>>> Note that I have not looked much into Celeborn specifically on this
> > > >> >> >>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
> > >> >> >>>>
> > >> >> >>>> Regards,
> > >> >> >>>> Mridul
> > >> >> >>>>
> > >> >> >>>>
> > > >> >> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
> > > >> >> >>>> wrote:
> > >> >> >>>>
> > >> >> >>>>> Hello,
> > >> >> >>>>>
> > >> >> >>>>> (Sorry for sending the same message again.)
> > >> >> >>>>>
> > > >> >> >>>>> From my understanding, the current implementation of Celeborn makes
> > > >> >> >>>>> it hard to find out which mapper should be re-executed when a
> > > >> >> >>>>> partition cannot be read, and we should re-execute all the mappers
> > > >> >> >>>>> in the upstream stage. If we can find out which mapper/partition
> > > >> >> >>>>> should be re-executed, the current logic of stage recomputation
> > > >> >> >>>>> could be (partially or totally) reused.
> > >> >> >>>>>
> > >> >> >>>>> Regards,
> > >> >> >>>>>
> > >> >> >>>>> --- Sungwoo
> > >> >> >>>>>
> > > >> >> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
> > > >> >> >>>>> wrote:
> > >> >> >>>>>
> > >> >> >>>>>>
> > >> >> >>>>>> Hi,
> > >> >> >>>>>>
> > > >> >> >>>>>>   Spark will try to minimize the recomputation cost as much as
> > > >> >> >>>>>> possible.
> > > >> >> >>>>>> For example, if the parent stage was DETERMINATE, it simply needs
> > > >> >> >>>>>> to recompute the missing (mapper) partitions (which resulted in
> > > >> >> >>>>>> the fetch failure). Note, this by itself could require further
> > > >> >> >>>>>> recomputation in the DAG if the inputs required to compute the
> > > >> >> >>>>>> parent partitions are missing, and so on - so it is dynamic.
> > >> >> >>>>>>
> > >> >> >>>>>> Regards,
> > >> >> >>>>>> Mridul
> > >> >> >>>>>>
> > > >> >> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
> > > >> >> >>>>>> wrote:
> > >> >> >>>>>>
> > > >> >> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
> > > >> >> >>>>>>> > going to be recomputed, if it is an INDETERMINATE stage, all
> > > >> >> >>>>>>> > shuffle output will be discarded and it will be entirely
> > > >> >> >>>>>>> > recomputed (see here
> > > >> >> >>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
> > >> >> >>>>>>>
> > > >> >> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we
> > > >> >> >>>>>>> find out which tasks should recompute their output? From the
> > > >> >> >>>>>>> previous discussion, I thought this was hard (in the current
> > > >> >> >>>>>>> implementation), and we should re-execute all tasks in the
> > > >> >> >>>>>>> upstream stage.
> > >> >> >>>>>>>
> > >> >> >>>>>>> Thanks,
> > >> >> >>>>>>>
> > >> >> >>>>>>> --- Sungwoo
> > >> >> >>>>>>>
> > >> >> >>>>>>
> > >> >>
> > >> >
> > >>
> > >
> >
>
