Enrico,

Thank you very much for reviewing the doc.

> Since the consolidation stage reads all the shuffle data, why not do the
> transformation in that stage? What is the point in deferring the
> transformations into another stage?
The reason for deferring the final consolidation to a subsequent stage lies in the distributed nature of shuffle data. Each reducer requires reading all of its corresponding shuffle data written across all map tasks. Since each mapper only holds its own local output, the consolidation cannot begin until the entire map stage completes.

However, your question is also aligned with one of the approaches mentioned (concurrent consolidation <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), which was specifically considered. While synchronous consolidation happens only after all the data is available, concurrent consolidation can begin aggregating and persisting the already-generated shuffle data while the remaining map tasks are still running, thereby making the shuffle durable much earlier instead of having to wait for all map tasks to complete.

- Karuppayya

On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]> wrote:

> Hi,
>
> another remark regarding a remote shuffle storage solution:
> As long as the map executors are alive, reduce executors should read from
> them to avoid any extra delay / overhead.
> On fetch failure from a map executor, the reduce executors should fall
> back to a remote storage that provides a copy (merged or not) of the
> shuffle data.
>
> Cheers,
> Enrico
>
> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>
> Hi Karuppayya,
>
> thanks for your proposal and bringing up this issue.
>
> I am very much in favour of a shuffle storage solution that allows for
> dynamic allocation and node failure in a K8S environment, without the
> burden of managing a Remote Shuffle Service.
>
> I have the following comments:
>
> Your proposed consolidation stage is equivalent to the next reducer stage
> in the sense that it reads shuffle data from the earlier map stage. This
> requires the executors of the map stage to survive until the shuffle data
> are consolidated ("merged" in Spark terminology).
> Therefore, I think this passage of your design document is not accurate:
>
> Executors that perform the initial map tasks (shuffle writers) can be
> immediately deallocated after writing their shuffle data ...
>
> Since the consolidation stage reads all the shuffle data, why not do the
> transformation in that stage? What is the point in deferring the
> transformations into another stage?
>
> You mention the "Native Shuffle Block Migration" and say its limitation is
> "It simply shifts the storage burden to other active executors".
> Please consider that the migration process can migrate to a fallback
> storage (as it is called in Spark), which essentially copies the shuffle
> data to a remote storage.
>
> Kind regards,
> Enrico
>
> Am 13.11.25 um 01:40 schrieb karuppayya:
>
> Hi All,
>
> I propose to utilize *Remote Storage as a Shuffle Store, natively in
> Spark*.
>
> This approach would fundamentally decouple shuffle storage from compute
> nodes, mitigating *shuffle fetch failures* and also helping with
> *aggressive downscaling*.
>
> The primary goal is to enhance the *elasticity and resilience* of Spark
> workloads, leading to substantial cost optimization opportunities.
>
> *I welcome any initial thoughts or concerns regarding this idea.*
> *Looking forward to your feedback!*
>
> JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
> SPIP doc
> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
> Design doc
> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
> PoC PR <https://github.com/apache/spark/pull/53028>
>
> Thanks,
> Karuppayya
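P.S. To make the synchronous vs. concurrent consolidation distinction above concrete, here is a minimal toy model in Python. All names in it are invented for the sketch (they are not Spark APIs): each mapper's output is persisted to the durable store as soon as that task finishes, while a reducer still cannot start until every mapper's block exists.

```python
import random

def run_map_stage(num_mappers, num_reducers):
    """Toy model: each map task produces one shuffle block per reducer
    partition, and its output is persisted to the durable store as soon
    as the task finishes (concurrent consolidation)."""
    completion_order = list(range(num_mappers))
    random.shuffle(completion_order)   # map tasks finish in arbitrary order
    durable_store = {}                 # mapper id -> {reducer id -> block}
    persisted_after = {}               # mapper id -> tasks finished when persisted
    for finished, mapper in enumerate(completion_order, start=1):
        blocks = {r: f"m{mapper}-r{r}" for r in range(num_reducers)}
        # Persist immediately instead of waiting for the whole map stage,
        # so early outputs become durable while later tasks still run.
        durable_store[mapper] = blocks
        persisted_after[mapper] = finished
    return durable_store, persisted_after

def reducer_input(durable_store, reducer, num_mappers):
    """A reducer needs its block from *every* mapper, so (unlike the
    consolidation above) it cannot start until the map stage completes."""
    if len(durable_store) < num_mappers:
        raise RuntimeError("map stage not complete")
    return [durable_store[m][reducer] for m in range(num_mappers)]
```

Note that the first-completed mapper's output is durable after a single task finishes, long before the map stage ends. (Separately, on the fallback-storage point: as far as I know, that path is configured in current Spark via `spark.storage.decommission.fallbackStorage.path` together with `spark.storage.decommission.shuffleBlocks.enabled`.)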
