A reducer-oriented view of shuffle, especially without replication, could indeed be susceptible to the issue you described: a single fetch failure would require all mappers to be recomputed (though not necessarily all reducers).
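To make the contrast concrete, here is a minimal sketch of which upstream tasks a single fetch failure implicates under each layout. The names are illustrative only, not actual Spark or Celeborn APIs:

// Map-oriented layout: one output file per mapper, so a failed fetch of
// (mapId, reduceId) pins the loss to a single mapper.
def mappersToRerunMapOriented(failedMapId: Int): Set[Int] =
  Set(failedMapId)

// Reducer-oriented layout without replication: every mapper's records for
// a partition are merged into one file, so losing that file implicates
// all mappers of the upstream stage.
def mappersToRerunReducerOriented(numMappers: Int): Set[Int] =
  (0 until numMappers).toSet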
Note that I have not looked much into Celeborn specifically on this aspect yet, so my comments are centered on failure handling in Spark internals :-)

Regards,
Mridul

On Sat, Oct 14, 2023 at 3:36 AM Sungwoo Park <glap...@gmail.com> wrote:

> Hello,
>
> (Sorry for sending the same message again.)
>
> From my understanding, the current implementation of Celeborn makes it
> hard to find out which mapper should be re-executed when a partition
> cannot be read, so we have to re-execute all the mappers in the upstream
> stage. If we could find out which mapper/partition should be re-executed,
> the current logic of stage recomputation could be (partially or totally)
> reused.
>
> Regards,
>
> --- Sungwoo
>
> On Sat, Oct 14, 2023 at 5:24 PM Mridul Muralidharan <mri...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Spark will try to minimize the recomputation cost as much as possible.
>> For example, if the parent stage was DETERMINATE, it simply needs to
>> recompute the missing (mapper) partitions (those which resulted in the
>> fetch failure). Note that this by itself could require further
>> recomputation up the DAG if the inputs required to compute the parent
>> partitions are themselves missing, and so on - so it is dynamic.
>>
>> Regards,
>> Mridul
>>
>> On Sat, Oct 14, 2023 at 2:30 AM Sungwoo Park <o...@pl.postech.ac.kr>
>> wrote:
>>
>>> > a) If one or more tasks for a stage (and so its shuffle id) is going
>>> > to be recomputed, if it is an INDETERMINATE stage, all shuffle output
>>> > will be discarded and it will be entirely recomputed (see here
>>> > <https://github.com/apache/spark/blob/3e2470de7ea8b97dcdd8875ef25f044998fb7588/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1477>).
>>>
>>> If a reducer (in a downstream stage) fails to read data, can we find
>>> out which tasks should recompute their output? From the previous
>>> discussion, I thought this was hard (in the current implementation),
>>> and we should re-execute all tasks in the upstream stage.
>>>
>>> Thanks,
>>>
>>> --- Sungwoo
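For readers of the archive, a minimal sketch of the recomputation rule described in this thread. The names here are assumptions for illustration, not the actual DAGScheduler internals; see the linked DAGScheduler.scala for the real logic:

// Hypothetical model of a parent (map) stage whose shuffle output a
// downstream reducer failed to fetch.
sealed trait Determinacy
case object Determinate extends Determinacy
case object Indeterminate extends Determinacy

case class ParentStage(
    determinacy: Determinacy,
    numPartitions: Int,
    missingPartitions: Set[Int])

def partitionsToRecompute(parent: ParentStage): Set[Int] =
  parent.determinacy match {
    // INDETERMINATE: output is not reproducible partition-by-partition,
    // so all shuffle output is discarded and the stage is fully re-run.
    case Indeterminate => (0 until parent.numPartitions).toSet
    // DETERMINATE: only the partitions whose output was lost (the ones
    // behind the fetch failure) need to be recomputed.
    case Determinate   => parent.missingPartitions
  }

The dynamic part Mridul mentions is that recomputing a parent partition may in turn find its own inputs missing, applying the same rule one stage further up the DAG.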