[ https://issues.apache.org/jira/browse/SPARK-50257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
guihuawen updated SPARK-50257:
------------------------------
Attachment: 截屏2024-11-07 13.52.45.png
> [Core] If ExpandExec is included, the CoalesceShufflePartitions rule does not
> adjust shuffle partitions during the AQE phase
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-50257
> URL: https://issues.apache.org/jira/browse/SPARK-50257
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 4.0.0
> Reporter: guihuawen
> Priority: Major
> Fix For: 4.0.0
>
> Attachments: 截屏2024-11-07 13.52.45.png
>
>
> SQL:
> {code:sql}
> SELECT
> /*+ SHUFFLE_MERGE(b) */
> s_date,
> sum(s_quantity * i_price) AS total_sales
> FROM
> sales a
> JOIN items b ON s_item_id = i_item_id
> WHERE
> i_price < 10
> GROUP BY
> s_date WITH ROLLUP
> ORDER BY
> total_sales DESC;
> {code}
> Execution plan after AQE adjustment:
> {code}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(7) Sort [total_sales#1L DESC NULLS LAST], true, 0
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 3
>          +- Exchange rangepartitioning(total_sales#1L DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [id=#294]
>             +- *(6) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], functions=[sum((s_quantity#3 * i_price#6))], output=[s_date#10, total_sales#1L])
>                +- AQEShuffleRead coalesced
>                   +- ShuffleQueryStage 2
>                      +- Exchange hashpartitioning(s_date#10, spark_grouping_id#9L, 1000), ENSURE_REQUIREMENTS, [id=#250]
>                         +- *(5) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], functions=[partial_sum((s_quantity#3 * i_price#6))], output=[s_date#10, spark_grouping_id#9L, sum#14L])
>                            +- *(5) Expand [[s_quantity#3, i_price#6, s_date#4, 0], [s_quantity#3, i_price#6, null, 1]], [s_quantity#3, i_price#6, s_date#10, spark_grouping_id#9L]
>                               +- *(5) Project [s_quantity#3, i_price#6, s_date#4]
>                                  +- *(5) SortMergeJoin(skew=true) [cast(s_item_id#2 as bigint)], [i_item_id#5L], Inner
>                                     :- *(3) Sort [cast(s_item_id#2 as bigint) ASC NULLS FIRST], false, 0
>                                     :  +- AQEShuffleRead coalesced and skewed
>                                     :     +- ShuffleQueryStage 0
>                                     :        +- Exchange hashpartitioning(cast(s_item_id#2 as bigint), 1000), ENSURE_REQUIREMENTS, [id=#101]
>                                     :           +- *(1) Filter isnotnull(s_item_id#2)
>                                     :              +- *(1) ColumnarToRow
>                                     :                 +- FileScan parquet bigdata_qa.sales[s_item_id#2,s_quantity#3,s_date#4] Batched: true, DataFilters: [isnotnull(s_item_id#2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/sa..., PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>
>                                     +- *(4) Sort [i_item_id#5L ASC NULLS FIRST], false, 0
>                                        +- AQEShuffleRead coalesced
>                                           +- ShuffleQueryStage 1
>                                              +- Exchange hashpartitioning(i_item_id#5L, 1000), ENSURE_REQUIREMENTS, [id=#118]
>                                                 +- *(2) Filter ((isnotnull(i_price#6) AND (i_price#6 < 10)) AND isnotnull(i_item_id#5L))
>                                                    +- *(2) ColumnarToRow
>                                                       +- FileScan parquet bigdata_qa.items[i_item_id#5L,i_price#6] Batched: true, DataFilters: [isnotnull(i_price#6), (i_price#6 < 10), isnotnull(i_item_id#5L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/it..., PartitionFilters: [], PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: struct<i_item_id:bigint,i_price:int>
> {code}
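For context on the `Expand` node in the plan: `WITH ROLLUP` on `s_date` produces two grouping sets, so Expand emits every joined row twice, once keeping `s_date` with `spark_grouping_id` 0 and once with `s_date` set to null and `spark_grouping_id` 1 (the grand-total row). The plain-Python sketch below mimics only those semantics and the aggregate that follows. It is not Spark code; the sample rows and the function names `expand_rollup` and `aggregate` are hypothetical.

```python
from collections import defaultdict

# Hypothetical joined rows (s_quantity, i_price, s_date), standing in for the
# output of the Project node that sits on top of the SortMergeJoin.
joined = [
    (2, 5, "2024-11-01"),
    (1, 9, "2024-11-01"),
    (4, 3, "2024-11-02"),
]


def expand_rollup(rows):
    """Mimic Expand for ROLLUP(s_date): emit each row once per grouping set."""
    for s_quantity, i_price, s_date in rows:
        yield (s_quantity, i_price, s_date, 0)  # grouping set (s_date)
        yield (s_quantity, i_price, None, 1)    # grouping set () = grand total


def aggregate(expanded):
    """Mimic HashAggregate(keys=[s_date, spark_grouping_id],
    functions=[sum(s_quantity * i_price)])."""
    totals = defaultdict(int)
    for s_quantity, i_price, s_date, grouping_id in expanded:
        totals[(s_date, grouping_id)] += s_quantity * i_price
    return dict(totals)


expanded = list(expand_rollup(joined))
print(len(joined), len(expanded))  # 3 6
print(aggregate(expanded))
# {('2024-11-01', 0): 19, (None, 1): 31, ('2024-11-02', 0): 12}
```

Because Expand sits below the partial `HashAggregate` in the `*(5)` codegen stage, it doubles the number of rows entering that partial aggregate; how much data then reaches the `hashpartitioning(s_date#10, spark_grouping_id#9L, 1000)` exchange depends on how much the partial aggregation reduces it.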
>
>
>
>
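Each `AQEShuffleRead coalesced` node in the plan marks where AQE's CoalesceShufflePartitions rule has merged adjacent shuffle partitions out of the 1000 declared by the corresponding `Exchange`. As a simplified sketch of the idea (not Spark's implementation; the function name `coalesce_partitions` and the sizes are hypothetical), contiguous partitions are packed greedily until adding the next one would exceed a target size. Spark's own logic in `ShufflePartitionsUtil` derives the target from settings such as `spark.sql.adaptive.advisoryPartitionSizeInBytes` and handles details omitted here, such as summing partition sizes across all shuffles read by the same stage and merging very small partitions into a neighbour.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily group contiguous shuffle partition indices.

    A new group starts whenever adding the next partition would push the
    current group's total size above target_size. A single partition larger
    than the target still becomes its own group. Simplified sketch: no
    cross-shuffle size summing and no small-partition merging.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical per-partition sizes (in MB) for six shuffle partitions.
print(coalesce_partitions([10, 20, 30, 70, 5, 5], target_size=64))
# [[0, 1, 2], [3], [4, 5]]
```

Keeping each group contiguous matters because a coalesced read must still return whole hash partitions in order, so that both sides of a join or an aggregate stay co-partitioned after coalescing.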