[
https://issues.apache.org/jira/browse/SPARK-50257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
guihuawen updated SPARK-50257:
------------------------------
Description:
【sql】
{code:java}
SELECT
/*+ SHUFFLE_MERGE(b) */
s_date,
sum(s_quantity * i_price) AS total_sales
FROM
sales a
JOIN items b ON s_item_id = i_item_id
WHERE
i_price < 10
GROUP BY
s_date with rollup;
{code}
Configuration: spark.sql.shuffle.partitions=1000
Plan after AQE:
!截屏2024-11-07 13.52.45.png!
was:
【sql】
{code:java}
SELECT
/*+ SHUFFLE_MERGE(b) */
s_date,
sum(s_quantity * i_price) AS total_sales
FROM
sales a
JOIN items b ON s_item_id = i_item_id
WHERE
i_price < 10
GROUP BY
s_date with rollup;
{code}
Execution plan after AQE adjustment:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(7) Sort [total_sales#1L DESC NULLS LAST], true, 0
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 3
         +- Exchange rangepartitioning(total_sales#1L DESC NULLS LAST, 1000), ENSURE_REQUIREMENTS, [id=#294]
            +- *(6) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], functions=[sum((s_quantity#3 * i_price#6))], output=[s_date#10, total_sales#1L])
               +- AQEShuffleRead coalesced
                  +- ShuffleQueryStage 2
                     +- Exchange hashpartitioning(s_date#10, spark_grouping_id#9L, 1000), ENSURE_REQUIREMENTS, [id=#250]
                        +- *(5) HashAggregate(keys=[s_date#10, spark_grouping_id#9L], functions=[partial_sum((s_quantity#3 * i_price#6))], output=[s_date#10, spark_grouping_id#9L, sum#14L])
                           +- *(5) Expand [[s_quantity#3, i_price#6, s_date#4, 0], [s_quantity#3, i_price#6, null, 1]], [s_quantity#3, i_price#6, s_date#10, spark_grouping_id#9L]
                              +- *(5) Project [s_quantity#3, i_price#6, s_date#4]
                                 +- *(5) SortMergeJoin(skew=true) [cast(s_item_id#2 as bigint)], [i_item_id#5L], Inner
                                    :- *(3) Sort [cast(s_item_id#2 as bigint) ASC NULLS FIRST], false, 0
                                    :  +- AQEShuffleRead coalesced and skewed
                                    :     +- ShuffleQueryStage 0
                                    :        +- Exchange hashpartitioning(cast(s_item_id#2 as bigint), 1000), ENSURE_REQUIREMENTS, [id=#101]
                                    :           +- *(1) Filter isnotnull(s_item_id#2)
                                    :              +- *(1) ColumnarToRow
                                    :                 +- FileScan parquet bigdata_qa.sales[s_item_id#2,s_quantity#3,s_date#4] Batched: true, DataFilters: [isnotnull(s_item_id#2)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/sa..., PartitionFilters: [], PushedFilters: [IsNotNull(s_item_id)], ReadSchema: struct<s_item_id:int,s_quantity:int,s_date:date>
                                    +- *(4) Sort [i_item_id#5L ASC NULLS FIRST], false, 0
                                       +- AQEShuffleRead coalesced
                                          +- ShuffleQueryStage 1
                                             +- Exchange hashpartitioning(i_item_id#5L, 1000), ENSURE_REQUIREMENTS, [id=#118]
                                                +- *(2) Filter ((isnotnull(i_price#6) AND (i_price#6 < 10)) AND isnotnull(i_item_id#5L))
                                                   +- *(2) ColumnarToRow
                                                      +- FileScan parquet bigdata_qa.items[i_item_id#5L,i_price#6] Batched: true, DataFilters: [isnotnull(i_price#6), (i_price#6 < 10), isnotnull(i_item_id#5L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[hdfs://DClusterNmg5/user/prod_bigdata_qa/bigdata_qa/hive/bigdata_qa/it..., PartitionFilters: [], PushedFilters: [IsNotNull(i_price), LessThan(i_price,10), IsNotNull(i_item_id)], ReadSchema: struct<i_item_id:bigint,i_price:int>
{code}
> [Core]If ExpandExec is included, the CoalesceShufflePartitions rule will not
> be adjusted during the AQE phase
> -------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-50257
> URL: https://issues.apache.org/jira/browse/SPARK-50257
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 4.0.0
> Reporter: guihuawen
> Priority: Major
> Fix For: 4.0.0
>
> Attachments: 截屏2024-11-07 13.52.45.png
>
>
> 【sql】
> {code:java}
> SELECT
> /*+ SHUFFLE_MERGE(b) */
> s_date,
> sum(s_quantity * i_price) AS total_sales
> FROM
> sales a
> JOIN items b ON s_item_id = i_item_id
> WHERE
> i_price < 10
> GROUP BY
> s_date with rollup;
> {code}
> Configuration: spark.sql.shuffle.partitions=1000
> Plan after AQE:
> !截屏2024-11-07 13.52.45.png!
>
>
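For readers unfamiliar with what the CoalesceShufflePartitions rule named in the title is expected to do, here is a minimal sketch of the general idea: contiguous small shuffle partitions are merged until a target size is reached, so 1000 initial partitions can shrink to far fewer. This is a simplified model and not Spark's actual implementation. The function name `coalesce_partitions` and the parameter `target_size` are hypothetical; `target_size` plays roughly the role of spark.sql.adaptive.advisoryPartitionSizeInBytes, and the real rule also accounts for things this sketch ignores, such as skew handling and multiple shuffles feeding one stage.

```python
from typing import List, Tuple


def coalesce_partitions(sizes: List[int], target_size: int) -> List[Tuple[int, int]]:
    """Greedily merge contiguous partitions (hypothetical helper, not Spark's API).

    Returns half-open [start, end) index ranges over the input partitions.
    A single partition larger than target_size is kept as its own range.
    """
    ranges: List[Tuple[int, int]] = []
    start = 0      # first partition index of the group being built
    current = 0    # accumulated size of the group being built
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if i > start and current + size > target_size:
            ranges.append((start, i))
            start = i
            current = 0
        current += size
    # Emit the trailing group, if any partitions remain.
    if start < len(sizes):
        ranges.append((start, len(sizes)))
    return ranges


if __name__ == "__main__":
    # 1000 tiny partitions (as with spark.sql.shuffle.partitions=1000), size 1 each.
    merged = coalesce_partitions([1] * 1000, target_size=64)
    print(len(merged))
```

In this model, a stage whose 1000 shuffle partitions are all small ends up with only a handful of read partitions, which is the outcome the report expects to see for the stage containing Expand.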
--
This message was sent by Atlassian Jira
(v8.20.10#820010)