[ https://issues.apache.org/jira/browse/SPARK-33701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mridul Muralidharan resolved SPARK-33701.
-----------------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 33896
[https://github.com/apache/spark/pull/33896]

> Adaptive shuffle merge finalization for push-based shuffle
> ----------------------------------------------------------
>
>                 Key: SPARK-33701
>                 URL: https://issues.apache.org/jira/browse/SPARK-33701
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle, Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Min Shen
>            Priority: Major
>             Fix For: 3.3.0
>
>
> SPARK-32920 implements a simple approach for shuffle merge finalization,
> which transitions from the shuffle map stage to the reduce stage when
> push-based shuffle is enabled.
> This simple approach waits for a static period of time after all map
> tasks have finished before initiating shuffle merge finalization. It does
> not handle jobs with varying shuffle sizes well. For a small shuffle, we
> want merge finalization to happen as early and as quickly as possible. For
> a large shuffle, we might want to wait longer to achieve a better merge
> ratio. A single static configuration for the entire job cannot adapt to
> such varying needs.
> This raises the need for adaptive shuffle merge finalization, where the
> amount of time to wait before merge finalization adapts to the size of
> the shuffle. We have implemented an effective adaptive shuffle merge
> finalization mechanism, which introduces 2 more config parameters:
> spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio.
> Together with spark.shuffle.push.finalize.time, adaptive shuffle merge
> finalization works as follows:
> # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data
> generated by a mapper, it notifies the Spark driver.
> # When the Spark driver receives a notification of a completed shuffle
> push, it updates the state maintained in the corresponding
> ShuffleDependency.
> # If the ratio of completed pushes (# completed pushes / # map tasks)
> exceeds minPushRatio, the driver immediately schedules shuffle merge
> finalization.
> # If the driver is first notified that all map tasks have finished, it
> gathers the size of the shuffle from MapOutputStatistics. If the total
> shuffle size is smaller than minShuffleSizeToWait, the driver ignores the
> pushed shuffle partitions, treats the shuffle as a regular shuffle, and
> starts scheduling the reduce stage. It also asynchronously schedules
> shuffle merge finalization immediately, but ignores all the responses.
> # If the total shuffle size is larger than minShuffleSizeToWait, the
> driver schedules shuffle merge finalization after waiting for
> spark.shuffle.push.finalize.time. If during this wait the driver receives
> enough push completion notifications to reach minPushRatio, it
> reschedules shuffle merge finalization for immediate execution.
> In addition to the above, per SPARK-36530, we should also check whether
> no block gets pushed because all blocks are larger than
> spark.shuffle.push.maxBlockSizeToPush. If so, we should also skip shuffle
> merge finalization. Whether any blocks from a mapper were pushed can be
> included in the new executor-to-driver RPC that notifies the driver of
> push completion.
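The five steps above, together with the SPARK-36530 check, amount to a small per-shuffle state machine on the driver. The sketch below is a hypothetical Python model of that decision logic, not Spark's actual DAGScheduler code: the class name, method names, and returned action strings are invented for illustration; timers are replaced by an explicit `now` timestamp; the ratio test uses `>=` (matching "reach minPushRatio" in the last step); and "no block was pushed" is only concluded once every map task has reported push completion, which is an assumption about how the per-mapper RPC flag would be aggregated.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class AdaptiveMergeFinalizer:
    """Hypothetical per-shuffle driver state; names are illustrative, not Spark's API."""
    num_map_tasks: int
    min_push_ratio: float          # plays the role of spark.shuffle.push.minPushRatio
    min_shuffle_size_to_wait: int  # bytes; plays the role of spark.shuffle.push.minShuffleSizeToWait
    finalize_time: float           # seconds; plays the role of spark.shuffle.push.finalize.time
    completed_pushes: Set[int] = field(default_factory=set)
    any_block_pushed: bool = False
    finalize_at: Optional[float] = None  # time at which merge finalization is scheduled
    skipped: bool = False                # merge finalization skipped entirely
    ignore_merge_results: bool = False   # shuffle treated as a regular shuffle

    def _closed(self, now: float) -> bool:
        # True once finalization was skipped or its scheduled time has passed.
        return self.skipped or (self.finalize_at is not None and self.finalize_at <= now)

    def _ratio_reached(self) -> bool:
        # completed pushes / map tasks, compared against the threshold.
        return len(self.completed_pushes) / self.num_map_tasks >= self.min_push_ratio

    def _nothing_pushed(self) -> bool:
        # Assumption: only conclude "no block pushed" after every mapper reported.
        return len(self.completed_pushes) == self.num_map_tasks and not self.any_block_pushed

    def on_push_completed(self, map_index: int, pushed_any_block: bool, now: float) -> str:
        """A mapper's pusher reported that it finished pushing (steps 1, 2, 3, 5)."""
        self.completed_pushes.add(map_index)
        self.any_block_pushed |= pushed_any_block
        if self._closed(now):
            return "no_op"
        if self._nothing_pushed():
            # SPARK-36530 follow-up: every block exceeded the push size limit.
            self.skipped = True
            self.ignore_merge_results = True
            return "skip_finalization"
        if self._ratio_reached():
            # Schedule finalization now, replacing any pending delayed schedule.
            self.finalize_at = now
            return "finalize_now"
        return "wait"

    def on_all_maps_finished(self, total_shuffle_size: int, now: float) -> str:
        """All map tasks finished; size comes from MapOutputStatistics (steps 4, 5)."""
        if self._closed(now):
            return "no_op"
        if total_shuffle_size < self.min_shuffle_size_to_wait:
            # Small shuffle: run the reduce stage as a regular shuffle, still
            # finalize asynchronously right away, and ignore the responses.
            self.ignore_merge_results = True
            self.finalize_at = now
            return "finalize_now_ignore_results"
        # Large shuffle: wait up to finalize_time for more pushes to complete.
        self.finalize_at = now + self.finalize_time
        return "finalize_after_wait"
```

For example, with 4 map tasks, a ratio of 0.5, and a 10-second wait, a large shuffle first gets a delayed finalization at `now + 10`, and the second completed push pulls it forward to "finalize now". Modeling the schedule as a single `finalize_at` value keeps "reschedule for immediate execution" a plain overwrite; a real driver would instead cancel and resubmit a timer task.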