Hi everyone,
Thank you all for the thoughtful discussion on this proposal. Given the feedback and the various points raised, *it appears the proposal is not in a position to move forward at this time*.
I am very interested in hearing any design ideas/alternative directions to address the feedback and potentially provide a path forward.
*I truly believe there is value in a native (strictly opt-in) solution for shuffle storage disaggregation for improved stability and reduced operational overhead, especially for users without the resources to maintain an external service.*
I appreciate everyone's time and insights.
Thanks & Regards,
Karuppayya

On Tue, Dec 9, 2025 at 11:42 AM karuppayya <[email protected]> wrote:

> Summarizing the discussion:
> 1. Concern about redundancy: I understand the concern that this may seem redundant with other projects.
> However, the primary differentiator and benefit of this proposal is *Minimal* Operational Overhead.
> Unlike external shuffle services, this approach does not require maintaining, patching, or monitoring a separate distributed system/cluster.
> This significantly lowers the barrier to entry and reduces the total cost of ownership (TCO) by eliminating the need for dedicated hardware.
> Ultimately, the choice depends on balancing raw performance against operational simplicity.
> 2. Adopting the Incremental Approach: Based on the feedback, I propose we split this initiative into two distinct phases, which allows us to deliver value immediately while also helping validate the architecture.
>
> - Phase 1: Shuffle Consolidation: As noted by many, this could provide value on its own by optimizing I/O and fixing the "small blocks" problem. It also leverages AQE for dynamic decision-making.
> - Phase 2: Remote Storage Integration: Post Phase 1, we could introduce the remote storage capability. This would add:
> - Durability: Resilience against fetch failures.
> - Elasticity & Cost Savings: Enables aggressive cluster downscaling and minimizes re-computation costs when nodes are lost.
>
> @Chao Sun <[email protected]> - I will rename and update the SPIP document to reflect the incremental structure, with Phase 1 as the immediate focus.
> @Mridul Muralidharan <[email protected]>, @Yang (and others),
> With this revised incremental approach, does this alleviate your concerns regarding redundancy?
> *I would like to gather your inputs before proceeding to a formal vote.*
>
> - Karuppayya
>
> On Fri, Dec 5, 2025 at 9:35 PM John Zhuge <[email protected]> wrote:
>
>> Great feedback from many folks above, especially Wenchen's idea of AQE integration.
>>
>> Many folks here have participated in Spark shuffle discussions over many years; why hasn't anybody proposed this before? What was the blind spot?
>> For me, I think I focused too much on the disaggregation of shuffle storage from compute and on allocating as little local disk as possible on a container, without seeing the huge benefit of not having a separate service to manage.
>>
>> On Fri, Dec 5, 2025 at 6:25 PM Chao Sun <[email protected]> wrote:
>>
>>> I also feel the shuffle consolidation stage idea itself is interesting and can serve as an improvement on the existing shuffle mechanism in Spark.
>>> From that perspective, I'm supportive. On the other hand, the remote storage part seems largely orthogonal to this and can be built separately on top of it. Should we emphasize this more, as opposed to the title "Remote Storage as a Shuffle Store"?
>>>
>>> Best,
>>> Chao
>>>
>>> On Fri, Dec 5, 2025 at 2:15 PM karuppayya <[email protected]> wrote:
>>>
>>>> Qiegang,
>>>> Thanks for the feedback.
>>>>
>>>> *Chao sent an email with a similar thought to yours (which I got to know offline).* But somehow it seems to have been lost.
>>>> (Also, the Apache ponymail <https://lists.apache.org/thread/d49b3w8s4t3ob6kgyrtxkc7p6qbl3ssd> doesn't have a few responses (e.g. my last response and Chao's email). Please let me know who needs to be notified of this.)
>>>>
>>>> *I completely agree with the suggestion to take an incremental approach here.*
>>>>
>>>> *For large/skewed partitions,*
>>>>
>>>> While memory is bounded in my POC (via the ShuffleBlockFetchIterator settings), I agree that we need to think through the implications for local storage, which presents different disk-related challenges than remote storage.
>>>>
>>>> *For the remote storage phase, would it support streaming writes*
>>>>
>>>> I've ensured that writes are streamed <https://github.com/apache/spark/pull/53028/files#diff-eb253db2e3342a7044aa974c0b9f51ac1c4a5d4e1eb390da8bf1351129f514c4R60> in the POC PR; a rough sketch of the write path follows below. Please verify this when you get a chance to look at the code.
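>>>>
>>>> A minimal sketch of that write path, assuming only the standard Hadoop FileSystem API (the block-iterator shape is simplified; the actual names in the PR differ):
>>>>
>>>> import java.io.InputStream
>>>> import org.apache.hadoop.conf.Configuration
>>>> import org.apache.hadoop.fs.Path
>>>> import org.apache.hadoop.io.IOUtils
>>>>
>>>> // Stream every fetched block for one reducer partition straight into a
>>>> // single remote file. The FSDataOutputStream flushes to the backing
>>>> // store (e.g. an S3A multipart upload) as bytes arrive, so the
>>>> // consolidated partition is never fully buffered in memory or on disk.
>>>> def writeConsolidated(blocks: Iterator[InputStream], dest: Path): Unit = {
>>>>   val out = dest.getFileSystem(new Configuration()).create(dest)
>>>>   try {
>>>>     blocks.foreach { in =>
>>>>       try IOUtils.copyBytes(in, out, 64 * 1024, false) // keep `out` open
>>>>       finally in.close()
>>>>     }
>>>>   } finally out.close()
>>>> }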
>>>> *Having this native in Spark keeps things simpler*
>>>>
>>>> I also want to highlight why integrating this directly into Spark is valuable.
>>>> Since many downstream systems (like Gluten/Velox) rely on the Spark Physical Plan, exposing this information there would help them leverage these capabilities automatically. This ensures out-of-the-box compatibility and eliminates the need for ecosystem projects to implement any additional logic for external systems.
>>>>
>>>> Mridul, Yang, Wenchen, Chao (and everyone),
>>>> I would appreciate your thoughts and feedback on this proposal. Your inputs are critical.
>>>>
>>>> Thanks & Regards
>>>> Karuppayya
>>>>
>>>> On Fri, Dec 5, 2025 at 12:23 PM Qiegang Long <[email protected]> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Thanks for putting this together - I've been following the discussion on this interesting topic.
>>>>>
>>>>> I really like the simplicity of not needing an external shuffle service. Push-based shuffle works, but managing ESS adds operational complexity. Having this native in Spark keeps things simpler, and it would be an ops win.
>>>>>
>>>>> Wenchen raised an interesting point about AQE integration being a key differentiator. I agree that having Spark dynamically decide whether consolidation is worth it based on actual runtime stats could be really powerful (rough sketch of that decision below).
>>>>>
>>>>> This got me thinking: what if we approach this incrementally?
>>>>>
>>>>> *Start with local consolidation + AQE* - prove out the consolidation stage concept and show it helps with IO efficiency. This is simpler to implement, and we can measure the impact on real workloads without dealing with remote storage complexity yet.
>>>>>
>>>>> *Then add remote storage* - once we've proven the consolidation stage is beneficial, layer in the elasticity benefits of remote storage that your proposal focuses on.
>>>>>
>>>>> Wenchen mentioned the consolidation stage can be valuable even without remote storage, and that feels right to me. It would also help address Mridul and Yang's concerns about overlap with existing projects - if we can show the AQE-integrated consolidation has unique benefits first, it makes the case stronger.
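>>>>>
>>>>> To make that concrete, the runtime decision could be as small as this (purely illustrative; MapOutputStatistics is real Spark API, but the threshold and function are made up):
>>>>>
>>>>> import org.apache.spark.MapOutputStatistics
>>>>>
>>>>> // After the map stage finishes, AQE has exact per-partition sizes.
>>>>> // Consolidate only when the average shuffle block is small enough that
>>>>> // an extra stage beats fetching numMappers * numReducers tiny blocks.
>>>>> def shouldConsolidate(
>>>>>     stats: MapOutputStatistics,
>>>>>     numMappers: Long,
>>>>>     minAvgBlockBytes: Long = 1L << 20): Boolean = {
>>>>>   val totalBytes = stats.bytesByPartitionId.sum
>>>>>   val numBlocks = numMappers * stats.bytesByPartitionId.length
>>>>>   numBlocks > 0 && totalBytes / numBlocks < minAvgBlockBytes
>>>>> }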
>>>>> A few implementation questions I'm curious about:
>>>>>
>>>>> - For large/skewed partitions, could we split consolidation across multiple tasks? Like if partition X is 5GB, maybe 5 consolidators each handle 1GB worth of mapper outputs. Still way better than fetching from thousands of mappers, and it keeps disk/memory bounded per task.
>>>>> - For the remote storage phase, would it support streaming writes (S3 multipart) to avoid holding huge merged partitions in memory/disk?
>>>>>
>>>>> Your proposal addresses a real problem - large shuffles with millions of small blocks are painful in production - but I think it would be more useful if it were integrated with AQE. The incremental approach might be a pragmatic way to deliver value sooner.
>>>>>
>>>>> On Wed, Dec 3, 2025 at 3:52 PM karuppayya <[email protected]> wrote:
>>>>>
>>>>>> Wenchen,
>>>>>> Thanks for being supportive of the idea.
>>>>>> I think I missed addressing some parts of your first email.
>>>>>>
>>>>>>> *we can decrease the number of nodes that host active shuffle data, so that the cluster is more elastic.*
>>>>>>
>>>>>> - We can write to local storage since we are using the Hadoop API.
>>>>>> - We could optimize task placement to run on hosts containing active shuffle data.
>>>>>> - However, this poses a risk of disks filling up sooner (especially with large shuffles), leading to task failures. This issue is amplified in multi-user/session environments (e.g., Spark Connect) where unrelated jobs might fill the disk, causing confusing diagnostic issues.
>>>>>> - *I propose offering this as an optional setting/config that knowledgeable users can enable, particularly in environments with sufficient local storage capacity.*
>>>>>>
>>>>>>> *push-based shuffle framework to support other deployments (standalone, k8s) and remote storage?*
>>>>>>
>>>>>> - Since the existing framework is YARN-dependent, retrofitting it for other resource managers would require extensive logic for handling differences in resource management and shuffle service deployment.
>>>>>> - I prefer keeping the consolidation stage separate for a cleaner, more generalized design.
>>>>>> *- However, if a significant architectural overlap becomes apparent, then integration should certainly be reconsidered.*
>>>>>>
>>>>>> Hi Enrico,
>>>>>> Thanks for the ideas.
>>>>>> I think double writes (either as a direct write or a copy after a local write) would:
>>>>>> - Increase overhead and add more failure points.
>>>>>> - Keep executors holding onto shuffle data, preventing aggressive cluster downscaling.
>>>>>> *While local reads have slightly lower latency, I think the delta between local and remote reads is small compared to overall job time.*
>>>>>> For small jobs, it would be beneficial to bypass the shuffle consolidation stage dynamically at runtime to avoid unnecessary overhead.
>>>>>>
>>>>>>> * Is "shuffle consolidation" the preferred term?*
>>>>>>
>>>>>> While the existing Spark term is "shuffle merge," which also involves combining shuffle blocks, I am using "shuffle consolidation" mainly for disambiguation.
>>>>>> *- Shuffle merge* - combining a few shuffle blocks
>>>>>> *- Shuffle consolidation* - merging all fragmented shuffle blocks for a given reducer partition
>>>>>> *However, I don't have a strong opinion on which term is ultimately used, as long as the function is clearly understood.*
>>>>>>
>>>>>> Regards
>>>>>> Karuppayya
>>>>>>
>>>>>> On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>>
>>>>>>> I would like to emphasize that a solution that allows preferring shuffle data from executors over remote storage would be great:
>>>>>>> If the shuffle consolidation*) stage merges mapper outputs into reducer inputs and stores those on remote storage, it could easily keep a copy on the consolidating executors. For the lifetime of these executors, reducer executors could preferably fetch the consolidated shuffle data from the consolidating executors, with the obvious benefits. Only for decommissioned consolidation executors, or consolidation executors on failed nodes, would reducer executors fetch consolidated shuffle data from the remote storage.
>>>>>>>
>>>>>>> Further, splitting the proposed shuffle consolidation stage into:
>>>>>>> 1) consolidate shuffle data to executor-local shuffle storage (AQE-based shuffle consolidation stage as proposed by Wenchen Fan)
>>>>>>> 2) replicate local shuffle storage to remote shuffle storage
>>>>>>> feels like a natural separation of concerns. And it allows for custom configuration to employ only the former without the latter, or vice versa.
>>>>>>>
>>>>>>> The ability to read shuffle data from executors during their lifetime while falling back to the remote storage replica reminds me of the existing Fallback Storage logic. Feature 2) could be implemented by evolving that existing infrastructure (Fallback Storage) with only a hundred lines of code. The read-from-executors-first-then-remote-storage feature would cost a few more hundred lines of code.
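>>>>>>>
>>>>>>> For reference, the existing Fallback Storage path is already driven purely by configuration; a sketch of today's knobs (standard Spark configs, the bucket path is made up):
>>>>>>>
>>>>>>> import org.apache.spark.SparkConf
>>>>>>>
>>>>>>> // Today's decommission-time shuffle migration with a remote fallback.
>>>>>>> // Feature 2) would evolve this from "copy on decommission" towards
>>>>>>> // "replicate continuously".
>>>>>>> val conf = new SparkConf()
>>>>>>>   .set("spark.decommission.enabled", "true")
>>>>>>>   .set("spark.storage.decommission.enabled", "true")
>>>>>>>   .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
>>>>>>>   .set("spark.storage.decommission.fallbackStorage.path",
>>>>>>>        "s3a://my-bucket/spark-fallback/")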
>>>>>>> Cheers,
>>>>>>> Enrico
>>>>>>>
>>>>>>> *) Is "shuffle consolidation" the preferred term? Isn't the existing Spark term "shuffle merge" exactly what is being described here, maybe with some small extension?
>>>>>>>
>>>>>>> On 01.12.25 at 20:30, karuppayya wrote:
>>>>>>>
>>>>>>> Hi everyone,
>>>>>>> Thank you all for your valuable comments and discussion on the design document/this email. I have replied to the comments/concerns raised. I welcome any further questions and challenges.
>>>>>>>
>>>>>>> Also, *Sun Chao* agreed to shepherd this proposal (thank you!).
>>>>>>>
>>>>>>> If there are no other open questions by Wednesday morning (PST), I will request Chao to open the official voting thread (which should give 72 hours for the process <https://spark.apache.org/improvement-proposals.html>).
>>>>>>>
>>>>>>> - Karuppayya
>>>>>>>
>>>>>>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <[email protected]> wrote:
>>>>>>>
>>>>>>>> One aspect that hasn't been mentioned yet (or so I think) is the thread-level behavior of shuffle. In large workloads with many small shuffle blocks, I've repeatedly observed executors spawning hundreds of threads tied to shuffle fetch operations, Netty client handlers, and block file access.
>>>>>>>> Since the proposal changes block granularity and fetch patterns, it would be valuable to explicitly consider how the consolidation stage affects:
>>>>>>>> – the number of concurrent fetch operations
>>>>>>>> – thread pool growth / saturation
>>>>>>>> – Netty transport threads
>>>>>>>> – memory pressure from large in-flight reads
>>>>>>>>
>>>>>>>> Btw, I find your proposal quite interesting.
>>>>>>>>
>>>>>>>> On Tue, Nov 18, 2025, 19:33 karuppayya <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Rishab, Wenchen, Murali,
>>>>>>>>> Thank you very much for taking the time to review the proposal and for providing such thoughtful and insightful comments/questions.
>>>>>>>>>
>>>>>>>>> *Rishab*,
>>>>>>>>>
>>>>>>>>>> * suitable across all storage systems*
>>>>>>>>>
>>>>>>>>> You are right that the suitability is somewhat subjective and dependent on the cloud provider used.
>>>>>>>>> In general, the goal of ShuffleVault is to utilize the standard Hadoop FileSystem APIs, which means it should work seamlessly with popular cloud and distributed file systems (like S3, HDFS, GFS, etc.). These systems share a similar nature and are designed for large files.
>>>>>>>>>
>>>>>>>>>> *large file could create problems during retrieval.*
>>>>>>>>>
>>>>>>>>> We are mitigating this risk by ensuring that tasks do not read the entire consolidated file at once. Instead, the implementation is designed to read the data in configured blocks, rather than relying on a single read. *This behavior can be refined/validated* to make it more robust.
>>>>>>>>>
>>>>>>>>> *Wenchen,*
>>>>>>>>> I fully agree that the operational details around using cloud storage for shuffle, specifically traffic throttling, cleanup guarantees, and overall request-related network cost, are critical issues that must be solved.
>>>>>>>>> The consolidation stage is explicitly designed to mitigate the throttling and accompanying cost issues.
>>>>>>>>> *Throttling* - By consolidating shuffle data, this approach transforms the read pattern from a multitude of small, random requests into fewer, large, targeted ones. This is particularly beneficial for modern cloud object storage.
>>>>>>>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle cleanup modes and also making an effort to make them robust. These cleanup improvements should be beneficial regardless of this proposal and cover most cases.
>>>>>>>>> However, I agree that to ensure *no orphaned files* remain, we will still require other means (such as remote storage lifecycle policies or job-specific scripts) for a guaranteed cleanup.
>>>>>>>>> Thank you again for your valuable feedback, especially the validation on synchronous scheduling and AQE integration.
>>>>>>>>>
>>>>>>>>> *Murali,*
>>>>>>>>>
>>>>>>>>>> * Doing an explicit sort stage*
>>>>>>>>>
>>>>>>>>> To clarify, ShuffleVault does not introduce an explicit sort stage. Instead, it introduces a Shuffle Consolidation Stage.
>>>>>>>>> This stage is a pure passthrough operation that only aggregates scattered shuffle data for a given reducer partition.
>>>>>>>>> In simple terms, it functions as an additional reducer stage that reads the fragmented shuffle files from the mappers and writes them as a single, consolidated, durable file to remote storage.
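>>>>>>>>>
>>>>>>>>> In code terms, the per-partition work boils down to something like this (an illustrative sketch only, not the POC API):
>>>>>>>>>
>>>>>>>>> import java.io.{InputStream, OutputStream}
>>>>>>>>>
>>>>>>>>> // One consolidation task per reducer partition: concatenate that
>>>>>>>>> // partition's block from every mapper into a single output stream.
>>>>>>>>> // No sorting, no deserialization; a pure byte-level passthrough.
>>>>>>>>> def consolidate(mapperBlocks: Iterator[InputStream], out: OutputStream): Unit = {
>>>>>>>>>   val buf = new Array[Byte](64 * 1024)
>>>>>>>>>   mapperBlocks.foreach { in =>
>>>>>>>>>     try {
>>>>>>>>>       var n = in.read(buf)
>>>>>>>>>       while (n != -1) { out.write(buf, 0, n); n = in.read(buf) }
>>>>>>>>>     } finally in.close()
>>>>>>>>>   }
>>>>>>>>> }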
>>>>>>>>>> *but that would be a nontrivial change*
>>>>>>>>>
>>>>>>>>> I agree that the change is significant, and I am actively working to ensure the benefits are leveraged across the stack. This PR <https://github.com/apache/spark/pull/53028> demonstrates integration with AQE and interactions with other rules (Exchange reuse, Shuffle Partition Coalescing, etc.).
>>>>>>>>> I would genuinely appreciate it if you could take a look at the POC PR to see the scope of changes. The primary logic is encapsulated within a new Spark Physical Planner Rule <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6> that injects the consolidation stage, which is the crux of the change.
>>>>>>>>>
>>>>>>>>> I welcome any further questions or comments!
>>>>>>>>>
>>>>>>>>> Thanks & Regards
>>>>>>>>> Karuppayya
>>>>>>>>>
>>>>>>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> There are existing Apache projects that provide capabilities which largely address the problem statement - Apache Celeborn, Apache Uniffle, Zeus, etc.
>>>>>>>>>> Doing an explicit sort stage between "map" and "reduce" brings with it some nice advantages, especially if the output is durable, but that would be a nontrivial change - and should be attempted if the benefits are being leveraged throughout the stack (AQE, speculative execution, etc.)
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Mridul
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>
>>>>>>>>>>> Handling large shuffles in Spark is challenging, and it's great to see proposals addressing it. I think the extra "shuffle consolidation stage" is a good idea, and now I feel it's better for it to be synchronous, so that we can integrate it with AQE and leverage the accurate runtime shuffle status to make decisions about whether or not to launch this extra "shuffle consolidation stage" and how to consolidate. This is a key differentiator compared to push-based shuffle.
>>>>>>>>>>>
>>>>>>>>>>> However, there are many details to consider, and in general it's difficult to use cloud storage for shuffle. We need to deal with problems like traffic throttling, cleanup guarantees, cost control, and so on. Let's take a step back and see what the actual problems of large shuffles are.
>>>>>>>>>>>
>>>>>>>>>>> A large shuffle usually starts with a large number of mappers that we can't adjust (e.g. a large table scan). We can adjust the number of reducers to reach two goals:
>>>>>>>>>>> 1. The input data size of each reducer shouldn't be too large, which is roughly *total_shuffle_size / num_reducers*. This is to avoid spilling/OOM during reducer task execution.
>>>>>>>>>>> 2. The data size of each shuffle block shouldn't be too small, which is roughly *total_shuffle_size / (num_mappers * num_reducers)*. This is for the sake of disk/network IO.
>>>>>>>>>>>
>>>>>>>>>>> These two goals are actually contradictory, and sometimes we have to prioritize goal 1 (i.e. pick a large *num_reducers*) so that the query can finish. An extra "shuffle consolidation stage" can effectively decrease the number of mappers by merging the shuffle files from multiple mappers. This can be a clear win, as fetching many small shuffle blocks can be quite slow, even slower than running an extra "shuffle consolidation stage".
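>>>>>>>>>>>
>>>>>>>>>>> To put illustrative numbers on it: a 1 TB shuffle with 10,000 mappers and 10,000 reducers gives a comfortable ~100 MB of input per reducer (goal 1), but an average block size of only ~10 KB (1 TB spread over 10^8 blocks), so every reducer issues 10,000 tiny fetches. After consolidation, each reducer instead performs a single ~100 MB sequential read.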
>>>>>>>>>>> In addition, the nodes that host shuffle files shouldn't be too many (best to be 0, which means shuffle files are stored in a different storage). With a large number of mappers, likely every node in the cluster stores some shuffle files. By merging shuffle files via the extra "shuffle consolidation stage", we can decrease the number of nodes that host active shuffle data, so that the cluster is more elastic.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Wenchen
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for sharing the proposal; this looks very exciting.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a few questions; please correct me if I misunderstood anything.
>>>>>>>>>>>>
>>>>>>>>>>>> Would it be possible to clarify whether the consolidated shuffle file produced for each partition is suitable across all storage systems, especially when this file becomes extremely large? I am wondering if a very large file could create problems during retrieval. For example, if a connection breaks while reading the file, some storage systems may not support resuming reads from the point of failure and may start reading the file from the beginning again. This could lead to higher latency, repeated retries, or performance bottlenecks when a partition becomes too large or skewed.
>>>>>>>>>>>>
>>>>>>>>>>>> Would it make sense to introduce a configurable upper bound on the maximum allowed file size? This might prevent the file from growing massively.
>>>>>>>>>>>> Should the consolidated shuffle file be compressed before being written to the storage system? Compression might introduce additional latency, but that too can be a configurable option.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Rishab Joshi
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Enrico,
>>>>>>>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reason for deferring the final consolidation to a subsequent stage lies in the distributed nature of shuffle data.
>>>>>>>>>>>>> A reducer requires reading all the corresponding shuffle data written across all map tasks. Since each mapper only holds its own local output, the consolidation cannot begin until the entire map stage completes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, your question is also aligned with one of the approaches mentioned (concurrent consolidation <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>), which was specifically considered.
>>>>>>>>>>>>> While synchronous consolidation happens after all the data is available, concurrent consolidation can begin aggregating and persisting already-generated shuffle data while the remaining map tasks are still running, thereby making the shuffle durable much earlier instead of having to wait for all map tasks to complete.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Karuppayya
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>>>>>>>> As long as the map executors are alive, reduce executors should read from them to avoid any extra delay/overhead.
>>>>>>>>>>>>>> On fetch failure from a map executor, the reduce executors should fall back to a remote storage that provides a copy (merged or not) of the shuffle data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Enrico
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 13.11.25 at 09:42, Enrico Minack wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks for your proposal and for bringing up this issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am very much in favour of a shuffle storage solution that allows for dynamic allocation and node failure in a K8S environment, without the burden of managing a Remote Shuffle Service.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have the following comments:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your proposed consolidation stage is equivalent to the next reducer stage in the sense that it reads shuffle data from the earlier map stage. This requires the executors of the map stage to survive until the shuffle data are consolidated ("merged" in Spark terminology). Therefore, I think this passage of your design document is not accurate:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Executors that perform the initial map tasks (shuffle writers) can be immediately deallocated after writing their shuffle data ...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Since the consolidation stage reads all the shuffle data, why not do the transformation in that stage? What is the point in deferring the transformations into another stage?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its limitation is "It simply shifts the storage burden to other active executors".
>>>>>>>>>>>>>> Please consider that the migration process can migrate to what Spark calls a fallback storage, which essentially copies the shuffle data to a remote storage.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>> Enrico
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 13.11.25 at 01:40, karuppayya wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle Store, natively in Spark*.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This approach would fundamentally decouple shuffle storage from compute nodes, mitigating *shuffle fetch failures* and also *helping with aggressive downscaling*.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The primary goal is to enhance the *elasticity and resilience* of Spark workloads, leading to substantial cost optimization opportunities.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *I welcome any initial thoughts or concerns regarding this idea.*
>>>>>>>>>>>>>> *Looking forward to your feedback!*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JIRA: SPARK-53484 <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>>>>>>>> SPIP doc <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>,
>>>>>>>>>>>>>> Design doc <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Karuppayya
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Rishab Joshi
>>
>> --
>> John Zhuge
