[ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646898#comment-17646898 ]

Rui Wang commented on SPARK-41512:
----------------------------------

cc [~cloud_fan]

> Row count based shuffle read to optimize global limit after a single 
> partition shuffle (optionally with input partition sorted)
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-41512
>                 URL: https://issues.apache.org/jira/browse/SPARK-41512
>             Project: Spark
>          Issue Type: Task
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>
> h3. Problem Statement
> In the current Spark optimizer, a single-partition shuffle may be inserted for 
> a limit when the limit is not the last non-action operation (e.g. a filter 
> follows the limit). The output partitions feeding into this limit may already 
> be sorted. The single-partition shuffle approach has a correctness bug in this 
> case: the shuffle read can return partitions out of partition order, and the 
> limit exec simply takes the first "limit" rows, which loses the order and 
> produces a wrong result. The shuffle itself is also relatively costly. 
> Meanwhile, the naive fix for this bug is to sort all the data fed into the 
> limit again, which adds yet more overhead. A minimal example of the 
> problematic plan shape is sketched below.
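> For illustration, a small query that can produce this plan shape (the dataset 
> and column names here are hypothetical):
> {code:scala}
> import org.apache.spark.sql.SparkSession
> 
> val spark = SparkSession.builder().appName("limit-shuffle-demo").getOrCreate()
> import spark.implicits._
> 
> val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("id", "value")
> 
> // Sorted input feeding a limit that is not the final operation: the filter
> // after limit(2) means the limit is planned as a global limit over a
> // single-partition shuffle rather than a simple collect-limit. If the
> // shuffle read returns map outputs out of partition order, the 2 rows the
> // limit takes may not be the 2 smallest ids.
> val result = df.sort("id").limit(2).filter($"value" =!= "z")
> result.explain()
> {code}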
> h3. Proposed idea
> We propose a row-count-based AQE algorithm that addresses this problem in two 
> ways:
> 1. Avoid the extra sort on the shuffle read side (or in the limit exec) while 
> still producing the correct result.
> 2. Avoid reading all shuffle data from the mappers for this single-partition 
> shuffle, to reduce shuffle cost.
> Note that 1. applies only to the sorted-partition case, while 2. applies to 
> the general single-partition shuffle + limit case.
>  
> The algorithm works as follows (a sketch of the key pieces follows this list):
> 1. Each mapper records a row count while writing its shuffle data.
> 2. Since this is the single-shuffle-partition case, there is only one 
> partition but N mappers.
> 3. An AccumulatorV2 is implemented to collect a list of tuples recording the 
> mapping between mapper id and the number of rows written by that mapper (the 
> row count metric).
> 4. The AQE framework detects the plan shape of a shuffle followed by a global 
> limit.
> 5. The AQE framework reads only the necessary data from the mappers, based on 
> the limit. For example, if mapper 1 writes 200 rows, mapper 2 writes 300 rows, 
> and the limit is 500, AQE creates a shuffle read node that reads from only 
> mappers 1 and 2, skipping the remaining mappers.
> 6. This is correct for a limit over both sorted and non-sorted partitions.
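> A minimal Scala sketch of the row-count accumulator and the mapper-selection 
> step; the class and helper names (MapperRowCountAccumulator, mappersToRead) 
> are illustrative, not Spark's actual internals:
> {code:scala}
> import org.apache.spark.util.AccumulatorV2
> import scala.collection.mutable.ArrayBuffer
> 
> // Collects (mapperId, rowCount) pairs reported by each mapper (step 3).
> class MapperRowCountAccumulator
>     extends AccumulatorV2[(Int, Long), Seq[(Int, Long)]] {
>   private val buf = ArrayBuffer[(Int, Long)]()
>   override def isZero: Boolean = buf.isEmpty
>   override def copy(): MapperRowCountAccumulator = {
>     val acc = new MapperRowCountAccumulator
>     acc.buf ++= buf
>     acc
>   }
>   override def reset(): Unit = buf.clear()
>   override def add(v: (Int, Long)): Unit = buf += v
>   override def merge(
>       other: AccumulatorV2[(Int, Long), Seq[(Int, Long)]]): Unit =
>     buf ++= other.value
>   override def value: Seq[(Int, Long)] = buf.toSeq
> }
> 
> // Picks the prefix of mappers (in mapper order) whose cumulative row
> // count covers the limit (step 5).
> def mappersToRead(rowCounts: Seq[(Int, Long)], limit: Long): Seq[Int] = {
>   var remaining = limit
>   rowCounts.takeWhile { case (_, count) =>
>     val needed = remaining > 0
>     remaining -= count
>     needed
>   }.map(_._1)
> }
> {code}
> For the example in step 5, mappersToRead(Seq((1, 200L), (2, 300L), (3, 400L)), 500L) 
> returns Seq(1, 2), so the shuffle read skips mapper 3 entirely.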


