On Mon, Oct 16, 2023 at 11:31 AM Erik fang <fme...@gmail.com> wrote:

> Hi Mridul,
>
> For a),
> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
> to decide whether the whole stage needs to be recomputed.
> I think we can pass the same information to Celeborn in
> ShuffleManager.registerShuffle()
> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
> since ShuffleDependency contains the RDD object.
> Stage.isIndeterminate() is not accessible from ShuffleDependency, but
> luckily it is derived from the rdd internally:
>
> def isIndeterminate: Boolean = {
>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> }
>
> Relying on an internal implementation is not good, but it is doable.
> I don't expect the Spark RDD/Stage implementation to change frequently, and
> we can discuss an RDD isIndeterminate API with the Spark community if they
> change it in the future.
>


Only RDD.getOutputDeterministicLevel is publicly exposed;
RDD.outputDeterministicLevel is not, as it is private[spark].
While I don't expect changes to this, it is inherently unstable to depend on
it.
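
A minimal sketch of the idea under discussion: derive a recomputeAll flag at
shuffle registration time from the RDD's determinism level. The types below
(RddModel, ShuffleRegistration, register) are hypothetical stand-ins, not Spark
or Celeborn APIs; only the three level names mirror Spark's DeterministicLevel.
In a real ShuffleManager the level would have to be read from the RDD held by
ShuffleDependency, which is the private[spark] access concern raised above.

```scala
// Self-contained model; none of these types belong to Spark or Celeborn.
object DeterminismSketch {
  // Mirrors the three values of Spark's DeterministicLevel enumeration.
  sealed trait Level
  case object Determinate extends Level
  case object Unordered extends Level
  case object Indeterminate extends Level

  // Hypothetical stand-in for the RDD carried by a ShuffleDependency.
  final case class RddModel(outputLevel: Level)

  // Hypothetical record a shuffle manager could forward to Celeborn.
  final case class ShuffleRegistration(shuffleId: Int, recomputeAll: Boolean)

  // Same comparison as the quoted Stage.isIndeterminate, applied to the model.
  def isIndeterminate(rdd: RddModel): Boolean =
    rdd.outputLevel == Indeterminate

  // Only an INDETERMINATE parent forces a whole-stage recompute.
  def register(shuffleId: Int, rdd: RddModel): ShuffleRegistration =
    ShuffleRegistration(shuffleId, recomputeAll = isIndeterminate(rdd))
}
```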

Btw, please see the discussion with Sungwoo Park: if Celeborn is
maintaining a reducer-oriented view, you will need to recompute all the
mappers anyway. What you might save is the subset of reducer partitions
which can be skipped if the stage is DETERMINATE.
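
To make the trade-off concrete, here is a rough sketch of the recompute
decision under a reducer-oriented view. It is only a model of the reasoning in
this thread, not Celeborn or Spark code: Plan, plan, and the partition sets are
hypothetical names, and the sketch assumes a lost reducer partition cannot be
traced back to individual mappers.

```scala
// Self-contained model of the reasoning above; all names are hypothetical.
object RecomputePlanSketch {
  final case class Plan(mappersToRerun: Set[Int],
                        reducerPartitionsToRewrite: Set[Int])

  def plan(numMappers: Int,
           numReducers: Int,
           lostReducerPartitions: Set[Int],
           indeterminate: Boolean): Plan = {
    val allMappers = (0 until numMappers).toSet
    val allReducers = (0 until numReducers).toSet
    // Ignore partition ids that are out of range for this shuffle.
    val lost = lostReducerPartitions.intersect(allReducers)

    if (lost.isEmpty) {
      // Nothing was lost, so nothing needs to be recomputed.
      Plan(Set.empty, Set.empty)
    } else if (indeterminate) {
      // INDETERMINATE: all existing output is discarded and rewritten.
      Plan(allMappers, allReducers)
    } else {
      // DETERMINATE: every mapper still reruns (reducer-oriented view),
      // but only the lost reducer partitions need to be written again.
      Plan(allMappers, lost)
    }
  }
}
```

For example, with 4 mappers, 3 reducer partitions, and only partition 1 lost,
a DETERMINATE stage reruns all 4 mappers but rewrites just partition 1.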




>
> for c)
> I also considered a similar solution in Celeborn.
> Celeborn (LifecycleManager) can get the full picture of the remaining shuffle
> data from the previous stage attempt and reuse it in the stage recompute,
> and the whole process will be transparent to Spark/DAGScheduler.
>

Celeborn does not have visibility into this - and this is potentially
subject to invasive changes in Apache Spark as it evolves.
For example, I recently merged a couple of changes which would make this
different in master compared to previous versions.
Until the remote shuffle service SPIP is implemented and these are
abstracted out & made pluggable, it will continue to be quite volatile.

Note that the behavior for 3.5 and older is known - since Spark versions
have been released - it is the behavior in master and future versions of
Spark which is subject to change.
So delivering on SPARK-25299 would future-proof all remote shuffle
implementations.


Regards,
Mridul



>
> From my perspective, leveraging partial stage recompute and the
> remaining shuffle data needs a lot of work in Celeborn.
> I prefer to implement a simple whole-stage recompute first, with the interface
> defined with a recomputeAll = true flag, and explore partial stage recompute
> in a separate ticket as a future optimization.
> What do you think?
>
> Regards,
> Erik
>
>
> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>>
>>
>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>>>
>>> A reducer-oriented view of shuffle, especially without replication,
>>> could indeed be susceptible to the issue you described (a single fetch
>>> failure would require all mappers to be recomputed) - note, not
>>> necessarily all reducers, though.
>>>
>>> Note that I have not looked much into Celeborn specifically on this
>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> (Sorry for sending the same message again.)
>>>>
>>>> From my understanding, the current implementation of Celeborn makes it
>>>> hard to find out which mapper should be re-executed when a partition cannot
>>>> be read, and we should re-execute all the mappers in the upstream stage. If
>>>> we can find out which mapper/partition should be re-executed, the current
>>>> logic of stage recomputation could be (partially or totally) reused.
>>>>
>>>> Regards,
>>>>
>>>> --- Sungwoo
>>>>
>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>>   Spark will try to minimize the recomputation cost as much as
>>>>> possible.
>>>>> For example, if the parent stage was DETERMINATE, it simply needs to
>>>>> recompute the missing (mapper) partitions (which resulted in the fetch
>>>>> failure). Note, this by itself could require further recomputation in the
>>>>> DAG if the inputs required to compute the parent partitions are missing,
>>>>> and so on - so it is dynamic.
>>>>>
>>>>> Regards,
>>>>> Mridul
>>>>>
>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>>>>> wrote:
>>>>>
>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
>>>>>> > to be recomputed, if it is an INDETERMINATE stage, all shuffle output
>>>>>> > will be discarded and it will be entirely recomputed (see here
>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>>>>>>
>>>>>> If a reducer (in a downstream stage) fails to read data, can we find
>>>>>> out which tasks should recompute their output? From the previous
>>>>>> discussion, I thought this was hard (in the current implementation),
>>>>>> and we should re-execute all tasks in the upstream stage.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> --- Sungwoo
>>>>>>
>>>>>
