[ 
https://issues.apache.org/jira/browse/SPARK-50257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guihuawen updated SPARK-50257:
------------------------------
    Attachment: 截屏2024-11-07 13.52.45.png

> [Core]If ExpandExec is included, the CoalesceShufflePartitions rule will not 
> be adjusted during the AQE phase
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50257
>                 URL: https://issues.apache.org/jira/browse/SPARK-50257
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: guihuawen
>            Priority: Major
>             Fix For: 4.0.0
>
>         Attachments: 截屏2024-11-07 13.52.45.png
>
>
> 【sql】
> {code:java}
> // code placeholder
> SELECT
>        /*+ SHUFFLE_MERGE(b) */
>        s_date,
>        sum(s_quantity * i_price) AS total_sales
>    FROM
>        sales a
>        JOIN items b ON s_item_id = i_item_id
>    WHERE
>        i_price < 10
>    GROUP BY
>        s_date with rollup;
>  {code}
> 通过AQE后调整的执行计划
> {code:java}
> // code placeholder
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(7) Sort [total_sales#1L DESC NULLS LAST], true, 0
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 3
>          +- Exchange rangepartitioning(total_sales#1L DESC NULLS LAST, 1000), 
> ENSURE_REQUIREMENTS, [id=#294]
>             +- *(6) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], 
> functions=[sum((s_quantity#3 * i_price#6))], output=[s_date#10, 
> total_sales#1L])
>                +- AQEShuffleRead coalesced
>                   +- ShuffleQueryStage 2
>                      +- Exchange hashpartitioning(s_date#10, 
> spark_grouping_id#9L, 1000), ENSURE_REQUIREMENTS, [id=#250]
>                         +- *(5) HashAggregate(keys=[s_date#10, 
> spark_grouping_id#9L], functions=[partial_sum((s_quantity#3 * i_price#6))], 
> output=[s_date#10, spark_grouping_id#9L, sum#14L])
>                            +- *(5) Expand [[s_quantity#3, i_price#6, 
> s_date#4, 0], [s_quantity#3, i_price#6, null, 1]], [s_quantity#3, i_price#6, 
> s_date#10, spark_grouping_id#9L]
>                               +- *(5) Project [s_quantity#3, i_price#6, 
> s_date#4]
>                                  +- *(5) SortMergeJoin(skew=true) 
> [cast(s_item_id#2 as bigint)], [i_item_id#5L], Inner
>                                     :- *(3) Sort [cast(s_item_id#2 as bigint) 
> ASC NULLS FIRST], false, 0
>                                     :  +- AQEShuffleRead coalesced and skewed
>                                     :     +- ShuffleQueryStage 0
>                                     :        +- Exchange 
> hashpartitioning(cast(s_item_id#2 as bigint), 1000), ENSURE_REQUIREMENTS, 
> [id=#101]
>                                     :           +- *(1) Filter 
> isnotnull(s_item_id#2)
>                                     :              +- *(1) ColumnarToRow
>                                     :                 +- FileScan parquet 
> bigdata_qa.sales[s_item_id#2,s_quantity#3,s_date#4] Batched: true, 
> DataFilters: [isnotnull(s_item_id#2)], Format: Parquet, Location: 
> InMemoryFileIndex(1 
> paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/sa...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: 
> struct<s_item_id:int,s_quantity:int,s_date:date>
>                                     +- *(4) Sort [i_item_id#5L ASC NULLS 
> FIRST], false, 0
>                                        +- AQEShuffleRead coalesced
>                                           +- ShuffleQueryStage 1
>                                              +- Exchange 
> hashpartitioning(i_item_id#5L, 1000), ENSURE_REQUIREMENTS, [id=#118]
>                                                 +- *(2) Filter 
> ((isnotnull(i_price#6) AND (i_price#6 < 10)) AND isnotnull(i_item_id#5L))
>                                                    +- *(2) ColumnarToRow
>                                                       +- FileScan parquet 
> bigdata_qa.items[i_item_id#5L,i_price#6] Batched: true, DataFilters: 
> [isnotnull(i_price#6), (i_price#6 < 10), isnotnull(i_item_id#5L)], Format: 
> Parquet, Location: InMemoryFileIndex(1 
> paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/it...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(i_price), 
> LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: 
> struct<i_item_id:bigint,i_price:int>
>  {code}
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to