Hi Mridul,

Your explanation is clear and great. Thank you so much!
On Fri, Oct 20, 2023 at 11:59 AM Keyong Zhou <zho...@apache.org> wrote:

> Hi Mridul, thanks for the explanation, it's clear to me now. Thanks!
>
> On Fri, Oct 20, 2023 at 11:15, Mridul Muralidharan <mri...@gmail.com> wrote:
>
> > To add to my response - what I described (w.r.t. failing the job) applies
> > only to ResultStage.
> > It walks the lineage DAG to identify all indeterminate parents to roll back.
> > If there are only ShuffleMapStages in the set of stages to roll back, it
> > will simply discard their output, roll back all of them, and then retry
> > these stages (same shuffle-id, a new stage attempt).
> >
> > Regards,
> > Mridul
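The decision rule described above can be condensed into a small sketch (a paraphrase with simplified, hypothetical stand-in types; not the actual DAGScheduler source):

-- snip --

// Hypothetical stand-ins for Spark's internal stage types, for illustration only.
sealed trait StageSketch
case class ShuffleMapStageSketch(shuffleId: Int, attempt: Int) extends StageSketch
case class ResultStageSketch(jobId: Int) extends StageSketch

object RollbackSketch {
  // If any stage to roll back is a ResultStage, its output may already have
  // been consumed (e.g. collect() to the driver), so the job must be aborted.
  // Otherwise every ShuffleMapStage discards its shuffle output and is retried
  // under the same shuffle id with a new stage attempt.
  def rollBackStages(stages: Set[StageSketch]): Either[String, Set[ShuffleMapStageSketch]] =
    stages.collectFirst { case r: ResultStageSketch => r } match {
      case Some(r) =>
        Left(s"Abort job ${r.jobId}: a ResultStage cannot be rolled back")
      case None =>
        Right(stages.collect { case s: ShuffleMapStageSketch =>
          s.copy(attempt = s.attempt + 1)
        })
    }
}

-- snip --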
> >
> > On Thu, Oct 19, 2023 at 10:08 PM Mridul Muralidharan <mri...@gmail.com> wrote:
> >
> > > Good question, and ResultStage is actually special cased in Spark, as its
> > > output could have already been consumed (for example collect() to the
> > > driver, etc) - and so if it is one of the stages which needs to be rolled
> > > back, the job is aborted.
> > >
> > > To illustrate, see the following:
> > > -- snip --
> > >
> > > package org.apache.spark
> > >
> > > import scala.reflect.ClassTag
> > >
> > > import org.apache.spark._
> > > import org.apache.spark.rdd.{DeterministicLevel, RDD}
> > >
> > > class DelegatingRDD[E: ClassTag](delegate: RDD[E]) extends RDD[E](delegate) {
> > >
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     delegate.compute(split, context)
> > >   }
> > >
> > >   override protected def getPartitions: Array[Partition] =
> > >     delegate.partitions
> > > }
> > >
> > > class IndeterminateRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   // Force the stage containing this RDD to be treated as INDETERMINATE.
> > >   override def getOutputDeterministicLevel: DeterministicLevel.Value =
> > >     DeterministicLevel.INDETERMINATE
> > > }
> > >
> > > class FailingRDD[E: ClassTag](delegate: RDD[E]) extends DelegatingRDD[E](delegate) {
> > >   override def compute(split: Partition, context: TaskContext): Iterator[E] = {
> > >     val tc = TaskContext.get
> > >     if (tc.stageAttemptNumber() == 0 && tc.partitionId() == 0 && tc.attemptNumber() == 0) {
> > >       // Wait for all tasks to be done, then call exit.
> > >       Thread.sleep(5000)
> > >       System.exit(-1)
> > >     }
> > >     delegate.compute(split, context)
> > >   }
> > > }
> > >
> > > // Make sure the test_output directory is deleted before running this.
> > > object Test {
> > >
> > >   def main(args: Array[String]): Unit = {
> > >     val conf = new SparkConf().setMaster("local-cluster[4,1,1024]")
> > >     val sc = new SparkContext(conf)
> > >
> > >     val mapperRdd = new IndeterminateRDD(sc.parallelize(0 until 10000, 20).map(v => (v, v)))
> > >     val resultRdd = new FailingRDD(mapperRdd.groupByKey())
> > >     resultRdd.saveAsTextFile("test_output")
> > >   }
> > > }
> > >
> > > -- snip --
> > >
> > > Here, the mapper stage has been forced to be INDETERMINATE.
> > > In the reducer stage, the first attempt to compute partition 0 will wait
> > > for a bit and then exit - since the master is a local-cluster, this
> > > results in a FetchFailure when the second attempt of partition 0 tries to
> > > fetch shuffle data.
> > > When Spark tries to regenerate the parent shuffle output, it sees that
> > > the parent is INDETERMINATE - and so fails the entire job with the message:
> > > "
> > > org.apache.spark.SparkException: Job aborted due to stage failure: A
> > > shuffle map stage with indeterminate output was failed and retried.
> > > However, Spark cannot rollback the ResultStage 1 to re-process the input
> > > data, and has to fail this job. Please eliminate the indeterminacy by
> > > checkpointing the RDD before repartition and try again.
> > > "
> > >
> > > This is coming from here
> > > <https://github.com/apache/spark/blob/28292d51e7dbe2f3488e82435abb48d3d31f6044/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2090>
> > > - when rolling back stages, if Spark determines that a ResultStage needs
> > > to be rolled back due to loss of INDETERMINATE output, it will fail the job.
> > >
> > > Hope this clarifies.
> > > Regards,
> > > Mridul
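For anyone who wants to reproduce the test above, a minimal build definition along these lines should suffice (hypothetical; the version numbers are only examples, and spark-core is marked provided so the job runs against a Spark distribution via spark-submit):

-- snip --

// build.sbt - hypothetical minimal build for the snippet above.
name := "indeterminate-rollback-test"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.0" % "provided"

-- snip --

After `sbt package`, submitting with `spark-submit --class org.apache.spark.Test <jar>` should abort the job with the SparkException quoted above; remember to delete the test_output directory between runs.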
> > >
> > > On Thu, Oct 19, 2023 at 10:04 AM Keyong Zhou <zho...@apache.org> wrote:
> > >
> > > > In fact, I'm wondering whether Spark will rerun the whole reduce
> > > > ShuffleMapStage if its upstream ShuffleMapStage is INDETERMINATE and is
> > > > rerun.
> > > >
> > > > On Thu, Oct 19, 2023 at 23:00, Keyong Zhou <zho...@apache.org> wrote:
> > > >
> > > > > Thanks Erik for bringing up this question, I'm also curious about the
> > > > > answer, any feedback is appreciated.
> > > > >
> > > > > Thanks,
> > > > > Keyong Zhou
> > > > >
> > > > > On Thu, Oct 19, 2023 at 22:16, Erik fang <fme...@gmail.com> wrote:
> > > > >
> > > > > > Mridul,
> > > > > >
> > > > > > Sure, I totally agree that SPARK-25299 is a much better solution, as
> > > > > > long as we can get it from the Spark community.
> > > > > > (btw, the private[spark] scope of RDD.outputDeterministicLevel is no
> > > > > > big deal; Celeborn already has Spark-integration code in [spark] scope)
> > > > > >
> > > > > > I also have a question about INDETERMINATE stage recompute, and may
> > > > > > need your help.
> > > > > > The rule for INDETERMINATE ShuffleMapStage rerun is reasonable;
> > > > > > however, I don't find related logic for INDETERMINATE ResultStage
> > > > > > rerun in DAGScheduler.
> > > > > > If an INDETERMINATE ShuffleMapStage gets entirely recomputed, the
> > > > > > corresponding ResultStage should be entirely recomputed as well, per
> > > > > > my understanding.
> > > > > >
> > > > > > I found https://issues.apache.org/jira/browse/SPARK-25342 to roll
> > > > > > back a ResultStage, but it was not merged.
> > > > > > Do you know any context or related ticket for INDETERMINATE
> > > > > > ResultStage rerun?
> > > > > >
> > > > > > Thanks in advance!
> > > > > >
> > > > > > Regards,
> > > > > > Erik
> > > > > >
> > > > > > On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com> wrote:
> > > > > >
> > > > > > > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Mridul,
> > > > > > > >
> > > > > > > > For a),
> > > > > > > > DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> > > > > > > > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> > > > > > > > to decide whether the whole stage needs to be recomputed.
> > > > > > > > I think we can pass the same information to Celeborn in
> > > > > > > > ShuffleManager.registerShuffle()
> > > > > > > > <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> > > > > > > > since the ShuffleDependency contains the RDD object.
> > > > > > > > It seems Stage.isIndeterminate() is not accessible from
> > > > > > > > ShuffleDependency, but luckily the rdd is used internally:
> > > > > > > >
> > > > > > > > def isIndeterminate: Boolean = {
> > > > > > > >   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> > > > > > > > }
> > > > > > > >
> > > > > > > > Relying on internal implementation is not good, but doable.
> > > > > > > > I don't expect the Spark RDD/Stage implementation to change
> > > > > > > > frequently, and we can discuss an RDD isIndeterminate API with the
> > > > > > > > Spark community if they change it in the future.
> > > > > > >
> > > > > > > Only RDD.getOutputDeterministicLevel is publicly exposed;
> > > > > > > RDD.outputDeterministicLevel is not, and it is private[spark].
> > > > > > > While I don't expect changes to this, it is inherently unstable to
> > > > > > > depend on it.
> > > > > > >
> > > > > > > Btw, please see the discussion with Sungwoo Park: if Celeborn is
> > > > > > > maintaining a reducer-oriented view, you will need to recompute all
> > > > > > > the mappers anyway - what you might save is the subset of reducer
> > > > > > > partitions which can be skipped if it is DETERMINATE.
> > > > > > >
> > > > > > > > for c)
> > > > > > > > I also considered a similar solution in Celeborn.
> > > > > > > > Celeborn (LifecycleManager) can get the full picture of remaining
> > > > > > > > shuffle data from the previous stage attempt and reuse it in stage
> > > > > > > > recompute, and the whole process will be transparent to
> > > > > > > > Spark/DAGScheduler.
> > > > > > >
> > > > > > > Celeborn does not have visibility into this - and it is potentially
> > > > > > > subject to invasive changes in Apache Spark as it evolves.
> > > > > > > For example, I recently merged a couple of changes which would make
> > > > > > > this different in master compared to previous versions.
> > > > > > > Until the remote shuffle service SPIP is implemented and these
> > > > > > > internals are abstracted out & made pluggable, this will continue to
> > > > > > > be quite volatile.
> > > > > > >
> > > > > > > Note that the behavior for 3.5 and older is known - since those
> > > > > > > Spark versions have been released - it is the behavior in master and
> > > > > > > future versions of Spark which is subject to change.
> > > > > > > So delivering on SPARK-25299 would future-proof all remote shuffle
> > > > > > > implementations.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Mridul
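A minimal sketch of Erik's suggestion for a), assuming (as he notes) that the integration code lives in the org.apache.spark package so that the private[spark] member is visible; StageDeterminism is a hypothetical name, not an existing Celeborn class:

-- snip --

package org.apache.spark

import org.apache.spark.rdd.DeterministicLevel

// Hypothetical helper: because RDD.outputDeterministicLevel is private[spark],
// this object must live in the org.apache.spark package (as Celeborn's Spark
// integration code already does). A ShuffleManager could call this from
// registerShuffle() and forward the flag to the shuffle service.
object StageDeterminism {
  def isIndeterminate(dep: ShuffleDependency[_, _, _]): Boolean =
    dep.rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

-- snip --

As Mridul points out above, this leans on an internal member, so it could break across Spark versions.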
> > > > > > > >
> > > > > > > > From my perspective, leveraging partial stage recompute and the
> > > > > > > > remaining shuffle data needs a lot of work in Celeborn.
> > > > > > > > I prefer to implement a simple whole-stage recompute first, with an
> > > > > > > > interface defined with a recomputeAll = true flag, and to explore
> > > > > > > > partial stage recompute in a separate ticket as a future optimization.
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Erik
> > > > > > > >
> > > > > > > > On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > A reducer-oriented view of shuffle, especially without
> > > > > > > > > > replication, could indeed be susceptible to the issue you
> > > > > > > > > > described (a single fetch failure would require all mappers to
> > > > > > > > > > be recomputed) - note, not necessarily all reducers to be
> > > > > > > > > > recomputed though.
> > > > > > > > > >
> > > > > > > > > > Note that I have not looked much into Celeborn specifically on
> > > > > > > > > > this aspect yet, so my comments are *fairly* centric to Spark
> > > > > > > > > > internals :-)
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Mridul
> > > > > > > > > >
> > > > > > > > > > On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello,
> > > > > > > > > > >
> > > > > > > > > > > (Sorry for sending the same message again.)
> > > > > > > > > > >
> > > > > > > > > > > From my understanding, the current implementation of Celeborn
> > > > > > > > > > > makes it hard to find out which mapper should be re-executed
> > > > > > > > > > > when a partition cannot be read, and we should re-execute all
> > > > > > > > > > > the mappers in the upstream stage. If we can find out which
> > > > > > > > > > > mapper/partition should be re-executed, the current logic of
> > > > > > > > > > > stage recomputation could be (partially or totally) reused.
> > > > > > > > > > >
> > > > > > > > > > > Regards,
> > > > > > > > > > >
> > > > > > > > > > > --- Sungwoo
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > >
> > > > > > > > > > > > Spark will try to minimize the recomputation cost as much
> > > > > > > > > > > > as possible.
> > > > > > > > > > > > For example, if the parent stage was DETERMINATE, it simply
> > > > > > > > > > > > needs to recompute the missing (mapper) partitions (which
> > > > > > > > > > > > resulted in the fetch failure). Note, this by itself could
> > > > > > > > > > > > require further recomputation in the DAG if the inputs
> > > > > > > > > > > > required to compute the parent partitions are missing, and
> > > > > > > > > > > > so on - so it is dynamic.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > Mridul
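The recomputation rule Mridul describes can be sketched as follows (a paraphrase under simplified assumptions, not the actual scheduler code):

-- snip --

import org.apache.spark.rdd.DeterministicLevel

object RecomputeRule {
  // A DETERMINATE parent only needs its lost map outputs regenerated, because
  // rerunning those partitions yields identical output; an INDETERMINATE
  // parent must discard and recompute all of its partitions.
  def partitionsToRecompute(
      parentLevel: DeterministicLevel.Value,
      lostMapPartitions: Set[Int],
      numMapPartitions: Int): Set[Int] =
    if (parentLevel == DeterministicLevel.INDETERMINATE) {
      (0 until numMapPartitions).toSet
    } else {
      lostMapPartitions
    }
}

-- snip --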
> > > > > > > > > > > >
> > > > > > > > > > > > On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > > a) If one or more tasks for a stage (and so its shuffle
> > > > > > > > > > > > > > id) is going to be recomputed, if it is an INDETERMINATE
> > > > > > > > > > > > > > stage, all shuffle output will be discarded and it will
> > > > > > > > > > > > > > be entirely recomputed (see here
> > > > > > > > > > > > > > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
> > > > > > > > > > > > >
> > > > > > > > > > > > > If a reducer (in a downstream stage) fails to read data,
> > > > > > > > > > > > > can we find out which tasks should recompute their output?
> > > > > > > > > > > > > From the previous discussion, I thought this was hard (in
> > > > > > > > > > > > > the current implementation), and we should re-execute all
> > > > > > > > > > > > > tasks in the upstream stage.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > --- Sungwoo
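A sketch of the bookkeeping that would let a reducer-oriented shuffle service answer Sungwoo's question (hypothetical, not Celeborn's actual metadata model): if the service records which mappers contributed to each reducer partition, a fetch failure on one partition identifies exactly the mappers to re-execute instead of the whole upstream stage.

-- snip --

import scala.collection.mutable

// Hypothetical bookkeeping for a reducer-oriented shuffle service: record which
// mapper wrote into each reducer partition, so that losing one partition
// identifies exactly the mappers whose output must be regenerated.
final class PartitionContributors {
  private val contributors = mutable.Map.empty[Int, Set[Int]]

  def recordCommit(reducerPartition: Int, mapperId: Int): Unit =
    contributors(reducerPartition) =
      contributors.getOrElse(reducerPartition, Set.empty) + mapperId

  // Mappers to re-execute when this reducer partition is lost. None means the
  // mapping is unknown and the entire upstream stage must be rerun - the
  // situation described in the message above.
  def mappersToRecompute(reducerPartition: Int): Option[Set[Int]] =
    contributors.get(reducerPartition)
}

-- snip --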