Re: Question on Celeborn workers,

2023-10-16 Thread Keyong Zhou
Yeah, retaining the map output can reduce the needed tasks to be
recomputed for DETERMINATE stages when an output file is lost.
This is one important design tradeoff.

Currently Celeborn also supports MapPartition for Flink Batch, in
which case partition data is not aggregated, instead one mapper's
output is stored in one file (perhaps multiple files if split happens),
very similar to how ESS stores shuffle data. Combining MapPartition
with ReducePartition (aggregate partition data) in Celeborn the same
way how magnet does may be an interesting idea.

Thanks,
Keyong Zhou

Mridul Muralidharan  于2023年10月17日周二 00:01写道:

> With push based shuffle in Apache Spark (magnet), we have both the map
> output and reducer orientated merged output preserved - with reducer
> oriented view chosen by default for reads and fallback to mapper output
> when reducer output is missing/failures. That mitigates this specific issue
> for DETERMINATE stages and only subset which need recomputation are
> regenerated.
> With magnet only smaller blocks are pushed for merged data, so effective
> replication is much lower.
>
> In our Celeborn deployment we are still testing, we will enable replication
> for functional and operational reasons - probably move replication out of
> the write path to speed it up further.
>
>
> Regards,
> Mridul
>
> On Mon, Oct 16, 2023 at 9:08 AM Keyong Zhou  wrote:
>
> > Hi Sungwoo,
> >
> > What you are pointing out is very correct. Currently shuffle data
> > is distributed across `celeborn.master.slot.assign.maxWorkers` workers,
> > which defaults to 1, so I believe the cascading stage rerun will
> > definitely happen.
> >
> > I think setting ` celeborn.master.slot.assign.maxWorkers` to a smaller
> > value can help, especially in relatively large clusters. Turning on
> > replication
> > can also help, but it conflicts with the purpose why we do stage rerun
> > (i.e. we
> > don't want to turn on replication for resource consumption reason).
> >
> > We didn't thought about this before, thanks for pointing that out!
> >
> > Thanks,
> > Keyong Zhou
> >
> > Sungwoo Park  于2023年10月13日周五 02:22写道:
> >
> > > I have a question on how Celeborn distributes shuffle data among
> Celeborn
> > > workers.
> > >
> > > From our observation, it seems that whenever a Celeborn worker fails or
> > > gets killed (in a small cluster of less than 25 nodes), almost every
> edge
> > > is affected. Does this mean that an edge with multiple partitions
> usually
> > > distributes its shuffle data among all Celeborn workers?
> > >
> > > If this is the case, I think stage recomputation is unnecessary and
> just
> > > re-executing the entire DAG is a better approach. Our current
> > > implementation uses the following scheme for stage recomputation:
> > >
> > > 1. If a read failure occurs for shuffleId #1 for an edge, we pick up a
> > new
> > > shuffleId #2 for the same edge.
> > > 2. The upstream stage re-executes all tasks, but writes the output to
> > > shuffleId #2.
> > > 3. Tasks in the downstream stage re-try by reading from shuffleId #2.
> > >
> > > From our experiment, whenever a Celeborn worker fails and a read
> failure
> > > occurs for an edge, the re-execution of the upstream stage usally ends
> up
> > > with another read failure because some part of its input has also been
> > > lost. As a result, all upstream stages are eventually re-executed in a
> > > cascading manner. In essence, the failure of a Celeborn worker
> > invalidates
> > > all existing shuffleIds.
> > >
> > > (This is what we observed with Hive-MR3-Celeborn, but I guess stage
> > > recomputation in Spark will have to deal with the same problem.)
> > >
> > > Thanks,
> > >
> > > --- Sungwoo
> > >
> >
>


Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-16 Thread Mridul Muralidharan
On Mon, Oct 16, 2023 at 11:31 AM Erik fang  wrote:

> Hi Mridul,
>
> For a),
> DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()
> 
> to decide whether the whole stage needs to be recomputed
> I think we can pass the same information to Celeborn in
> ShuffleManager.registerShuffle()
> ,
>  since
> RDD in ShuffleDependency contains the RDD object
> It seems Stage.isIndeterminate() is unreadable from ShuffleDependency, but
> luckily rdd is used internally
>
> def isIndeterminate: Boolean = {
>   rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
> }
>
> Relies on internal implementation is not good, but doable.
> I don't expect spark RDD/Stage implementation changes frequently, and we
> can discuss with Spark community for a RDD isIndeterminate API if they
> change it in the future
>


