Hi everyone,

Thank you all for the thoughtful discussion on this proposal.
Given the feedback and the various points raised, *it appears the proposal
is not in a position to move forward at this time*.

I am very interested in hearing any design ideas/alternative directions to
address feedback and potentially provide a path forward.

*I truly believe there is value in a native (strictly opt-in) solution
for shuffle storage disaggregation, for improved stability and reduced
operational overhead, especially for users without the resources to
maintain an external service.*

I appreciate everyone’s time and insights.

Thanks & Regards,
Karuppayya

On Tue, Dec 9, 2025 at 11:42 AM karuppayya <[email protected]> wrote:

> Summarizing the discussion:
> 1. Concern about redundancy: I understand the concern that this may seem
> redundant with other projects.
>     However, the primary differentiator and benefit of this proposal is
> *Minimal* Operational Overhead.
>     Unlike external shuffle services, this approach does not require
> maintaining, patching, or monitoring a separate distributed system/cluster.
>     This significantly lowers the barrier to entry and reduces the total
> cost of ownership (TCO) by eliminating the need for dedicated hardware.
> Ultimately, the choice depends on balancing raw performance against
> operational simplicity.
> 2. Adopting the Incremental Approach: Based on the feedback, I propose we
> split this initiative into two distinct phases, which allows us to
> deliver value immediately while also helping validate the architecture.
>
>    - Phase 1: Shuffle Consolidation: As noted by many, this could
>    provide value on its own by optimizing I/O and fixing the "small blocks"
>    problem. It also leverages AQE for dynamic decision-making.
>    - Phase 2: Remote Storage Integration: After Phase 1, we could
>    introduce the remote storage capability (a rough opt-in config sketch
>    follows this list). This would add:
>       - Durability: Resilience against fetch failures.
>       - Elasticity & Cost Savings: Enabling aggressive cluster downscaling
>       and minimizing re-computation costs when nodes are lost.
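>
>    As a rough illustration of the strictly opt-in shape this could take
>    (these config keys are hypothetical, not part of the current proposal):
>
>      # Phase 1: AQE-injected consolidation stage (hypothetical key)
>      spark.shuffle.consolidation.enabled          true
>      # Phase 2: remote storage integration via a Hadoop FileSystem URI
>      spark.shuffle.consolidation.remote.enabled   false
>      spark.shuffle.consolidation.remote.path      s3a://bucket/shuffle/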
>
> @Chao Sun <[email protected]> - I will rename and update the SPIP
> document to reflect the incremental structure, with Phase 1 as the
> immediate focus.
> @Mridul Muralidharan <[email protected]>, @Yang (and others) -
> With this revised incremental approach, does this alleviate your concerns
> regarding redundancy?
> *I would like to gather your inputs before proceeding to a formal vote.*
>
> - Karuppayya
>
> On Fri, Dec 5, 2025 at 9:35 PM John Zhuge <[email protected]> wrote:
>
>> Great feedback from many folks above, especially Wenchen's idea of AQE
>> integration.
>>
>> Many folks here have participated in Spark shuffle discussions over many
>> years; why hasn't anybody proposed this before? What was the blind spot?
>> For me, I think I focused too much on disaggregating shuffle storage
>> from compute and allocating as little local disk as possible on a
>> container, without seeing the huge benefit of not having a separate
>> service to manage.
>>
>>
>>
>> On Fri, Dec 5, 2025 at 6:25 PM Chao Sun <[email protected]> wrote:
>>
>>> I also feel the shuffle consolidation stage idea itself is interesting
>>> and can serve as an improvement on the existing shuffle mechanism in Spark.
>>> From that perspective, I'm supportive. On the other hand, the remote
>>> storage part seems largely orthogonal to this and can be built separately
>>> on top of it. Should we emphasize this more as opposed to the title "Remote
>>> Storage as a Shuffle Store"?
>>>
>>> Best,
>>> Chao
>>>
>>> On Fri, Dec 5, 2025 at 2:15 PM karuppayya <[email protected]>
>>> wrote:
>>>
>>>> Qiegang,
>>>> Thanks for the feedback.
>>>>
>>>> *Chao sent an email with a similar thought to yours (which I got to
>>>> know about offline).* But somehow it seems to have been lost.
>>>> (Also, the Apache Ponymail archive
>>>> <https://lists.apache.org/thread/d49b3w8s4t3ob6kgyrtxkc7p6qbl3ssd> doesn't
>>>> have a few responses, e.g. my last response and Chao's email. Please let
>>>> me know who needs to be notified of this.)
>>>>
>>>> *I completely agree with the suggestion to take an incremental approach
>>>> here.*
>>>>
>>>> *For large/skewed partitions,*
>>>>
>>>> While memory is bounded in my POC (via the ShuffleBlockFetcherIterator
>>>> settings), I agree that we need to think through the implications for
>>>> local storage, as it presents different disk-related challenges than
>>>> remote storage does.
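>>>>
>>>> (For reference, the fetch-side memory bound today comes from settings
>>>> like the following; the names are the existing Spark configs, values
>>>> are their defaults as I recall them:
>>>>   spark.reducer.maxSizeInFlight              48m
>>>>   spark.reducer.maxReqsInFlight              Int.MaxValue
>>>>   spark.reducer.maxBlocksInFlightPerAddress  Int.MaxValue
>>>> Local disk has no analogous cap, hence the separate thought needed.)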
>>>>
>>>> *For the remote storage phase, would it support streaming writes*
>>>>
>>>> I’ve ensured that writes are streamed
>>>> <https://github.com/apache/spark/pull/53028/files#diff-eb253db2e3342a7044aa974c0b9f51ac1c4a5d4e1eb390da8bf1351129f514c4R60>
>>>> in the POC PR. Please verify this when you get a chance to look at the 
>>>> code.
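>>>>
>>>> For anyone skimming, the streamed-write pattern is roughly the
>>>> following (a simplified Scala sketch, not the exact POC code;
>>>> streamToRemote and the 4 MiB chunk size are illustrative):
>>>>
>>>>   import java.io.InputStream
>>>>   import org.apache.hadoop.conf.Configuration
>>>>   import org.apache.hadoop.fs.{FileSystem, Path}
>>>>
>>>>   // Copy shuffle bytes to remote storage in bounded chunks, so the
>>>>   // full partition is never buffered in memory at once.
>>>>   def streamToRemote(in: InputStream, dest: String): Unit = {
>>>>     val path = new Path(dest)
>>>>     val fs   = FileSystem.get(path.toUri, new Configuration())
>>>>     val out  = fs.create(path) // s3a issues multipart uploads under the hood
>>>>     try {
>>>>       val buf = new Array[Byte](4 * 1024 * 1024) // 4 MiB per write call
>>>>       Iterator.continually(in.read(buf)).takeWhile(_ != -1)
>>>>         .foreach(n => out.write(buf, 0, n))
>>>>     } finally out.close()
>>>>   }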
>>>>
>>>> *Having this native in Spark keeps things simpler*
>>>>
>>>> I also want to highlight why integrating this directly into Spark is
>>>> valuable.
>>>> Since many downstream systems (like Gluten/Velox) rely on the Spark
>>>> physical plan, exposing this information there would help them leverage
>>>> this capability automatically. This ensures out-of-the-box compatibility
>>>> and eliminates the need for ecosystem projects to implement any additional
>>>> logic for external systems.
>>>>
>>>> Mridul, Yang, Wenchen, Chao (and everyone),
>>>> I would appreciate your thoughts and feedback on this proposal. Your
>>>> inputs are critical.
>>>>
>>>> Thanks & Regards
>>>> Karuppayya
>>>>
>>>> On Fri, Dec 5, 2025 at 12:23 PM Qiegang Long <[email protected]> wrote:
>>>>
>>>>> Hi Karuppayya,
>>>>>
>>>>> Thanks for putting this together - I've been following the discussion
>>>>> on this interesting topic.
>>>>>
>>>>> I really like the simplicity of not needing an external shuffle
>>>>> service. Push-based shuffle works, but managing ESS adds operational
>>>>> complexity. Having this native in Spark keeps things simpler, and it would
>>>>> be an ops win.
>>>>>
>>>>> Wenchen raised an interesting point about AQE integration being a key
>>>>> differentiator. I agree that having Spark dynamically decide whether
>>>>> consolidation is worth it based on actual runtime stats could be really
>>>>> powerful.
>>>>>
>>>>> This got me thinking: what if we approach this incrementally?
>>>>>
>>>>> *Start with local consolidation + AQE* - prove out the consolidation
>>>>> stage concept and show it helps with IO efficiency. This is simpler to
>>>>> implement and we can measure the impact on real workloads without dealing
>>>>> with remote storage complexity yet.
>>>>>
>>>>> *Then add remote storage *- once we've proven the consolidation stage
>>>>> is beneficial, layer in the elasticity benefits of remote storage that 
>>>>> your
>>>>> proposal focuses on.
>>>>>
>>>>> Wenchen mentioned the consolidation stage can be valuable even without
>>>>> remote storage, and that feels right to me. It would also help address
>>>>> Mridul and Yang's concerns about overlap with existing projects - if we 
>>>>> can
>>>>> show the AQE-integrated consolidation has unique benefits first, it makes
>>>>> the case stronger.
>>>>>
>>>>> A few implementation questions I'm curious about:
>>>>>
>>>>> - For large/skewed partitions, could we split consolidation across
>>>>> multiple tasks? Like if partition X is 5GB, maybe 5 consolidators each
>>>>> handle 1GB worth of mapper outputs. Still way better than fetching from
>>>>> thousands of mappers, and it keeps disk/memory bounded per task.
>>>>> - For the remote storage phase, would it support streaming writes (S3
>>>>> multipart) to avoid holding huge merged partitions in memory/disk?
>>>>>
>>>>> Your proposal addresses a real problem - large shuffles with millions
>>>>> of small blocks are painful in production - but I think it would be
>>>>> more useful if it were integrated with AQE. The incremental approach
>>>>> might be a pragmatic way to deliver value sooner.
>>>>>
>>>>>
>>>>> On Wed, Dec 3, 2025 at 3:52 PM karuppayya <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Wenchen,
>>>>>> Thanks for being supportive of the idea.
>>>>>> I think I missed addressing some parts of your first email.
>>>>>>
>>>>>>> *we can decrease the number of nodes that host active shuffle data,
>>>>>>> so that the cluster is more elastic.*
>>>>>>
>>>>>>
>>>>>> - We can write to local storage since we are using the Hadoop API.
>>>>>> - We could optimize task placement to run on hosts containing active
>>>>>> shuffle data.
>>>>>> - However, this poses a risk of disks filling up sooner (especially
>>>>>> with large shuffles), leading to task failures. This issue is amplified 
>>>>>> in
>>>>>> multi-user/session environments (e.g., Spark Connect) where unrelated 
>>>>>> jobs
>>>>>> might fill the disk, causing confusing diagnostic issues.
>>>>>> - *I propose offering this as an optional setting/config that
>>>>>> knowledgeable users can enable, particularly in environments with
>>>>>> sufficient local storage capacity.*
>>>>>>
>>>>>>> *push-based shuffle framework to support other deployments
>>>>>>> (standalone, k8s) and remote storage?*
>>>>>>
>>>>>>
>>>>>> - Since the existing framework is YARN-dependent, retrofitting it for
>>>>>> other resource managers would require extensive logic for handling 
>>>>>> resource
>>>>>> management and shuffle service deployment differences.
>>>>>> - I prefer keeping the consolidation stage separate for a cleaner,
>>>>>> more generalized design.
>>>>>> *- However, if a significant architectural overlap becomes apparent,
>>>>>> then integration should certainly be reconsidered.*
>>>>>>
>>>>>>
>>>>>> Hi Enrico,
>>>>>> Thanks for the ideas.
>>>>>> I think double writes (either as a direct write or a copy after local
>>>>>> write) would:
>>>>>> - Increase overhead and add more failure points.
>>>>>> - Prevent aggressive cluster downscaling, since executors would hold
>>>>>> onto shuffle data.
>>>>>> *While we would get slightly lower read latency, I think the delta
>>>>>> between local and remote reads is small compared to overall job time.*
>>>>>> For small jobs, it would be beneficial to bypass the shuffle
>>>>>> consolidation stage dynamically at runtime to avoid unnecessary overhead.
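>>>>>>
>>>>>> To sketch what that runtime decision could look like (names and the
>>>>>> threshold here are hypothetical; with AQE, the exact shuffle size is
>>>>>> known once the map stage finishes):
>>>>>>
>>>>>>   // Consolidate only when the average block is small enough that
>>>>>>   // the extra stage pays for itself.
>>>>>>   def shouldConsolidate(totalShuffleBytes: Long,
>>>>>>                         numMappers: Long,
>>>>>>                         numReducers: Long,
>>>>>>                         minBlockBytes: Long = 1L << 20): Boolean = {
>>>>>>     val avgBlockBytes =
>>>>>>       totalShuffleBytes / math.max(1L, numMappers * numReducers)
>>>>>>     avgBlockBytes < minBlockBytes
>>>>>>   }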
>>>>>>
>>>>>> *Is "shuffle consolidation" the preferred term?*
>>>>>>
>>>>>>
>>>>>> While the existing Spark term is "shuffle merge," which also involves
>>>>>> combining shuffle blocks, I am using "shuffle consolidation" mainly for
>>>>>> disambiguation.
>>>>>> *- Shuffle merge* - combining a few shuffle blocks
>>>>>> *- Shuffle consolidation* - merging all fragmented shuffle blocks for
>>>>>> a given reducer partition
>>>>>> *However, I don't have a strong opinion on which term is ultimately
>>>>>> used, as long as the function is clearly understood.*
>>>>>>
>>>>>> Regards
>>>>>> Karuppayya
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 3, 2025 at 2:37 AM Enrico Minack <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Karuppayya,
>>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>>
>>>>>>> I would like to emphasize that it would be great to have a solution
>>>>>>> that allows preferring shuffle data from executors over remote storage:
>>>>>>> If the shuffle consolidation*) stage merges mapper outputs into
>>>>>>> reducer inputs and stores those on a remote storage, it could easily 
>>>>>>> keep a
>>>>>>> copy on the consolidating executors. For the lifetime of these 
>>>>>>> executors,
>>>>>>> reducer executors could preferably fetch the consolidated shuffle data 
>>>>>>> from
>>>>>>> the consolidating executors, with the obvious benefits. Only for
>>>>>>> decommissioned consolidation executors or consolidation executors on 
>>>>>>> failed
>>>>>>> nodes, reducer executors fetch consolidated shuffle data from the remote
>>>>>>> storage.
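>>>>>>>
>>>>>>> In pseudo-Scala, that read order would be roughly (all names here
>>>>>>> are hypothetical stand-ins):
>>>>>>>
>>>>>>>   import java.io.{IOException, InputStream}
>>>>>>>
>>>>>>>   // Fast path: the consolidating executor is still alive.
>>>>>>>   // Fallback: the durable copy on remote storage.
>>>>>>>   def openConsolidatedBlock(
>>>>>>>       blockId: String,
>>>>>>>       fetchFromExecutor: String => InputStream,
>>>>>>>       openFromRemote: String => InputStream): InputStream =
>>>>>>>     try fetchFromExecutor(blockId)
>>>>>>>     catch { case _: IOException => openFromRemote(blockId) }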
>>>>>>>
>>>>>>> Further, splitting the proposed shuffle consolidation stage into:
>>>>>>> 1) consolidate shuffle data to executor local shuffle storage
>>>>>>> (AQE-based shuffle consolidation stage as proposed by Wenchen Fan)
>>>>>>> 2) replicate local shuffle storage to remote shuffle storage
>>>>>>> feels like a natural separation of concerns. And it allows for
>>>>>>> custom configuration to employ only the former without the latter or 
>>>>>>> vice
>>>>>>> versa.
>>>>>>>
>>>>>>> The ability to read shuffle data from executors during their
>>>>>>> existence while falling back to the remote storage replica is
>>>>>>> reminiscent of the existing Fallback Storage logic. Feature 2) could
>>>>>>> be implemented by evolving that existing infrastructure (Fallback
>>>>>>> Storage) with only a hundred lines of code. The
>>>>>>> read-from-executors-first-then-remote-storage feature would cost a
>>>>>>> few hundred more lines of code.
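>>>>>>>
>>>>>>> (For reference, the existing fallback storage path is enabled roughly
>>>>>>> like this in recent Spark versions; the bucket path is illustrative:
>>>>>>>   spark.decommission.enabled                        true
>>>>>>>   spark.storage.decommission.enabled                true
>>>>>>>   spark.storage.decommission.shuffleBlocks.enabled  true
>>>>>>>   spark.storage.decommission.fallbackStorage.path   s3a://bucket/fallback/
>>>>>>> )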
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Enrico
>>>>>>>
>>>>>>>
>>>>>>> *) Is "shuffle consolidation" the preferred term? Isn't the
>>>>>>> existing Spark term "shuffle merge" exactly what is being described 
>>>>>>> here,
>>>>>>> maybe with some small extension?
>>>>>>>
>>>>>>>
>>>>>>> Am 01.12.25 um 20:30 schrieb karuppayya:
>>>>>>>
>>>>>>> Hi everyone,
>>>>>>> Thank you all for your valuable comments and discussion on the
>>>>>>> design document/this email. I have replied to the comments/concerns 
>>>>>>> raised.
>>>>>>> I welcome any further questions and challenges.
>>>>>>>
>>>>>>> Also, *Sun Chao* has accepted to shepherd this proposal (thank you!).
>>>>>>>
>>>>>>> If there are no other open questions by Wednesday morning (PST), I
>>>>>>> will request Chao to open the official voting thread (which should give
>>>>>>> 72 hours for the process
>>>>>>> <https://spark.apache.org/improvement-proposals.html>).
>>>>>>>
>>>>>>> - Karuppayya
>>>>>>>
>>>>>>> On Tue, Nov 18, 2025 at 12:38 PM Ángel Álvarez Pascua <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> One aspect that hasn’t been mentioned yet (or so I think) is the
>>>>>>>> thread-level behavior of shuffle. In large workloads with many small
>>>>>>>> shuffle blocks, I’ve repeatedly observed executors spawning hundreds of
>>>>>>>> threads tied to shuffle fetch operations, Netty client handlers, and 
>>>>>>>> block
>>>>>>>> file access.
>>>>>>>> Since the proposal changes block granularity and fetch patterns, it
>>>>>>>> would be valuable to explicitly consider how the consolidation stage
>>>>>>>> affects:
>>>>>>>> – the number of concurrent fetch operations
>>>>>>>> – thread pool growth / saturation
>>>>>>>> – Netty transport threads
>>>>>>>> – memory pressure from large in-flight reads
>>>>>>>>
>>>>>>>> Btw, I find your proposal quite interesting.
>>>>>>>>
>>>>>>>> El mar, 18 nov 2025, 19:33, karuppayya <[email protected]>
>>>>>>>> escribió:
>>>>>>>>
>>>>>>>>> Rishab, Wenchen, Murali,
>>>>>>>>> Thank you very much for taking the time to review the proposal and
>>>>>>>>> for providing such thoughtful and insightful comments/questions.
>>>>>>>>>
>>>>>>>>> *Rishab*,
>>>>>>>>>
>>>>>>>>>> *suitable across all storage systems*
>>>>>>>>>
>>>>>>>>> You are right that the suitability is somewhat subjective and
>>>>>>>>> dependent on the cloud provider used.
>>>>>>>>> In general, the goal of ShuffleVault is to utilize the standard
>>>>>>>>> Hadoop FileSystem APIs, which means it should work seamlessly with 
>>>>>>>>> popular
>>>>>>>>> cloud and distributed file systems (like S3, HDFS, GFS, etc.).
>>>>>>>>> These systems share a similar nature and are designed for large
>>>>>>>>> files.
>>>>>>>>>
>>>>>>>>> *large file could create problems during retrieval.*
>>>>>>>>>
>>>>>>>>> We are mitigating this risk by ensuring that tasks do not read
>>>>>>>>> the entire consolidated file at once.
>>>>>>>>> Instead, the implementation is designed to read the data in
>>>>>>>>> blocks of a configured size, rather than relying on a single read.
>>>>>>>>> *This behavior can be refined/validated* to make it more robust.
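>>>>>>>>>
>>>>>>>>> Conceptually, the block-wise read looks like the following
>>>>>>>>> (a simplified Scala sketch, not the POC code; readRange, the index
>>>>>>>>> supplying offset/length, and the 8 MiB chunk size are illustrative):
>>>>>>>>>
>>>>>>>>>   import org.apache.hadoop.conf.Configuration
>>>>>>>>>   import org.apache.hadoop.fs.{FileSystem, Path}
>>>>>>>>>
>>>>>>>>>   // Fetch one reducer's byte range from the consolidated file in
>>>>>>>>>   // bounded chunks instead of one huge read.
>>>>>>>>>   def readRange(file: String, offset: Long, length: Int,
>>>>>>>>>                 chunk: Int = 8 * 1024 * 1024): Array[Byte] = {
>>>>>>>>>     val path = new Path(file)
>>>>>>>>>     val in = FileSystem.get(path.toUri, new Configuration()).open(path)
>>>>>>>>>     try {
>>>>>>>>>       val out  = new Array[Byte](length)
>>>>>>>>>       var done = 0
>>>>>>>>>       while (done < length) {
>>>>>>>>>         val n = math.min(chunk, length - done)
>>>>>>>>>         // positioned read, so a retry can resume mid-file
>>>>>>>>>         in.readFully(offset + done, out, done, n)
>>>>>>>>>         done += n
>>>>>>>>>       }
>>>>>>>>>       out
>>>>>>>>>     } finally in.close()
>>>>>>>>>   }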
>>>>>>>>>
>>>>>>>>> *Wenchen,*
>>>>>>>>> I fully agree that the operational details around using cloud
>>>>>>>>> storage for shuffle (specifically traffic throttling, cleanup
>>>>>>>>> guarantees, and overall request-related network costs) are critical
>>>>>>>>> issues that must be solved.
>>>>>>>>> The consolidation stage is explicitly designed to mitigate the
>>>>>>>>> throttling and accompanying cost issues.
>>>>>>>>> *Throttling* - By consolidating shuffle data, this approach
>>>>>>>>> transforms the read pattern from a multitude of small, random
>>>>>>>>> requests into fewer, large, targeted ones. This is particularly
>>>>>>>>> beneficial for modern cloud object storage.
>>>>>>>>> *Shuffle cleanup* - I am actively trying to leverage the shuffle
>>>>>>>>> cleanup modes and also making an effort to make them robust. These
>>>>>>>>> cleanup improvements should be beneficial regardless of this
>>>>>>>>> proposal and cover most cases.
>>>>>>>>> However, I agree that to ensure *no orphaned files* remain, we
>>>>>>>>> will still require other means (such as remote storage lifecycle 
>>>>>>>>> policies
>>>>>>>>> or job-specific scripts) for a guaranteed cleanup.
>>>>>>>>> Thank you again for your valuable feedback, especially the
>>>>>>>>> validation on synchronous scheduling and AQE integration.
>>>>>>>>>
>>>>>>>>> *Murali,*
>>>>>>>>>
>>>>>>>>>> * Doing an explicit sort stage*
>>>>>>>>>
>>>>>>>>> To clarify, ShuffleVault does not introduce an explicit sort
>>>>>>>>> stage. Instead, it introduces a Shuffle Consolidation Stage.
>>>>>>>>> This stage is a pure passthrough operation that only aggregates
>>>>>>>>> scattered shuffle data for a given reducer partition.
>>>>>>>>> In simple terms, it functions as an additional reducer stage that
>>>>>>>>> reads the fragmented shuffle files from the mappers and writes them 
>>>>>>>>> as a
>>>>>>>>> single, consolidated, durable file to remote storage.
>>>>>>>>>
>>>>>>>>> *but that would be a nontrivial change*
>>>>>>>>>
>>>>>>>>> I agree that the change is significant, and I am actively working
>>>>>>>>> to ensure the benefits are leveraged across the stack. This PR
>>>>>>>>> <https://github.com/apache/spark/pull/53028> demonstrates
>>>>>>>>> integration with AQE and interactions with other rules (Exchange
>>>>>>>>> reuse, Shuffle Partition Coalescing, etc.).
>>>>>>>>> I would genuinely appreciate it if you could take a look at the
>>>>>>>>> POC PR to see the scope of changes. The primary logic is encapsulated
>>>>>>>>> within a new Spark physical planner rule
>>>>>>>>> <https://github.com/apache/spark/pull/53028/files#diff-5a444440444095e67e15f707b7f5f34816c4e9c299cec4901a424a29a09874d6>
>>>>>>>>> that injects the consolidation stage, which is the main crux.
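>>>>>>>>>
>>>>>>>>> The general shape of such a rule, heavily simplified (this is not
>>>>>>>>> the POC's actual code; ConsolidationStageExec is a hypothetical
>>>>>>>>> physical node and the predicate is a stand-in):
>>>>>>>>>
>>>>>>>>>   import org.apache.spark.sql.catalyst.rules.Rule
>>>>>>>>>   import org.apache.spark.sql.execution.SparkPlan
>>>>>>>>>   import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
>>>>>>>>>
>>>>>>>>>   // Wrap qualifying shuffles with a consolidation stage; the rest
>>>>>>>>>   // of the plan is left untouched.
>>>>>>>>>   object InsertConsolidationStage extends Rule[SparkPlan] {
>>>>>>>>>     private def shouldConsolidate(s: ShuffleExchangeExec): Boolean =
>>>>>>>>>       s.outputPartitioning.numPartitions > 1000 // stand-in heuristic
>>>>>>>>>     override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
>>>>>>>>>       case s: ShuffleExchangeExec if shouldConsolidate(s) =>
>>>>>>>>>         ConsolidationStageExec(s) // hypothetical node
>>>>>>>>>     }
>>>>>>>>>   }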
>>>>>>>>>
>>>>>>>>> I welcome any further questions or comments!
>>>>>>>>>
>>>>>>>>> Thanks & Regards
>>>>>>>>> Karuppayya
>>>>>>>>>
>>>>>>>>> On Tue, Nov 18, 2025 at 9:32 AM Mridul Muralidharan <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> There are existing Apache projects which provide capabilities that
>>>>>>>>>> largely address the problem statement - Apache Celeborn, Apache
>>>>>>>>>> Uniffle, Zeus, etc.
>>>>>>>>>> Doing an explicit sort stage between "map" and "reduce" brings with
>>>>>>>>>> it some nice advantages, especially if the output is durable, but
>>>>>>>>>> that would be a nontrivial change - and should be attempted if the
>>>>>>>>>> benefits are being leveraged throughout the stack (AQE, speculative
>>>>>>>>>> execution, etc.)
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Mridul
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 18, 2025 at 11:12 AM Wenchen Fan <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>
>>>>>>>>>>> Handling large shuffles in Spark is challenging and it's great
>>>>>>>>>>> to see proposals addressing it. I think the extra "shuffle 
>>>>>>>>>>> consolidation
>>>>>>>>>>> stage" is a good idea, and now I feel it's better for it to be 
>>>>>>>>>>> synchronous,
>>>>>>>>>>> so that we can integrate it with AQE and leverage the accurate 
>>>>>>>>>>> runtime
>>>>>>>>>>> shuffle status to make decisions about whether or not to launch 
>>>>>>>>>>> this extra
>>>>>>>>>>> "shuffle consolidation stage" and how to consolidate. This is a key
>>>>>>>>>>> differentiator compared to the push-based shuffle.
>>>>>>>>>>>
>>>>>>>>>>> However, there are many details to consider, and in general it's
>>>>>>>>>>> difficult to use cloud storage for shuffle. We need to deal with 
>>>>>>>>>>> problems
>>>>>>>>>>> like traffic throttling, cleanup guarantee, cost control, and so 
>>>>>>>>>>> on. Let's
>>>>>>>>>>> take a step back and see what are the actual problems of large 
>>>>>>>>>>> shuffles.
>>>>>>>>>>>
>>>>>>>>>>> A large shuffle usually starts with a large number of mappers that
>>>>>>>>>>> we can't adjust (e.g. large table scan). We can adjust the number of
>>>>>>>>>>> reducers to reach two goals:
>>>>>>>>>>> 1. The input data size of each reducer shouldn't be too large,
>>>>>>>>>>> which is roughly *total_shuffle_size / num_reducers*. This is
>>>>>>>>>>> to avoid spilling/OOM during reducer task execution.
>>>>>>>>>>> 2. The data size of each shuffle block shouldn't be too small,
>>>>>>>>>>> which is roughly *total_shuffle_size / (num_mappers *
>>>>>>>>>>> num_reducers)*. This is for the good of disk/network IO.
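>>>>>>>>>>>
>>>>>>>>>>> (To make goal 2 concrete with illustrative numbers: with
>>>>>>>>>>> *total_shuffle_size* = 10 TB and *num_mappers* = 50,000, keeping
>>>>>>>>>>> reducer input near 1 GB per goal 1 forces *num_reducers* ~ 10,000,
>>>>>>>>>>> which drives the average block size down to
>>>>>>>>>>> 10 TB / (50,000 * 10,000) ~ 20 KB.)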
>>>>>>>>>>>
>>>>>>>>>>> These two goals are actually contradictory and sometimes we have
>>>>>>>>>>> to prioritize goal 1 (i.e. pick a large *num_reducers*) so that
>>>>>>>>>>> the query can finish. An extra "shuffle consolidation stage" can 
>>>>>>>>>>> kind of
>>>>>>>>>>> decrease the number of mappers, by merging the shuffle files from 
>>>>>>>>>>> multiple
>>>>>>>>>>> mappers. This can be a clear win as fetching many small shuffle 
>>>>>>>>>>> blocks can
>>>>>>>>>>> be quite slow, even slower than running an extra "shuffle 
>>>>>>>>>>> consolidation
>>>>>>>>>>> stage".
>>>>>>>>>>>
>>>>>>>>>>> In addition, the nodes that host shuffle files shouldn't be too
>>>>>>>>>>> many (best to be 0 which means shuffle files are stored in a 
>>>>>>>>>>> different
>>>>>>>>>>> storage). With a large number of mappers, likely every node in the 
>>>>>>>>>>> cluster
>>>>>>>>>>> stores some shuffle files. By merging shuffle files via the extra 
>>>>>>>>>>> "shuffle
>>>>>>>>>>> consolidation stage", we can decrease the number of nodes that host 
>>>>>>>>>>> active
>>>>>>>>>>> shuffle data, so that the cluster is more elastic.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Wenchen
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Nov 15, 2025 at 6:13 AM Rishab Joshi <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for sharing the proposal and this looks very exciting.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a few questions and please correct me if I misunderstood
>>>>>>>>>>>> anything.
>>>>>>>>>>>>
>>>>>>>>>>>> Would it be possible to clarify whether the consolidated
>>>>>>>>>>>> shuffle file produced for each partition is suitable across all 
>>>>>>>>>>>> storage
>>>>>>>>>>>> systems, especially when this file becomes extremely large? I am 
>>>>>>>>>>>> wondering
>>>>>>>>>>>> if a very large file could create problems during retrieval. For 
>>>>>>>>>>>> example,
>>>>>>>>>>>> if a connection breaks while reading the file, some storage 
>>>>>>>>>>>> systems may not
>>>>>>>>>>>> support resuming reads from the point of failure and start reading 
>>>>>>>>>>>> the file
>>>>>>>>>>>> from the beginning again. This could lead to higher latency, 
>>>>>>>>>>>> repeated
>>>>>>>>>>>> retries, or performance bottlenecks when a partition becomes too 
>>>>>>>>>>>> large or
>>>>>>>>>>>> skewed.
>>>>>>>>>>>>
>>>>>>>>>>>> Would it make sense to introduce a configurable upper-bound on
>>>>>>>>>>>> the maximum allowed file size? This might prevent the file from 
>>>>>>>>>>>> growing
>>>>>>>>>>>> massively.
>>>>>>>>>>>> Should the consolidated shuffle file be compressed before being
>>>>>>>>>>>> written to the storage system? Compression might introduce
>>>>>>>>>>>> additional latency, but that too can be a configurable option.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Rishab Joshi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 13, 2025 at 9:14 AM karuppayya <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Enrico,
>>>>>>>>>>>>> Thank you very much for reviewing the doc.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> *Since the consolidation stage reads all the shuffle data, why
>>>>>>>>>>>>>> not do the transformation in that stage? What is the point of
>>>>>>>>>>>>>> deferring the transformations to another stage?*
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reason for deferring the final consolidation to a
>>>>>>>>>>>>> subsequent stage lies in the distributed nature of shuffle data.
>>>>>>>>>>>>> A reducer requires reading all corresponding shuffle data
>>>>>>>>>>>>> written across all map tasks. Since each mapper only holds its
>>>>>>>>>>>>> own local output, the consolidation cannot begin until the whole
>>>>>>>>>>>>> map stage completes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> However, your question is also aligned to one of the
>>>>>>>>>>>>> approaches mentioned (concurrent consolidation
>>>>>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0#heading=h.tmi917h1n1vf>),
>>>>>>>>>>>>> which was specifically considered.
>>>>>>>>>>>>> While synchronous consolidation happens after all the data is
>>>>>>>>>>>>> available, concurrent consolidation can begin aggregating and
>>>>>>>>>>>>> persisting the already-generated shuffle data concurrently with
>>>>>>>>>>>>> the remaining map tasks, thereby making the shuffle durable much
>>>>>>>>>>>>> earlier instead of having to wait for all map tasks to complete.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Karuppayya
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Nov 13, 2025 at 1:13 AM Enrico Minack <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> another remark regarding a remote shuffle storage solution:
>>>>>>>>>>>>>> As long as the map executors are alive, reduce executors
>>>>>>>>>>>>>> should read from them to avoid any extra delay / overhead.
>>>>>>>>>>>>>> On fetch failure from a map executor, the reduce executors
>>>>>>>>>>>>>> should fall back to a remote storage that provides a copy 
>>>>>>>>>>>>>> (merged or not)
>>>>>>>>>>>>>> of the shuffle data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Enrico
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Am 13.11.25 um 09:42 schrieb Enrico Minack:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Karuppayya,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> thanks for your proposal and bringing up this issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am very much in favour of a shuffle storage solution that
>>>>>>>>>>>>>> allows for dynamic allocation and tolerates node failure in a
>>>>>>>>>>>>>> K8S environment, without the burden of managing a Remote
>>>>>>>>>>>>>> Shuffle Service.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have the following comments:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Your proposed consolidation stage is equivalent to the next
>>>>>>>>>>>>>> reducer stage in the sense that it reads shuffle data from the 
>>>>>>>>>>>>>> earlier map
>>>>>>>>>>>>>> stage. This requires the executors of the map stage to survive 
>>>>>>>>>>>>>> until the
>>>>>>>>>>>>>> shuffle data are consolidated ("merged" in Spark terminology). 
>>>>>>>>>>>>>> Therefore, I
>>>>>>>>>>>>>> think this passage of your design document is not accurate:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     Executors that perform the initial map tasks (shuffle
>>>>>>>>>>>>>> writers) can be immediately deallocated after writing their 
>>>>>>>>>>>>>> shuffle data ...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Since the consolidation stage reads all the shuffle data, why
>>>>>>>>>>>>>> not do the transformation in that stage? What is the point of
>>>>>>>>>>>>>> deferring the transformations to another stage?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You mention the "Native Shuffle Block Migration" and say its
>>>>>>>>>>>>>> limitation is "It simply shifts the storage burden to other 
>>>>>>>>>>>>>> active
>>>>>>>>>>>>>> executors".
>>>>>>>>>>>>>> Please consider that the migration process can migrate to a
>>>>>>>>>>>>>> fallback storage (as it is called in Spark), which essentially
>>>>>>>>>>>>>> copies the shuffle data to a remote storage.
>>>>>>>>>>>>>> Kind regards,
>>>>>>>>>>>>>> Enrico
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Am 13.11.25 um 01:40 schrieb karuppayya:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>  Hi All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I propose to utilize *Remote Storage as a Shuffle
>>>>>>>>>>>>>> Store, natively in Spark* .
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This approach would fundamentally decouple shuffle storage
>>>>>>>>>>>>>> from compute nodes, *mitigating shuffle fetch failures and
>>>>>>>>>>>>>> also helping with aggressive downscaling*.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The primary goal is to enhance the *elasticity and
>>>>>>>>>>>>>> resilience* of Spark workloads, leading to substantial cost
>>>>>>>>>>>>>> optimization opportunities.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *I welcome any initial thoughts or concerns regarding this
>>>>>>>>>>>>>> idea.*
>>>>>>>>>>>>>> *Looking forward to your feedback! *
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JIRA: SPARK-53484
>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/SPARK-54327>
>>>>>>>>>>>>>> SPIP doc
>>>>>>>>>>>>>> <https://docs.google.com/document/d/1leywkLgD62-MdG7e57n0vFRi7ICNxn9el9hpgchsVnk/edit?tab=t.0#heading=h.u4h68wupq6lw>
>>>>>>>>>>>>>> ,
>>>>>>>>>>>>>> Design doc
>>>>>>>>>>>>>> <https://docs.google.com/document/d/1tuWyXAaIBR0oVD5KZwYvz7JLyn6jB55_35xeslUEu7s/edit?tab=t.0>
>>>>>>>>>>>>>> PoC PR <https://github.com/apache/spark/pull/53028>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Karuppayya
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Rishab Joshi
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>
>> --
>> John Zhuge
>>
>
