Hi Sungwoo,

Glad to see that the integration of MR3 with Celeborn works well and that the upgrade from version 0.3 to 0.5 went smoothly. Celeborn now supports Spark stage rerun, which is already used in production.

To answer your questions:

1. Suppose that a reducer fails to read the output of a certain mapper. In such a case, should we re-execute all the mappers in the previous stage? Or, is it okay to re-execute only the mapper whose output is lost? In our previous implementation, MR3-Celeborn does not fully support task rerun (similar to stage rerun) because Celeborn does not return the identity of mapper tasks whose output has been lost.

In my opinion, MR3 should use a new shuffle ID to re-execute all mappers and generate fresh outputs for that reducer. This is necessary because the Celeborn Worker merges all mapper outputs by partition ID into the same files. If the reducer cannot read these files, the data is lost, so the mappers that wrote to those files (probably all of them) should be re-executed. Rerunning only the mappers associated with the lost data is not easy in Celeborn: it is possible, but it may require a lot of changes.

2. When a reducer tries to read the output of mappers, when is it okay to use the application shuffle ID?

If MR3 does not support stage rerun, as before, it can still use the application shuffle ID. Similarly, for Spark applications, we can disable stage rerun support by setting celeborn.client.spark.fetch.throwsFetchFailure to false, and Spark will then continue to use the application shuffle ID.

3. Along the same line of question 2, should we always get Celeborn shuffle IDs when trying to read the output of mappers? Considering the fact that the current code of MR3-Celeborn works fine, it seems like this is not always necessary.

If MR3 does not require support for stage rerun, we can continue to use the application shuffle ID as before. However, if MR3 needs stage rerun, we will need to generate a new shuffle ID for each stage attempt. This lets reducers distinguish between the map outputs of different attempts of the same stage.

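To make the per-attempt shuffle ID idea in answer 3 concrete, here is a minimal sketch of the bookkeeping. It is only an illustration: `ShuffleIdRegistry`, `id_for_write`, and `id_for_read` are hypothetical names, not part of the Celeborn or MR3 API, and the real Celeborn client may assign and resolve IDs differently.

```python
from itertools import count


class ShuffleIdRegistry:
    """Hypothetical bookkeeping; not part of the Celeborn or MR3 API."""

    def __init__(self):
        self._next_id = count()  # source of fresh Celeborn-side shuffle IDs
        self._ids = {}           # (app_shuffle_id, stage_attempt) -> Celeborn shuffle ID
        self._latest = {}        # app_shuffle_id -> highest stage attempt seen so far

    def id_for_write(self, app_shuffle_id, stage_attempt):
        """Return the shuffle ID that mappers of this stage attempt write to."""
        key = (app_shuffle_id, stage_attempt)
        if key not in self._ids:
            # A new attempt of the same stage gets a fresh ID, so its output
            # is never mixed with the (possibly lost) output of older attempts.
            self._ids[key] = next(self._next_id)
            previous = self._latest.get(app_shuffle_id, stage_attempt)
            self._latest[app_shuffle_id] = max(previous, stage_attempt)
        return self._ids[key]

    def id_for_read(self, app_shuffle_id):
        """Return the shuffle ID reducers read from: the latest attempt's ID."""
        attempt = self._latest[app_shuffle_id]
        return self._ids[(app_shuffle_id, attempt)]
```

Without stage rerun there is only ever one attempt per stage, so the mapping is one-to-one and using the application shuffle ID directly is equivalent.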
Thanks,
Jiashu Xiong

Sungwoo Park <[email protected]> wrote on Mon, Oct 7, 2024 at 14:20:

> (I left this message in Celeborn Slack channel, but perhaps this mailing
> list is the right place.)
>
> Hello,
>
> Previously we implemented an extension of MR3 (an execution engine) to
> support Celeborn 0.3.1. For a short introduction, please see:
> https://mr3docs.datamonad.com/docs/mr3/features/celeborn/
>
> Now we are upgrading Celeborn to 0.5.1 and working on supporting stage
> rerun, much like Spark-Celeborn.
>
> To my (pleasant) surprise, upgrading Celeborn from 0.3.1 to 0.5.1 was
> quite smooth. After recompiling with Celeborn 0.5.1, MR3-Celeborn just
> worked fine. I was surprised because the current code does not obtain
> Celeborn shuffle IDs at all (because there was no notion of Celeborn
> shuffle IDs back in 0.3.1) and we use only application shuffle IDs which
> are generated by MR3 (similarly to Spark shuffle IDs).
>
> I have a few questions.
>
> 1. Suppose that a reducer fails to read the output of a certain mapper. In
> such a case, should we re-execute all the mappers in the previous stage?
> Or, is it okay to re-execute only the mapper whose output is lost?
> In our previous implementation, MR3-Celeborn does not fully support task
> rerun (similar to stage rerun) because Celeborn does not return the
> identity of mapper tasks whose output has been lost.
>
> 2. When a reducer tries to read the output of mappers, when is it okay to
> use the application shuffle ID?
>
> 3. Along the same line of question 2, should we always get Celeborn
> shuffle IDs when trying to read the output of mappers? Considering the
> fact that the current code of MR3-Celeborn works fine, it seems like this
> is not always necessary.
>
> Thank you.
>
> --- Sungwoo Park
