[ 
https://issues.apache.org/jira/browse/SPARK-50257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guihuawen updated SPARK-50257:
------------------------------
    Description: 
【sql】
{code:java}
// code placeholder
SELECT
       /*+ SHUFFLE_MERGE(b) */
       s_date,
       sum(s_quantity * i_price) AS total_sales
   FROM
       sales a
       JOIN items b ON s_item_id = i_item_id
   WHERE
       i_price < 10
   GROUP BY
       s_date with rollup;
 {code}
Set spark.sql.shuffle.partitions=1000

After aqe:

!截屏2024-11-07 13.52.45.png!

 

 

  was:
【sql】
{code:java}
// code placeholder
SELECT
       /*+ SHUFFLE_MERGE(b) */
       s_date,
       sum(s_quantity * i_price) AS total_sales
   FROM
       sales a
       JOIN items b ON s_item_id = i_item_id
   WHERE
       i_price < 10
   GROUP BY
       s_date with rollup;
 {code}
通过AQE后调整的执行计划
{code:java}
// code placeholder
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(7) Sort [total_sales#1L DESC NULLS LAST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 3
         +- Exchange rangepartitioning(total_sales#1L DESC NULLS LAST, 1000), 
ENSURE_REQUIREMENTS, [id=#294]
            +- *(6) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], 
functions=[sum((s_quantity#3 * i_price#6))], output=[s_date#10, total_sales#1L])
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 2
                     +- Exchange hashpartitioning(s_date#10, 
spark_grouping_id#9L, 1000), ENSURE_REQUIREMENTS, [id=#250]
                        +- *(5) HashAggregate(keys=[s_date#10, 
spark_grouping_id#9L], functions=[partial_sum((s_quantity#3 * i_price#6))], 
output=[s_date#10, spark_grouping_id#9L, sum#14L])
                           +- *(5) Expand [[s_quantity#3, i_price#6, s_date#4, 
0], [s_quantity#3, i_price#6, null, 1]], [s_quantity#3, i_price#6, s_date#10, 
spark_grouping_id#9L]
                              +- *(5) Project [s_quantity#3, i_price#6, 
s_date#4]
                                 +- *(5) SortMergeJoin(skew=true) 
[cast(s_item_id#2 as bigint)], [i_item_id#5L], Inner
                                    :- *(3) Sort [cast(s_item_id#2 as bigint) 
ASC NULLS FIRST], false, 0
                                    :  +- AQEShuffleRead coalesced and skewed
                                    :     +- ShuffleQueryStage 0
                                    :        +- Exchange 
hashpartitioning(cast(s_item_id#2 as bigint), 1000), ENSURE_REQUIREMENTS, 
[id=#101]
                                    :           +- *(1) Filter 
isnotnull(s_item_id#2)
                                    :              +- *(1) ColumnarToRow
                                    :                 +- FileScan parquet 
bigdata_qa.sales[s_item_id#2,s_quantity#3,s_date#4] Batched: true, DataFilters: 
[isnotnull(s_item_id#2)], Format: Parquet, Location: InMemoryFileIndex(1 
paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/sa...,
 PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: 
struct<s_item_id:int,s_quantity:int,s_date:date>
                                    +- *(4) Sort [i_item_id#5L ASC NULLS 
FIRST], false, 0
                                       +- AQEShuffleRead coalesced
                                          +- ShuffleQueryStage 1
                                             +- Exchange 
hashpartitioning(i_item_id#5L, 1000), ENSURE_REQUIREMENTS, [id=#118]
                                                +- *(2) Filter 
((isnotnull(i_price#6) AND (i_price#6 < 10)) AND isnotnull(i_item_id#5L))
                                                   +- *(2) ColumnarToRow
                                                      +- FileScan parquet 
bigdata_qa.items[i_item_id#5L,i_price#6] Batched: true, DataFilters: 
[isnotnull(i_price#6), (i_price#6 < 10), isnotnull(i_item_id#5L)], Format: 
Parquet, Location: InMemoryFileIndex(1 
paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/it...,
 PartitionFilters: [], PushedFilters: [IsNotNull(i_price), 
LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: 
struct<i_item_id:bigint,i_price:int>
 {code}
 

 

 

 


> [Core]If ExpandExec is included, the CoalesceShufflePartitions rule will not 
> be adjusted during the AQE phase
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50257
>                 URL: https://issues.apache.org/jira/browse/SPARK-50257
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 4.0.0
>            Reporter: guihuawen
>            Priority: Major
>             Fix For: 4.0.0
>
>         Attachments: 截屏2024-11-07 13.52.45.png
>
>
> 【sql】
> {code:java}
> // code placeholder
> SELECT
>        /*+ SHUFFLE_MERGE(b) */
>        s_date,
>        sum(s_quantity * i_price) AS total_sales
>    FROM
>        sales a
>        JOIN items b ON s_item_id = i_item_id
>    WHERE
>        i_price < 10
>    GROUP BY
>        s_date with rollup;
>  {code}
> Set spark.sql.shuffle.partitions=1000
> After aqe:
> !截屏2024-11-07 13.52.45.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to