In fact, I'm wondering if Spark will rerun the whole reduce ShuffleMapStage
if its upstream ShuffleMapStage is INDETERMINATE and rerun.

Keyong Zhou <zho...@apache.org> 于2023年10月19日周四 23:00写道:

> Thanks Erik for bringing up this question, I'm also curious about the
> answer, any feedback is appreciated.
>
> Thanks,
> Keyong Zhou
>
> Erik fang <fme...@gmail.com> 于2023年10月19日周四 22:16写道:
>
>> Mridul,
>>
>> sure, I totally agree SPARK-25299 is a much better solution, as long as we
>> can get it from spark community
>> (btw, private[spark] of RDD.outputDeterministicLevel is no big deal,
>> celeborn already has spark-integration code with  [spark] scope)
>>
>> I also have a question about INDETERMINATE stage recompute, and may need
>> your help
>> The rule for INDETERMINATE ShuffleMapStage rerun is reasonable, however, I
>> don't find related logic for INDETERMINATE ResultStage rerun in
>> DAGScheduler
>> If INDETERMINATE ShuffleMapStage got entirely recomputed, the
>> corresponding ResultStage should be entirely recomputed as well, per my
>> understanding
>>
>> I found https://issues.apache.org/jira/browse/SPARK-25342 to rollback a
>> ResultStage but it was not merged
>> Do you know any context or related ticket for INDETERMINATE ResultStage
>> rerun?
>>
>> Thanks in advance!
>>
>> Regards,
>> Erik
>>
>> On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>> >
>> >
>> > On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:
>> >
>> >> Hi Mridul,
>> >>
>> >> For a),
>> >> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> >> <
>> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975
>> >
>> >> to decide whether the whole stage needs to be recomputed
>> >> I think we can pass the same information to Celeborn in
>> >> ShuffleManager.registerShuffle()
>> >> <
>> https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> since
>> >> RDD in ShuffleDependency contains the RDD object
>> >> It seems Stage.isIndeterminate() is unreadable from ShuffleDependency,
>> >> but luckily rdd is used internally
>> >>
>> >> def isIndeterminate: Boolean = {
>> >>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> >> }
>> >>
>> >> Relies on internal implementation is not good, but doable.
>> >> I don't expect spark RDD/Stage implementation changes frequently, and
>> we
>> >> can discuss with Spark community for a RDD isIndeterminate API if they
>> >> change it in the future
>> >>
>> >
>> >
>> > Only RDD.getOutputDeterministicLevel is publicly exposed,
>> > RDD.outputDeterministicLevel is not and it is private[spark].
>> > While I dont expect changes to this, it is inherently unstable to depend
>> > on it.
>> >
>> > Btw, please see the discussion with Sungwoo Park, if Celeborn is
>> > maintaining a reducer oriented view, you will need to recompute all the
>> > mappers anyway - what you might save is the subset of reducer partitions
>> > which can be skipped if it is DETERMINATE.
>> >
>> >
>> >
>> >
>> >>
>> >> for c)
>> >> I also considered a similar solution in celeborn
>> >> Celeborn (LifecycleManager) can get the full picture of remaining
>> shuffle
>> >> data from previous stage attempt and reuse it in stage recompute
>> >> , and the whole process will be transparent to Spark/DagScheduler
>> >>
>> >
>> > Celeborn does not have visibility into this - and this is potentially
>> > subject to invasive changes in Apache Spark as it evolves.
>> > For example, I recently merged a couple of changes which would make this
>> > different in master compared to previous versions.
>> > Until the remote shuffle service SPIP is implemented and these are
>> > abstracted out & made pluggable, it will continue to be quite volatile.
>> >
>> > Note that the behavior for 3.5 and older is known - since Spark versions
>> > have been released - it is the behavior in master and future versions of
>> > Spark which is subject to change.
>> > So delivering on SPARK-25299 would future proof all remote shuffle
>> > implementations.
>> >
>> >
>> > Regards,
>> > Mridul
>> >
>> >
>> >
>> >>
>> >> Per my perspective, leveraging partial stage recompute and
>> >> remaining shuffle data needs a lot of work to do in Celeborn
>> >> I prefer to implement a simple whole stage recompute first with
>> interface
>> >> defined with recomputeAll = true flag, and explore partial stage
>> recompute
>> >> in seperate ticket as future optimization
>> >> How do you think about it?
>> >>
>> >> Regards,
>> >> Erik
>> >>
>> >>
>> >> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
>> >> wrote:
>> >>
>> >>>
>> >>>
>> >>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com
>> >
>> >>> wrote:
>> >>>
>> >>>>
>> >>>> A reducer oriented view of shuffle, especially without replication,
>> >>>> could indeed be susceptible to this issue you described (a single
>> fetch
>> >>>> failure would require all mappers to need to be recomputed) - note,
>> not
>> >>>> necessarily all reducers to be recomputed though.
>> >>>>
>> >>>> Note that I have not looked much into Celeborn specifically on this
>> >>>> aspect yet, so my comments are *fairly* centric to Spark internals
>> :-)
>> >>>>
>> >>>> Regards,
>> >>>> Mridul
>> >>>>
>> >>>>
>> >>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com>
>> wrote:
>> >>>>
>> >>>>> Hello,
>> >>>>>
>> >>>>> (Sorry for sending the same message again.)
>> >>>>>
>> >>>>> From my understanding, the current implementation of Celeborn makes
>> it
>> >>>>> hard to find out which mapper should be re-executed when a
>> partition cannot
>> >>>>> be read, and we should re-execute all the mappers in the upstream
>> stage. If
>> >>>>> we can find out which mapper/partition should be re-executed, the
>> current
>> >>>>> logic of stage recomputation could be (partially or totally) reused.
>> >>>>>
>> >>>>> Regards,
>> >>>>>
>> >>>>> --- Sungwoo
>> >>>>>
>> >>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <
>> mri...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>>>
>> >>>>>> Hi,
>> >>>>>>
>> >>>>>>   Spark will try to minimize the recomputation cost as much as
>> >>>>>> possible.
>> >>>>>> For example, if parent stage was DETERMINATE, it simply needs to
>> >>>>>> recompute the missing (mapper) partitions (which resulted in fetch
>> >>>>>> failure). Note, this by itself could require further recomputation
>> in the
>> >>>>>> DAG if the inputs required to comput the parent partitions are
>> missing, and
>> >>>>>> so on - so it is dynamic.
>> >>>>>>
>> >>>>>> Regards,
>> >>>>>> Mridul
>> >>>>>>
>> >>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <
>> o...@pl.postech.ac.kr>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is
>> >>>>>>> going to be
>> >>>>>>> > recomputed, if it is an INDETERMINATE stage, all shuffle output
>> >>>>>>> will be
>> >>>>>>> > discarded and it will be entirely recomputed (see here
>> >>>>>>> > <
>> >>>>>>>
>> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
>> >>>>>>> >
>> >>>>>>> > ).
>> >>>>>>>
>> >>>>>>> If a reducer (in a downstream stage) fails to read data, can we
>> find
>> >>>>>>> out
>> >>>>>>> which tasks should recompute their output? From the previous
>> >>>>>>> discussion, I
>> >>>>>>> thought this was hard (in the current implementation), and we
>> should
>> >>>>>>> re-execute all tasks in the upstream stage.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>>
>> >>>>>>> --- Sungwoo
>> >>>>>>>
>> >>>>>>
>>
>

Reply via email to