Hi Sungwoo,

Glad to see that the integration of MR3 with Celeborn works well and that
the upgrade from version 0.3 to 0.5 went smoothly.
Celeborn now supports Spark stage reruns, which are already in production.
As for your questions:

1. Suppose that a reducer fails to read the output of a certain mapper. In
such a case, should we re-execute all the mappers in the previous stage?
Or, is it okay to re-execute only the mapper whose output is lost?
In our previous implementation, MR3-Celeborn does not fully support task
rerun (similar to stage rerun) because Celeborn does not return the
identity of mapper tasks whose output has been lost.

In my opinion, MR3 should use a new shuffle ID to re-execute all mappers
and generate fresh outputs for that reducer. This is necessary because the
Celeborn Worker merges all mapper outputs by partition ID into the same
files. If the reducer is unable to read these files, the data is lost, so
the mappers that wrote to those files (probably all of them) should be
re-executed. Rerunning only the mappers associated with the lost data is
not easy in Celeborn; it is possible, but it may require a lot of changes.
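
To illustrate why losing one partition file usually implicates every
mapper, here is a minimal sketch. The class and method names
(PartitionLossSketch, recordWrite, mappersToRerun, fullShuffle) are
hypothetical and are not part of Celeborn or MR3; the sketch only models
the bookkeeping, assuming every mapper writes to every partition.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model, not Celeborn code: tracks which mappers contributed
// to each merged partition file.
final class PartitionLossSketch {
    // partitionId -> IDs of the mappers whose output was merged into that file
    private final Map<Integer, Set<Integer>> writersByPartition = new TreeMap<>();

    void recordWrite(int mapperId, int partitionId) {
        writersByPartition
            .computeIfAbsent(partitionId, k -> new TreeSet<>())
            .add(mapperId);
    }

    // Mappers whose output is gone if the file for this partition is unreadable.
    Set<Integer> mappersToRerun(int lostPartitionId) {
        return writersByPartition.getOrDefault(lostPartitionId, Collections.emptySet());
    }

    // Assumption: a full shuffle in which each mapper writes to each partition.
    static PartitionLossSketch fullShuffle(int numMappers, int numPartitions) {
        PartitionLossSketch sketch = new PartitionLossSketch();
        for (int m = 0; m < numMappers; m++) {
            for (int p = 0; p < numPartitions; p++) {
                sketch.recordWrite(m, p);
            }
        }
        return sketch;
    }

    public static void main(String[] args) {
        PartitionLossSketch sketch = fullShuffle(3, 2);
        // Losing the file for partition 1 affects all three mappers.
        System.out.println(sketch.mappersToRerun(1));
    }
}
```

Under that assumption, the set of mappers to rerun for any lost partition
is the whole previous stage, which is why rerunning everything under a new
shuffle ID is the simpler option.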

2. When a reducer tries to read the output of mappers, when is it okay to
use the application shuffle ID?
If MR3 does not support stage rerun, as before, it can still use the
application shuffle ID. Additionally, for Spark applications, we can
disable stage rerun support by setting
celeborn.client.spark.fetch.throwsFetchFailure to false. This will enable
Spark to continue using the application shuffle ID.

3. Along the same line of question 2, should we always get Celeborn
shuffle IDs when trying to read the output of mappers? Considering the
fact that the current code of MR3-Celeborn works fine, it seems like this
is not always necessary.

If MR3 does not require support for stage reruns, we can continue to use
the application shuffle ID as before. However, if support for stage reruns
is needed in MR3, we will need to regenerate the shuffle ID for each stage
attempt. This will allow us to distinguish between the map outputs for the
reducers across different attempts of the same stage.
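
As a rough sketch of that idea, the mapping from an application shuffle ID
and a stage attempt to a distinct shuffle ID could look like the following.
ShuffleIdAllocatorSketch and shuffleIdFor are hypothetical names used only
for illustration; this is not the Celeborn client API, and the real ID
assignment in Celeborn may work differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical allocator, not Celeborn code: hands out one shuffle ID per
// (application shuffle ID, stage attempt) pair.
final class ShuffleIdAllocatorSketch {
    private final Map<String, Integer> assigned = new HashMap<>();
    private int nextId = 0;

    // Returns the same ID for a repeated pair and a fresh ID for a new pair.
    synchronized int shuffleIdFor(int appShuffleId, int stageAttempt) {
        String key = appShuffleId + "-" + stageAttempt;
        return assigned.computeIfAbsent(key, k -> nextId++);
    }

    public static void main(String[] args) {
        ShuffleIdAllocatorSketch allocator = new ShuffleIdAllocatorSketch();
        int firstAttempt = allocator.shuffleIdFor(3, 0);
        int secondAttempt = allocator.shuffleIdFor(3, 1);
        // Different attempts of the same stage get different shuffle IDs.
        System.out.println(firstAttempt != secondAttempt);
    }
}
```

With such a mapping, reducers of a rerun stage read only the outputs
written under the new ID and never mix them with outputs of an earlier
attempt.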

Thanks,
Jiashu Xiong

On Mon, Oct 7, 2024 at 14:20, Sungwoo Park <[email protected]> wrote:

> (I left this message in Celeborn Slack channel, but perhaps this mailing
> list is the right place.)
>
> Hello,
>
> Previously we implemented an extension of MR3 (an execution engine) to
> support Celeborn 0.3.1. For a short introduction, please see:
> https://mr3docs.datamonad.com/docs/mr3/features/celeborn/
>
> Now we are upgrading Celeborn to 0.5.1 and working on supporting stage
> rerun, much like Spark-Celeborn.
>
> To my (pleasant) surprise, upgrading Celeborn from 0.3.1 to 0.5.1 was
> quite smooth. After recompiling with Celeborn 0.5.1, MR3-Celeborn just
> worked fine. I was surprised because the current code does not obtain
> Celeborn shuffle IDs at all (because there was no notion of Celeborn
> shuffle IDs back in 0.3.1) and we use only application shuffle IDs which
> are generated by MR3 (similarly to Spark shuffle IDs).
>
> I have a few questions.
>
> 1. Suppose that a reducer fails to read the output of a certain mapper. In
> such a case, should we re-execute all the mappers in the previous stage?
> Or, is it okay to re-execute only the mapper whose output is lost?
> In our previous implementation, MR3-Celeborn does not fully support task
> rerun (similar to stage rerun) because Celeborn does not return the
> identity of mapper tasks whose output has been lost.
>
> 2. When a reducer tries to read the output of mappers, when is it okay to
> use the application shuffle ID?
>
> 3. Along the same line of question 2, should we always get Celeborn
> shuffle IDs when trying to read the output of mappers? Considering the
> fact that the current code of MR3-Celeborn works fine, it seems like this
> is not always necessary.
>
> Thank you.
>
> --- Sungwoo Park
>
>
