With push-based shuffle in Apache Spark (magnet), both the map output and
the reducer-oriented merged output are preserved - the reducer-oriented
view is chosen by default for reads, with a fallback to the mapper output
when the merged output is missing or fails. That mitigates this specific
issue for DETERMINATE stages, and only the subset that needs recomputation
is regenerated.
With magnet, only the smaller blocks are pushed for merging, so the
effective replication overhead is much lower.
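
A minimal sketch of the client-side settings involved (assuming Spark 3.2+
on YARN, with the external shuffle service already set up for push
merging; illustrative rather than our exact job config):

  import org.apache.spark.SparkConf

  // Push-based shuffle requires the external shuffle service; with both
  // flags on, mappers additionally push their blocks to remote mergers,
  // which is what produces the two views (mapper output + merged output)
  // described above.
  val conf = new SparkConf()
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.shuffle.push.enabled", "true")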

In our Celeborn deployment, which we are still testing, we will enable
replication for functional and operational reasons - and probably move
replication out of the write path to speed it up further.
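
Roughly how we expect to turn it on from the Spark side - the class and
config key below are from memory of the 0.3.x docs, so treat them as
assumptions and double-check against the version you deploy:

  import org.apache.spark.SparkConf

  // Route shuffle through Celeborn and ask the client to push every block
  // to a replica worker as well (assumed key; roughly doubles push traffic,
  // which is why we'd like it off the critical write path).
  val conf = new SparkConf()
    .set("spark.shuffle.manager",
         "org.apache.spark.shuffle.celeborn.SparkShuffleManager")
    .set("spark.celeborn.client.push.replicate.enabled", "true")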


Regards,
Mridul

On Mon, Oct 16, 2023 at 9:08 AM Keyong Zhou <zho...@apache.org> wrote:

> Hi Sungwoo,
>
> What you are pointing out is correct. Currently shuffle data
> is distributed across `celeborn.master.slot.assign.maxWorkers` workers,
> which defaults to 10000, so I believe the cascading stage rerun will
> definitely happen.
>
> I think setting `celeborn.master.slot.assign.maxWorkers` to a smaller
> value can help, especially in relatively large clusters. Turning on
> replication can also help, but it conflicts with the reason we do stage
> rerun in the first place (i.e. we don't want to turn on replication
> because of its resource consumption).
>
> We didn't think about this before, thanks for pointing that out!
>
> Thanks,
> Keyong Zhou
>
> Sungwoo Park <o...@pl.postech.ac.kr> 于2023年10月13日周五 02:22写道:
>
> > I have a question on how Celeborn distributes shuffle data among Celeborn
> > workers.
> >
> > From our observation, it seems that whenever a Celeborn worker fails or
> > gets killed (in a small cluster of less than 25 nodes), almost every edge
> > is affected. Does this mean that an edge with multiple partitions usually
> > distributes its shuffle data among all Celeborn workers?
> >
> > If this is the case, I think stage recomputation is unnecessary and just
> > re-executing the entire DAG is a better approach. Our current
> > implementation uses the following scheme for stage recomputation:
> >
> > 1. If a read failure occurs for shuffleId #1 for an edge, we pick up a new
> > shuffleId #2 for the same edge.
> > 2. The upstream stage re-executes all tasks, but writes the output to
> > shuffleId #2.
> > 3. Tasks in the downstream stage re-try by reading from shuffleId #2.
> >
> > From our experiment, whenever a Celeborn worker fails and a read failure
> > occurs for an edge, the re-execution of the upstream stage usually ends up
> > with another read failure because some part of its input has also been
> > lost. As a result, all upstream stages are eventually re-executed in a
> > cascading manner. In essence, the failure of a Celeborn worker
> > invalidates all existing shuffleIds.
> >
> > (This is what we observed with Hive-MR3-Celeborn, but I guess stage
> > recomputation in Spark will have to deal with the same problem.)
> >
> > Thanks,
> >
> > --- Sungwoo
> >
>