Only RDD.getOutputDeterministicLevel is publicly exposed,
RDD.outputDeterministicLevel is not and it is private[spark].
While I dont expect changes to this, it is inherently unstable to depend on
it.

Btw, please see the discussion with Sungwoo Park, if Celeborn is
maintaining a reducer oriented view, you will need to recompute all the
mappers anyway - what you might save is the subset of reducer partitions
which can be skipped if it is DETERMINATE.




>
> for c)
> I also considered a similar solution in celeborn
> Celeborn (LifecycleManager) can get the full picture of remaining shuffle
> data from previous stage attempt and reuse it in stage recompute
> , and the whole process will be transparent to Spark/DagScheduler
>

Celeborn does not have visibility into this - and this is potentially
subject to invasive changes in Apache Spark as it evolves.
For example, I recently merged a couple of changes which would make this
different in master compared to previous versions.
Until the remote shuffle service SPIP is implemented and these are
abstracted out & made pluggable, it will continue to be quite volatile.

Note that the behavior for 3.5 and older is known - since Spark versions
have been released - it is the behavior in master and future versions of
Spark which is subject to change.
So delivering on SPARK-25299 would future proof all remote shuffle
implementations.


Regards,
Mridul



>
> Per my perspective, leveraging partial stage recompute and
> remaining shuffle data needs a lot of work to do in Celeborn
> I prefer to implement a simple whole stage recompute first with interface
> defined with recomputeAll = true flag, and explore partial stage recompute
> in seperate ticket as future optimization
> How do you think about it?
>
> Regards,
> Erik
>
>
> On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan 
> wrote:
>
>>
>>
>> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan 
>> wrote:
>>
>>>
>>> A reducer oriented view of shuffle, especially without replication,
>>> could indeed be susceptible to this issue you described (a single fetch
>>> failure would require all mappers to need to be recomputed) - note, not
>>> necessarily all reducers to be recomputed though.
>>>
>>> Note that I have not looked much into Celeborn specifically on this
>>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>>
>>> Regards,
>>> Mridul
>>>
>>>
>>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park  wrote:
>>>
 Hello,

 (Sorry for sending the same message again.)

 From my understanding, the current implementation of Celeborn makes it
 hard to find out which mapper should be re-executed when a partition cannot
 be read, and we should re-execute all the mappers in the upstream stage. If
 we can find out which mapper/partition should be re-executed, the current
 logic of stage recomputation could be (partially or totally) reused.

 Regards,

 --- Sungwoo

 On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan 
 wrote:

>
> Hi,
>
>   Spark will try to minimize the recomputation cost as much as
> possible.
> For example, if parent stage was DETERMINATE, it simply needs to
> recompute the missing (mapper) partitions (which resulted in fetch
> failure). Note, this by itself could require further recomputation in the
> DAG if the inputs required to comput the parent partitions are missing, 
> and
> so on - so it is dynamic.
>
> Regards,
> Mridul
>
> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park 
> wrote:
>
>> > a) If one or more tasks for a stage (and so its shuffle id) is
>> going to be
>> > recomputed, if it is an INDETERMINATE stage, all shuffle output
>> will be
>> > discarded and it will be entirely recomputed (see here
>> > <
>> https://gith

Re: [PROPOSAL] Spark stage resubmission for shuffle fetch failure

2023-10-16 Thread Erik fang
Hi Mridul,

For a),
DagScheduler uses Stage.isIndeterminate() and RDD.isBarrier()

