[ https://issues.apache.org/jira/browse/SPARK-33701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mridul Muralidharan resolved SPARK-33701.
-----------------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 33896
[https://github.com/apache/spark/pull/33896]

> Adaptive shuffle merge finalization for push-based shuffle
> ----------------------------------------------------------
>
>                 Key: SPARK-33701
>                 URL: https://issues.apache.org/jira/browse/SPARK-33701
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Shuffle, Spark Core
>    Affects Versions: 3.1.0
>            Reporter: Min Shen
>            Priority: Major
>             Fix For: 3.3.0
>
>
> SPARK-32920 implements a simple approach to shuffle merge finalization, the
> step that transitions from the shuffle map stage to the reduce stage when
> push-based shuffle is enabled.
> This simple approach waits for a static period of time after all map tasks
> have finished before initiating shuffle merge finalization. It does not work
> well for jobs whose shuffles vary in size. For a small shuffle, we want merge
> finalization to happen as early and as quickly as possible. For a large
> shuffle, we might want to wait longer to achieve a better merge ratio. A
> single static configuration for the entire job cannot adapt to such varying
> needs.
> This raises the need for adaptive shuffle merge finalization, where the
> amount of time to wait before merge finalization adapts to the size of the
> shuffle. We have implemented an effective adaptive shuffle merge
> finalization mechanism, which introduces two more config parameters:
> spark.shuffle.push.minShuffleSizeToWait and spark.shuffle.push.minPushRatio.
> Together with spark.shuffle.push.finalize.time, adaptive shuffle merge
> finalization works as follows:
>  # Whenever a ShuffleBlockPusher finishes pushing all the shuffle data
> generated by a mapper, it notifies the Spark driver.
>  # When the Spark driver receives a notification of a completed shuffle
> push, it updates the state maintained in the corresponding
> ShuffleDependency.
>  # If the ratio of completed pushes (# completed pushes / # map tasks)
> reaches or exceeds minPushRatio, the driver immediately schedules shuffle
> merge finalization.
>  # If the driver instead first receives notification that all map tasks
> have finished, it gathers the size of the shuffle from
> MapOutputStatistics. If the total shuffle size is smaller than
> minShuffleSizeToWait, the driver ignores the pushed shuffle partitions,
> treats the shuffle as a regular shuffle, and starts scheduling the reduce
> stage. It also asynchronously schedules shuffle merge finalization
> immediately, but ignores all the responses.
>  # If the total shuffle size is larger than minShuffleSizeToWait, the
> driver schedules shuffle merge finalization after waiting for
> finalize.time. If during this wait the driver receives enough
> push-completion notifications to reach minPushRatio, it reschedules the
> shuffle merge finalization for immediate execution.
> In addition to the above, per SPARK-36530, we should also check whether no
> blocks were pushed because all blocks are larger than
> spark.shuffle.push.maxBlockSizeToPush. If so, we should also skip shuffle
> merge finalization. Whether any blocks from a mapper were pushed can be
> included in the new RPC that the Spark executor uses to notify the driver
> of push completion.
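The SPARK-36530 check can be sketched by having each push-completion notification carry a flag. This is a minimal Python sketch under stated assumptions: `PushCompletion`, its fields, and `should_skip_merge_finalization` are hypothetical names, not Spark's RPC or API. The sketch also assumes the driver only concludes "nothing was pushed" once every mapper has reported.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class PushCompletion:
    """Hypothetical payload of the executor-to-driver push-completion RPC."""
    shuffle_id: int
    map_index: int
    any_block_pushed: bool  # False if every block exceeded spark.shuffle.push.maxBlockSizeToPush


def should_skip_merge_finalization(completions: Iterable[PushCompletion], num_maps: int) -> bool:
    """Return True only when every mapper has reported and none of them pushed a block."""
    completions = list(completions)
    reported = {c.map_index for c in completions}  # count each mapper once
    if len(reported) < num_maps:
        return False  # assumption: cannot conclude anything until all mappers have reported
    return not any(c.any_block_pushed for c in completions)
```

If even one mapper reports `any_block_pushed=True`, finalization proceeds as usual, because there is at least some merged data worth collecting.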



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
