[
https://issues.apache.org/jira/browse/SPARK-53576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Qishang Zhong updated SPARK-53576:
----------------------------------
Summary: Physical plan changes after upgrading from 3.5.0 to 4.0 (was:
Physical plan changes after upgrading to 4.0)
> Physical plan changes after upgrading from 3.5.0 to 4.0
> -------------------------------------------------------
>
> Key: SPARK-53576
> URL: https://issues.apache.org/jira/browse/SPARK-53576
> Project: Spark
> Issue Type: Question
> Components: SQL
> Affects Versions: 4.0.0
> Reporter: Qishang Zhong
> Priority: Minor
> Attachments: image-2025-09-15-14-00-43-305.png,
> image-2025-09-15-14-02-30-833.png, image-2025-09-15-14-02-54-564.png,
> image-2025-09-15-14-03-14-349.png
>
>
> I upgraded a SQL job from Spark 3.5.0 to 4.0.0.
> Before the upgrade (3.5.0), the physical plan contained an AQEShuffleRead:
>
> !image-2025-09-15-14-03-14-349.png!
> After the upgrade, it disappeared, and the job now generates many
> small files. What is the reason for this? How can I keep the previous plan?
> !image-2025-09-15-14-02-54-564.png!
>
> {code:java}
> -- sql
> create table if not exists database_xxx.table_xxx_insert stored as ORC as
> select *, '新用户' as user_type, 'gdt' as parent_name
> from database_xxx.table_xxx
> union all
> select *, '召回用户' as user_type, 'gdt' as parent_name
> from database_xxx.table_xxx
> union all
> select *, '未注册用户' as user_type, 'gdt' as parent_name
> from database_xxx.table_xxx;
> {code}
>
> Before (Spark 3.5.0):
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (23)
> +- == Final Plan ==
>    Execute InsertIntoHadoopFsRelationCommand (15)
>    +- WriteFiles (14)
>       +- AQEShuffleRead (13)
>          +- ShuffleQueryStage (12), Statistics(sizeInBytes=301.3 MiB, rowCount=1.26E+6)
>             +- Exchange (11)
>                +- Union (10)
>                   :- * Project (3)
>                   :  +- * ColumnarToRow (2)
>                   :     +- Scan orc spark_catalog.database_xxx.table_xxx (1)
>                   :- * Project (6)
>                   :  +- * ColumnarToRow (5)
>                   :     +- Scan orc spark_catalog.database_xxx.table_xxx (4)
>                   +- * Project (9)
>                      +- * ColumnarToRow (8)
>                         +- Scan orc spark_catalog.database_xxx.table_xxx (7)
> +- == Initial Plan ==
>    Execute InsertIntoHadoopFsRelationCommand (22)
>    +- WriteFiles (21)
>       +- Exchange (20)
>          +- Union (19)
>             :- Project (16)
>             :  +- Scan orc spark_catalog.database_xxx.table_xxx (1)
>             :- Project (17)
>             :  +- Scan orc spark_catalog.database_xxx.table_xxx (4)
>             +- Project (18)
>                +- Scan orc spark_catalog.database_xxx.table_xxx (7)
> (1) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27]
> Batched: true
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (2) ColumnarToRow [codegen id : 1]
> Input [9]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27]
> (3) Project [codegen id : 1]
> Output [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, 新用户 AS user_type#13, gdt AS parent_name#14]
> Input [9]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27]
> (4) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36]
> Batched: true
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (5) ColumnarToRow [codegen id : 2]
> Input [9]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36]
> (6) Project [codegen id : 2]
> Output [11]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36, 召回用户 AS user_type#15, gdt AS parent_name#16]
> Input [9]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36]
> (7) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45]
> Batched: true
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (8) ColumnarToRow [codegen id : 3]
> Input [9]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45]
> (9) Project [codegen id : 3]
> Output [11]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45, 未注册用户 AS user_type#17, gdt AS parent_name#18]
> Input [9]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45]
> (10) Union
> (11) Exchange
> Input [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> Arguments: RoundRobinPartitioning(1200), REBALANCE_PARTITIONS_BY_NONE,
> [plan_id=86]
> (12) ShuffleQueryStage
> Output [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> Arguments: 0
> (13) AQEShuffleRead
> Input [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> Arguments: coalesced
> (14) WriteFiles
> Input [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> (15) Execute InsertIntoHadoopFsRelationCommand
> Input: []
> Arguments:
> hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert, false,
> ORC, [serialization.format=1,
> __hive_compatible_bucketed_table_insertion__=true], Append,
> `spark_catalog`.`database_xxx`.`table_xxx_insert`,
> org.apache.hadoop.hive.ql.io.orc.OrcSerde,
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex(hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert),
> [adv_group_id, adv_group_name, account_id, account_name, cost, showcnt,
> clickcnt, clickrate, currentdate, user_type, parent_name]
> (16) Project
> Output [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, 新用户 AS user_type#13, gdt AS parent_name#14]
> Input [9]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27]
> (17) Project
> Output [11]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36, 召回用户 AS user_type#15, gdt AS parent_name#16]
> Input [9]: [adv_group_id#28, adv_group_name#29, account_id#30,
> account_name#31, cost#32, showcnt#33, clickcnt#34, clickrate#35,
> currentdate#36]
> (18) Project
> Output [11]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45, 未注册用户 AS user_type#17, gdt AS parent_name#18]
> Input [9]: [adv_group_id#37, adv_group_name#38, account_id#39,
> account_name#40, cost#41, showcnt#42, clickcnt#43, clickrate#44,
> currentdate#45]
> (19) Union
> (20) Exchange
> Input [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> Arguments: RoundRobinPartitioning(1200), REBALANCE_PARTITIONS_BY_NONE,
> [plan_id=50]
> (21) WriteFiles
> Input [11]: [adv_group_id#19, adv_group_name#20, account_id#21,
> account_name#22, cost#23, showcnt#24, clickcnt#25, clickrate#26,
> currentdate#27, user_type#13, parent_name#14]
> (22) Execute InsertIntoHadoopFsRelationCommand
> Input: []
> Arguments:
> hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert, false,
> ORC, [serialization.format=1,
> __hive_compatible_bucketed_table_insertion__=true], Append,
> `spark_catalog`.`database_xxx`.`table_xxx_insert`,
> org.apache.hadoop.hive.ql.io.orc.OrcSerde,
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex(hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert),
> [adv_group_id, adv_group_name, account_id, account_name, cost, showcnt,
> clickcnt, clickrate, currentdate, user_type, parent_name]
> (23) AdaptiveSparkPlan
> Output: []
> Arguments: isFinalPlan=true
> {code}
> After (Spark 4.0.0):
> {code:java}
> == Physical Plan ==
> Execute InsertIntoHadoopFsRelationCommand (9)
> +- WriteFiles (8)
>    +- Union (7)
>       :- * Project (2)
>       :  +- Scan orc spark_catalog.database_xxx.table_xxx (1)
>       :- * Project (4)
>       :  +- Scan orc spark_catalog.database_xxx.table_xxx (3)
>       +- * Project (6)
>          +- Scan orc spark_catalog.database_xxx.table_xxx (5)
> (1) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: adv_group_id#12, adv_group_name#13, account_id#14,
> account_name#15, cost#16, showcnt#17, clickcnt#18, clickrate#19,
> currentdate#20
> Batched: false
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (2) Project [codegen id : 1]
> Output [11]: adv_group_id#12, adv_group_name#13, account_id#14,
> account_name#15, cost#16, showcnt#17, clickcnt#18, clickrate#19,
> currentdate#20, 新用户 AS user_type#6, gdt AS parent_name#7
> Input [9]: adv_group_id#12, adv_group_name#13, account_id#14,
> account_name#15, cost#16, showcnt#17, clickcnt#18, clickrate#19,
> currentdate#20
> (3) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: adv_group_id#21, adv_group_name#22, account_id#23,
> account_name#24, cost#25, showcnt#26, clickcnt#27, clickrate#28,
> currentdate#29
> Batched: false
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (4) Project [codegen id : 2]
> Output [11]: adv_group_id#21, adv_group_name#22, account_id#23,
> account_name#24, cost#25, showcnt#26, clickcnt#27, clickrate#28,
> currentdate#29, 召回用户 AS user_type#8, gdt AS parent_name#9
> Input [9]: adv_group_id#21, adv_group_name#22, account_id#23,
> account_name#24, cost#25, showcnt#26, clickcnt#27, clickrate#28,
> currentdate#29
> (5) Scan orc spark_catalog.database_xxx.table_xxx
> Output [9]: adv_group_id#30, adv_group_name#31, account_id#32,
> account_name#33, cost#34, showcnt#35, clickcnt#36, clickrate#37,
> currentdate#38
> Batched: false
> Location: InMemoryFileIndex
> [hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx]
> ReadSchema:
> struct<adv_group_id:string,adv_group_name:string,account_id:string,account_name:string,cost:string,showcnt:string,clickcnt:string,clickrate:string,currentdate:string>
> (6) Project [codegen id : 3]
> Output [11]: adv_group_id#30, adv_group_name#31, account_id#32,
> account_name#33, cost#34, showcnt#35, clickcnt#36, clickrate#37,
> currentdate#38, 未注册用户 AS user_type#10, gdt AS parent_name#11
> Input [9]: adv_group_id#30, adv_group_name#31, account_id#32,
> account_name#33, cost#34, showcnt#35, clickcnt#36, clickrate#37,
> currentdate#38
> (7) Union
> (8) WriteFiles
> Input [11]: adv_group_id#12, adv_group_name#13, account_id#14,
> account_name#15, cost#16, showcnt#17, clickcnt#18, clickrate#19,
> currentdate#20, user_type#6, parent_name#7
> (9) Execute InsertIntoHadoopFsRelationCommand
> Input: []
> Arguments:
> hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert, false,
> ORC, [serialization.format=1,
> __hive_compatible_bucketed_table_insertion__=true], Append,
> `spark_catalog`.`database_xxx`.`table_xxx_insert`,
> org.apache.hadoop.hive.ql.io.orc.OrcSerde,
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex(hdfs://offline/data/hive/warehouse/database_xxx.db/table_xxx_insert),
> [adv_group_id, adv_group_name, account_id, account_name, cost, showcnt,
> clickcnt, clickrate, currentdate, user_type, parent_name]
> {code}
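
To make the difference between the two plans explicit, the operator trees can be compared mechanically. The following is only a minimal sketch and not part of the original report: the helper name `operator_names` and the abbreviated plan strings are assumptions made for illustration, and the parsing only handles the tree portion of a formatted plan like the ones quoted above.

```python
import re

# Matches a tree line such as "Project (3)" once its tree prefix is stripped.
_NODE = re.compile(r"^([A-Za-z][^()]*?)\s*\((\d+)\)")


def operator_names(plan_text):
    """Return the set of operator names found in the tree part of a plan."""
    names = set()
    for raw in plan_text.splitlines():
        # Drop mail quoting ("> ") and tree decoration (":", "+-", "*").
        line = raw.lstrip("> :+-*\t")
        if line.startswith("("):
            break  # the per-node detail section starts here; the tree is over
        match = _NODE.match(line)
        if match:
            names.add(match.group(1))
    return names


# Abbreviated copies of the two trees quoted in this issue.
before = """\
AdaptiveSparkPlan (23)
+- == Final Plan ==
   Execute InsertIntoHadoopFsRelationCommand (15)
   +- WriteFiles (14)
      +- AQEShuffleRead (13)
         +- ShuffleQueryStage (12), Statistics(sizeInBytes=301.3 MiB, rowCount=1.26E+6)
            +- Exchange (11)
               +- Union (10)
"""
after = """\
Execute InsertIntoHadoopFsRelationCommand (9)
+- WriteFiles (8)
   +- Union (7)
"""

# Operators present in the 3.5.0 tree but absent from the 4.0.0 tree.
dropped = sorted(operator_names(before) - operator_names(after))
print(dropped)
```

On these abbreviated trees, the operators missing from the 4.0.0 plan are the Exchange and the AQE nodes built on top of it, which matches the observation that no shuffle is planned before the write.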