to decide whether the whole stage needs to be recomputed
I think we can pass the same information to Celeborn in
ShuffleManager.registerShuffle()
,
since
RDD in ShuffleDependency contains the RDD object
It seems Stage.isIndeterminate() is unreadable from ShuffleDependency, but
luckily rdd is used internally

def isIndeterminate: Boolean = {
  rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
}

Relies on internal implementation is not good, but doable.
I don't expect spark RDD/Stage implementation changes frequently, and we
can discuss with Spark community for a RDD isIndeterminate API if they
change it in the future

for c)
I also considered a similar solution in celeborn
Celeborn (LifecycleManager) can get the full picture of remaining shuffle
data from previous stage attempt and reuse it in stage recompute
, and the whole process will be transparent to Spark/DagScheduler

Per my perspective, leveraging partial stage recompute and
remaining shuffle data needs a lot of work to do in Celeborn
I prefer to implement a simple whole stage recompute first with interface
defined with recomputeAll = true flag, and explore partial stage recompute
in seperate ticket as future optimization
How do you think about it?

Regards,
Erik


On Sat, Oct 14, 2023 at 4:50 PM Mridul Muralidharan 
wrote:

>
>
> On Sat, Oct 14, 2023 at 3:49 AM Mridul Muralidharan 
> wrote:
>
>>
>> A reducer oriented view of shuffle, especially without replication, could
>> indeed be susceptible to this issue you described (a single fetch failure
>> would require all mappers to need to be recomputed) - note, not necessarily
>> all reducers to be recomputed though.
>>
>> Note that I have not looked much into Celeborn specifically on this
>> aspect yet, so my comments are *fairly* centric to Spark internals :-)
>>
>> Regards,
>> Mridul
>>
>>
>> On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park  wrote:
>>
>>> Hello,
>>>
>>> (Sorry for sending the same message again.)
>>>
>>> From my understanding, the current implementation of Celeborn makes it
>>> hard to find out which mapper should be re-executed when a partition cannot
>>> be read, and we should re-execute all the mappers in the upstream stage. If
>>> we can find out which mapper/partition should be re-executed, the current
>>> logic of stage recomputation could be (partially or totally) reused.
>>>
>>> Regards,
>>>
>>> --- Sungwoo
>>>
>>> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan 
>>> wrote:
>>>

 Hi,

   Spark will try to minimize the recomputation cost as much as possible.
 For example, if parent stage was DETERMINATE, it simply needs to
 recompute the missing (mapper) partitions (which resulted in fetch
 failure). Note, this by itself could require further recomputation in the
 DAG if the inputs required to comput the parent partitions are missing, and
 so on - so it is dynamic.

 Regards,
 Mridul

 On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park 
 wrote:

> > a) If one or more tasks for a stage (and so its shuffle id) is going
> to be
> > recomputed, if it is an INDETERMINATE stage, all shuffle output will
> be
> > discarded and it will be entirely recomputed (see here
> > <
> https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477
> >
> > ).
>
> If a reducer (in a downstream stage) fails to read data, can we find
> out
> which tasks should recompute their output? From the previous
> discussion, I
> thought this was hard (in the current implementation), and we should
> re-execute all tasks in the upstream stage.
>
> Thanks,
>
> --- Sungwoo
>



Re: Question on Celeborn workers,

2023-10-16 Thread Mridul Muralidharan
With push based shuffle in Apache Spark (magnet), we have both the map
output and reducer orientated merged output preserved - with reducer
oriented view chosen by default for reads and fallback to mapper output
when reducer output is missing/failures. That mitigates this specific issue
for DETERMINATE stages and only subset which need recomputation are
regenerated.
With magnet only smaller blocks are pushed for merged data, so effective
replication is much lower.

In our Celeborn deployment we are still testing, we will enable replication
for functional and operational reasons - probably move replication out of
the write path to speed it up further.


Regards,
Mridul

On Mon, Oct 16, 2023 at 9:08 AM Keyong Zhou  wrote:

