Hi Jiashu,

Thank your for your reply. Based on what you advised, we have fully implemented stage rerun in Hive-MR3, where a read failure triggers the whole re-execution of the parent stage. There were not many changes to the code for interfacing with Celeborn (which makes me think again that Celeborn API is quite elegant).

Thanks a lot,

--- Sungwoo Park

On Tue, 8 Oct 2024, rexxiong wrote:

Hi Sungwoo,

Glad to see that the integration of Mr3 with Celeborn works well and that the 
upgrade
from version 0.3 to 0.5 went smoothly.
Now, Celeborn supports Spark stage reruns, which are already in production. And 
for
your questions:

1. Suppose that a reducer fails to read the output of a certain mapper. In
such a case, should we re-execute all the mappers in the previous stage?
Or, is it okay to re-execute only the mapper whose output is lost?
In our previous implementation, MR3-Celeborn does not fully support task
rerun (similar to stage rerun) because Celeborn does not return the
identity of mapper tasks whose output has been lost.

In my opinion, Mr3 should use a new shuffle ID to re-execute all mappers and 
generate
fresh outputs for that reducer. This is necessary because the Celeborn Worker 
merges
all mapper outputs by partition ID into the same files. If the reducer is 
unable to
read these files, it indicates that the data is lost, which indicates that
mappers(probably all,  and I think only rerun those mapper which associate the 
data is
not easy in celeborn, possible but may change a lot) that wrote to those files 
should
be re-executed

2. When a reducer tries to read the output of mappers, when is it okay to
use the application shuffle ID?
If MR3 doesn't support stage rerun as before, MR3 can still use the application
Shuffle Id. Additionally, for Spark applications, we can disable stage rerun 
support
by setting celeborn.client.spark.fetch.throwsFetchFailure to false. This will 
enable
Spark to continue using the application Shuffle ID.

3. Along the same line of question 2, should we always get Celeborn
shuffle IDs when trying to read the output of mappers? Considering the
fact the the current code of MR3-Celeborn works fine, it seems like this
is not always necessary.

If MR3 does not require support for stage reruns, we can continue to use the
application shuffle ID as before. However, if support for stage reruns is 
needed in
MR3, we will need to regenerate the shuffle ID for each stage attempt. This 
will allow
us to distinguish between the map outputs for the reducers across different 
attempts
of the same stage.

Thanks,Jiashu Xiong

Sungwoo Park <[email protected]> 于2024年10月7日周一 14:20?道:
      (I left this message in Celeborn Slack channel, but perhaps this mailing
      list is the right place.)

      Hello,

      Previously we implemented an extension of MR3 (an execution engine) to
      support Celeborn 0.3.1. For a short introduction, please see:
      https://mr3docs.datamonad.com/docs/mr3/features/celeborn/

      Now we are upgrading Celeborn to 0.5.1 and working on supporting stage
      rerun, much like Spark-Celeborn.

      To my (pleasant) surprise, upgrading Celeborn from 0.3.1 to 0.5.1 was
      quite smooth. After recompiling with Celeborn 0.5.1, MR3-Celeborn just
      worked fine. I was surprised because the current code does not obtain
      Celeborn shuffle IDs at all (because there was no notion of Celeborn
      shuffle IDs back in 0.3.1) and we use only application shuffle IDs which
      are generated by MR3 (similarly to Spark shuffle IDs).

      I have a few questions.

      1. Suppose that a reducer fails to read the output of a certain mapper. In
      such a case, should we re-execute all the mappers in the previous stage?
      Or, is it okay to re-execute only the mapper whose output is lost?
      In our previous implementation, MR3-Celeborn does not fully support task
      rerun (similar to stage rerun) because Celeborn does not return the
      identity of mapper tasks whose output has been lost.

      2. When a reducer tries to read the output of mappers, when is it okay to
      use the application shuffle ID?

      3. Along the same line of question 2, should we always get Celeborn
      shuffle IDs when trying to read the output of mappers? Considering the
      fact the the current code of MR3-Celeborn works fine, it seems like this
      is not always necessary.

      Thank you.

      --- Sungwoo Park


Reply via email to