Mridul,

Sure, I totally agree that SPARK-25299 is a much better solution, as long as
we can get it from the Spark community.
(By the way, RDD.outputDeterministicLevel being private[spark] is no big deal;
Celeborn already has Spark integration code with [spark] scope.)
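
To illustrate the scoping point, here is a minimal self-contained sketch. The names sparkmodel, Rdd and integration are hypothetical stand-ins, not Spark or Celeborn code; the sketch only shows that a member marked private[X] is readable from code nested inside X:

```scala
// Hypothetical stand-in for the org.apache.spark package; not real Spark code.
object sparkmodel {
  object DeterministicLevel extends Enumeration {
    val DETERMINATE, UNORDERED, INDETERMINATE = Value
  }

  // Stand-in for an RDD that exposes its level only inside `sparkmodel`.
  class Rdd(level: DeterministicLevel.Value) {
    private[sparkmodel] val outputDeterministicLevel: DeterministicLevel.Value = level
  }

  // Stand-in for integration code that lives inside the same scope,
  // so the qualified-private member is visible here.
  object integration {
    def isIndeterminate(rdd: Rdd): Boolean =
      rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
  }
}
```

Code outside sparkmodel cannot read outputDeterministicLevel directly, which is why such access counts as relying on an internal detail.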

I also have a question about INDETERMINATE stage recomputation and may need
your help.
The rule for rerunning an INDETERMINATE ShuffleMapStage is reasonable; however,
I can't find the corresponding logic for rerunning an INDETERMINATE ResultStage
in DAGScheduler.
As I understand it, if an INDETERMINATE ShuffleMapStage is entirely
recomputed, the corresponding ResultStage should be entirely recomputed as
well.
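
The rule as described here can be sketched as follows. This is a simplified model with hypothetical names (RecomputeSketch, mapPartitionsToRerun, resultPartitionsToRerun), not DAGScheduler code, and the ResultStage part encodes the understanding stated above rather than confirmed Spark behavior:

```scala
// Simplified, hypothetical model of the rerun rule; it does not use Spark.
object RecomputeSketch {
  sealed trait Level
  case object Determinate extends Level
  case object Indeterminate extends Level

  // Map partitions to rerun for a shuffle map stage after a fetch failure.
  def mapPartitionsToRerun(level: Level, numPartitions: Int, missing: Set[Int]): Set[Int] =
    level match {
      case Determinate   => missing                        // only the lost outputs
      case Indeterminate => (0 until numPartitions).toSet  // discard and redo everything
    }

  // Result partitions to rerun, ASSUMING a fully recomputed INDETERMINATE
  // upstream stage invalidates every result partition already produced.
  def resultPartitionsToRerun(upstream: Level, numPartitions: Int, unfinished: Set[Int]): Set[Int] =
    upstream match {
      case Determinate   => unfinished
      case Indeterminate => (0 until numPartitions).toSet
    }
}
```

For example, with 4 partitions and only partition 2 missing, a DETERMINATE stage reruns partition 2 alone, while an INDETERMINATE stage reruns all four.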

I found https://issues.apache.org/jira/browse/SPARK-25342, which proposed
rolling back a ResultStage, but it was not merged.
Do you know of any context or related ticket for INDETERMINATE ResultStage
rerun?

Thanks in advance!

Regards,
Erik

On Tue, Oct 17, 2023 at 4:23 AM Mridul Muralidharan <[email protected]>
wrote:

>
>
> On Mon, Oct 16, 2023 at 11:31 AM Erik fang <[email protected]> wrote:
>
>> Hi Mridul,
>>
>> For a),
>> DAGScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
>> <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1975>
>> to decide whether the whole stage needs to be recomputed.
>> I think we can pass the same information to Celeborn in
>> ShuffleManager.registerShuffle()
>> <https://github.com/apache/spark/blob/721ea9bbb2ff77b6d2f575fdca0aeda84990cc3b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L39>,
>> since ShuffleDependency contains the RDD object.
>> It seems Stage.isIndeterminate() is not accessible from ShuffleDependency,
>> but luckily it only uses rdd internally:
>>
>> def isIndeterminate: Boolean = {
>>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
>> }
>>
>> Relying on the internal implementation is not good, but it is doable.
>> I don't expect the Spark RDD/Stage implementation to change frequently, and
>> we can discuss an RDD isIndeterminate API with the Spark community if they
>> change it in the future.
>>
>
>
> Only RDD.getOutputDeterministicLevel is publicly exposed;
> RDD.outputDeterministicLevel is not, as it is private[spark].
> While I don't expect changes to this, it is inherently unstable to depend
> on it.
>
> Btw, please see the discussion with Sungwoo Park: if Celeborn is
> maintaining a reducer-oriented view, you will need to recompute all the
> mappers anyway. What you might save is the subset of reducer partitions,
> which can be skipped if the stage is DETERMINATE.
>
>
>
>
>>
>> For c),
>> I also considered a similar solution in Celeborn.
>> Celeborn (LifecycleManager) can get the full picture of the remaining
>> shuffle data from the previous stage attempt and reuse it in the stage
>> recompute, and the whole process will be transparent to Spark/DAGScheduler.
>>
>
> Celeborn does not have visibility into this - and this is potentially
> subject to invasive changes in Apache Spark as it evolves.
> For example, I recently merged a couple of changes which would make this
> different in master compared to previous versions.
> Until the remote shuffle service SPIP is implemented and these are
> abstracted out & made pluggable, it will continue to be quite volatile.
>
> Note that the behavior for 3.5 and older is known, since those Spark
> versions have been released - it is the behavior in master and future
> versions of Spark which is subject to change.
> So delivering on SPARK-25299 would future-proof all remote shuffle
> implementations.
>
>
> Regards,
> Mridul
>
>
>
>>
>> From my perspective, leveraging partial stage recompute and the
>> remaining shuffle data requires a lot of work in Celeborn.
>> I prefer to implement a simple whole-stage recompute first, with the
>> interface defined with a recomputeAll = true flag, and explore partial
>> stage recompute in a separate ticket as a future optimization.
>> What do you think?
>>
>> Regards,
>> Erik
>>
>>
>> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan <[email protected]>
>> wrote:
>>
>>>
>>>
>>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan <[email protected]>
>>> wrote:
>>>
>>>>
>>>> A reducer-oriented view of shuffle, especially without replication,
>>>> could indeed be susceptible to the issue you described (a single fetch
>>>> failure would require all mappers to be recomputed) - note, not
>>>> necessarily all reducers to be recomputed though.
>>>>
>>>> Note that I have not looked much into Celeborn specifically on this
>>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>>>
>>>> Regards,
>>>> Mridul
>>>>
>>>>
>>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> (Sorry for sending the same message again.)
>>>>>
>>>>> From my understanding, the current implementation of Celeborn makes it
>>>>> hard to find out which mapper should be re-executed when a partition
>>>>> cannot be read, and we should re-execute all the mappers in the upstream
>>>>> stage. If we can find out which mapper/partition should be re-executed,
>>>>> the current logic of stage recomputation could be (partially or totally)
>>>>> reused.
>>>>>
>>>>> Regards,
>>>>>
>>>>> --- Sungwoo
>>>>>
>>>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <[email protected]>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>>   Spark will try to minimize the recomputation cost as much as
>>>>>> possible.
>>>>>> For example, if the parent stage was DETERMINATE, it simply needs to
>>>>>> recompute the missing (mapper) partitions (which resulted in the fetch
>>>>>> failure). Note, this by itself could require further recomputation in
>>>>>> the DAG if the inputs required to compute the parent partitions are
>>>>>> missing, and so on - so it is dynamic.
>>>>>>
>>>>>> Regards,
>>>>>> Mridul
>>>>>>
>>>>>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
>>>>>>> > to be recomputed, if it is an INDETERMINATE stage, all shuffle output
>>>>>>> > will be discarded and it will be entirely recomputed (see here
>>>>>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>>>>>>>
>>>>>>> If a reducer (in a downstream stage) fails to read data, can we find
>>>>>>> out which tasks should recompute their output? From the previous
>>>>>>> discussion, I thought this was hard (in the current implementation),
>>>>>>> and we should re-execute all tasks in the upstream stage.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> --- Sungwoo
>>>>>>>
>>>>>>