> Hi Sungwoo,
>
> What you are pointing out is very correct. Currently shuffle data
> is distributed across `celeborn.master.slot.assign.maxWorkers` workers,
> which defaults to 1, so I believe the cascading stage rerun will
> definitely happen.
>
> I think setting ` celeborn.master.slot.assign.maxWorkers` to a smaller
> value can help, especially in relatively large clusters. Turning on
> replication
> can also help, but it conflicts with the purpose why we do stage rerun
> (i.e. we
> don't want to turn on replication for resource consumption reason).
>
> We didn't thought about this before, thanks for pointing that out!
>
> Thanks,
> Keyong Zhou
>
> Sungwoo Park  于2023年10月13日周五 02:22写道:
>
> > I have a question on how Celeborn distributes shuffle data among Celeborn
> > workers.
> >
> > From our observation, it seems that whenever a Celeborn worker fails or
> > gets killed (in a small cluster of less than 25 nodes), almost every edge
> > is affected. Does this mean that an edge with multiple partitions usually
> > distributes its shuffle data among all Celeborn workers?
> >
> > If this is the case, I think stage recomputation is unnecessary and just
> > re-executing the entire DAG is a better approach. Our current
> > implementation uses the following scheme for stage recomputation:
> >
> > 1. If a read failure occurs for shuffleId #1 for an edge, we pick up a
> new
> > shuffleId #2 for the same edge.
> > 2. The upstream stage re-executes all tasks, but writes the output to
> > shuffleId #2.
> > 3. Tasks in the downstream stage re-try by reading from shuffleId #2.
> >
> > From our experiment, whenever a Celeborn worker fails and a read failure
> > occurs for an edge, the re-execution of the upstream stage usally ends up
> > with another read failure because some part of its input has also been
> > lost. As a result, all upstream stages are eventually re-executed in a
> > cascading manner. In essence, the failure of a Celeborn worker
> invalidates
> > all existing shuffleIds.
> >
> > (This is what we observed with Hive-MR3-Celeborn, but I guess stage
> > recomputation in Spark will have to deal with the same problem.)
> >
> > Thanks,
> >
> > --- Sungwoo
> >
>


Re: Question on Celeborn workers,

2023-10-16 Thread Keyong Zhou
Hi Sungwoo,

What you are pointing out is very correct. Currently shuffle data
is distributed across `celeborn.master.slot.assign.maxWorkers` workers,
which defaults to 1, so I believe the cascading stage rerun will
definitely happen.

I think setting ` celeborn.master.slot.assign.maxWorkers` to a smaller
value can help, especially in relatively large clusters. Turning on
replication
can also help, but it conflicts with the purpose why we do stage rerun
(i.e. we
don't want to turn on replication for resource consumption reason).

We didn't thought about this before, thanks for pointing that out!

Thanks,
Keyong Zhou

Sungwoo Park  于2023年10月13日周五 02:22写道:

> I have a question on how Celeborn distributes shuffle data among Celeborn
> workers.
>
> From our observation, it seems that whenever a Celeborn worker fails or
> gets killed (in a small cluster of less than 25 nodes), almost every edge
> is affected. Does this mean that an edge with multiple partitions usually
> distributes its shuffle data among all Celeborn workers?
>
> If this is the case, I think stage recomputation is unnecessary and just
> re-executing the entire DAG is a better approach. Our current
> implementation uses the following scheme for stage recomputation:
>
> 1. If a read failure occurs for shuffleId #1 for an edge, we pick up a new
> shuffleId #2 for the same edge.
> 2. The upstream stage re-executes all tasks, but writes the output to
> shuffleId #2.
> 3. Tasks in the downstream stage re-try by reading from shuffleId #2.
>
> From our experiment, whenever a Celeborn worker fails and a read failure
> occurs for an edge, the re-execution of the upstream stage usally ends up
> with another read failure because some part of its input has also been
> lost. As a result, all upstream stages are eventually re-executed in a
> cascading manner. In essence, the failure of a Celeborn worker invalidates
> all existing shuffleIds.
>
> (This is what we observed with Hive-MR3-Celeborn, but I guess stage
> recomputation in Spark will have to deal with the same problem.)
>
> Thanks,
>
> --- Sungwoo
>