Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Thread Mridul Muralidharan
A reducer-oriented view of shuffle, especially without replication, could
indeed be susceptible to the issue you described (a single fetch failure
would require all mappers to be recomputed) - note, though, not necessarily
all reducers need to be recomputed.

Note that I have not looked much into Celeborn specifically on this aspect
yet, so my comments are *fairly* centric to Spark internals :-)
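
To make that concrete, here is a minimal sketch of the two layouts - with
hypothetical types for illustration only, not Spark or Celeborn code:

// Mapper-oriented layout (Spark's native shuffle): blocks are keyed by
// mapId, so a failed fetch of (mapId, reduceId) pins the recomputation to
// exactly one mapper.
case class MapperOrientedShuffle(blocks: Map[(Int, Int), String]) {
  def mappersToRerun(failedMapId: Int): Set[Int] = Set(failedMapId)
}

// Reducer-oriented layout (without replication): each reduce partition is a
// single merged file with no per-mapper attribution, so losing one file
// loses a contribution from every mapper, and all of them must re-run.
case class ReducerOrientedShuffle(partitions: Map[Int, String], numMappers: Int) {
  def mappersToRerun(failedReduceId: Int): Set[Int] = (0 until numMappers).toSet
}

Only the reduce partitions that actually lost data have to be re-fetched,
which is why not all reducers necessarily re-run.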

Regards,
Mridul


On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park  wrote:

> Hello,
>
> (Sorry for sending the same message again.)
>
> From my understanding, the current implementation of Celeborn makes it
> hard to find out which mapper should be re-executed when a partition cannot
> be read, so we must re-execute all the mappers in the upstream stage. If
> we can find out which mapper/partition should be re-executed, the current
> logic of stage recomputation could be (partially or totally) reused.
>
> Regards,
>
> --- Sungwoo
>
> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan 
> wrote:
>
>>
>> Hi,
>>
>>   Spark will try to minimize the recomputation cost as much as possible.
>> For example, if the parent stage is DETERMINATE, it simply needs to
>> recompute the missing (mapper) partitions (those that resulted in the fetch
>> failure). Note, this by itself could require further recomputation in the
>> DAG if the inputs required to compute the parent partitions are missing,
>> and so on - so it is dynamic.
>>
>> Regards,
>> Mridul
>>
>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park 
>> wrote:
>>
>>> > a) If one or more tasks for a stage (and so its shuffle id) are going
>>> > to be recomputed and it is an INDETERMINATE stage, all shuffle output
>>> > will be discarded and the stage will be entirely recomputed (see
>>> > https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477).
>>>
>>> If a reducer (in a downstream stage) fails to read data, can we find out
>>> which tasks should recompute their output? From the previous discussion,
>>> I thought this was hard (in the current implementation) and that we must
>>> re-execute all tasks in the upstream stage.
>>>
>>> Thanks,
>>>
>>> --- Sungwoo
>>>
>>


Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Thread Mridul Muralidharan
Hi,

  Spark will try to minimize the recomputation cost as much as possible.
For example, if the parent stage is DETERMINATE, it simply needs to recompute
the missing (mapper) partitions (those that resulted in the fetch failure).
Note, this by itself could require further recomputation in the DAG if the
inputs required to compute the parent partitions are missing, and so on - so
it is dynamic.
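
As a rough illustration of that dynamic walk - a simplified sketch with
made-up types, not the actual DAGScheduler logic:

// Each stage knows its parents and which of its (mapper) partitions are
// missing. For a DETERMINATE stage, only those partitions are resubmitted;
// the walk then repeats on any parent whose own outputs are missing.
case class Stage(id: Int, parents: Seq[Stage], missingPartitions: Seq[Int])

def stagesToResubmit(stage: Stage): Seq[(Int, Seq[Int])] =
  if (stage.missingPartitions.isEmpty) Nil
  else (stage.id, stage.missingPartitions) +: stage.parents.flatMap(stagesToResubmit)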

Regards,
Mridul

On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park  wrote:

> > a) If one or more tasks for a stage (and so its shuffle id) are going to
> > be recomputed and it is an INDETERMINATE stage, all shuffle output will
> > be discarded and the stage will be entirely recomputed (see
> > https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477).
>
> If a reducer (in a downstream stage) fails to read data, can we find out
> which tasks should recompute their output? From the previous discussion, I
> thought this was hard (in the current implementation) and that we must
> re-execute all tasks in the upstream stage.
>
> Thanks,
>
> --- Sungwoo
>


Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-14 Thread Sungwoo Park

a) If one or more tasks for a stage (and so its shuffle id) are going to be
recomputed and it is an INDETERMINATE stage, all shuffle output will be
discarded and the stage will be entirely recomputed (see
https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477).
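
For context on when that rule fires: Spark tracks output determinism per RDD
via the DeterministicLevel enum in org.apache.spark.rdd, and a stage whose
output is INDETERMINATE cannot safely mix outputs from the old and new runs.
Below is a sketch with a hypothetical helper, not the actual DAGScheduler
code:

import org.apache.spark.rdd.DeterministicLevel

// Hypothetical helper illustrating the rule quoted above; not Spark code.
def partitionsToRecompute(
    level: DeterministicLevel.Value,
    missing: Seq[Int],
    all: Seq[Int]): Seq[Int] = level match {
  // Rerun yields the same data in the same order: recompute only what is lost.
  case DeterministicLevel.DETERMINATE => missing
  // Rerun yields the same data, though possibly in a different order.
  case DeterministicLevel.UNORDERED => missing
  // Rerun may yield different data (e.g. after a round-robin repartition):
  // discard all shuffle output and recompute the entire stage.
  case DeterministicLevel.INDETERMINATE => all
}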


If a reducer (in a downstream stage) fails to read data, can we find out
which tasks should recompute their output? From the previous discussion, I
thought this was hard (in the current implementation) and that we must
re-execute all tasks in the upstream stage.


Thanks,

--- Sungwoo