[ https://issues.apache.org/jira/browse/SPARK-40278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yang Jie updated SPARK-40278:
-----------------------------
Description:
I used Databricks spark-sql-perf with Spark 3.3 to run TPC-DS q24a or q24b against a 3TB dataset. The test code is as follows:
{code:java}
// NOTE: ${clusterName} appears to be a placeholder for the HDFS nameservice. This is a plain
// string literal (no `s` interpolator), so substitute the real name before running.
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072" // scale factor in GB, i.e. about 3TB
val format = "parquet"

import com.databricks.spark.sql.perf.tpcds.TPCDSTables

// spark-sql-perf table definitions for the TPC-DS schema
val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)

spark.sql(s"create database $databaseName")
// Register the existing Parquet data under rootDir as temporary tables
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a or 24b
val result = spark.sql("""
  with ssales as
  (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
   from store_sales, store_returns, store, item, customer, customer_address
   where ss_ticket_number = sr_ticket_number
     and ss_item_sk = sr_item_sk
     and ss_customer_sk = c_customer_sk
     and ss_item_sk = i_item_sk
     and ss_store_sk = s_store_sk
     and c_birth_country = upper(ca_country)
     and s_zip = ca_zip
     and s_market_id = 8
   group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
            i_current_price, i_manager_id, i_units, i_size)
  select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
  from ssales
  where i_color = 'pale'
  group by c_last_name, c_first_name, s_store_name
  having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()

// Stop the SparkContext once the query has returned
sc.stop()
{code}
With AQE enabled, the test above may fail because stage 31 and stage 36 are cancelled with `Stage cancelled because SparkContext was shut down`, as shown below:
!image-2022-08-30-21-09-48-763.png!
!image-2022-08-30-21-10-24-862.png!
!image-2022-08-30-21-10-57-128.png!

The DAG corresponding to the SQL is as follows:
!image-2022-08-30-21-11-50-895.png!
The details are as follows:
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet (2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet (7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet (30)


(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]

(2) Scan parquet
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>

(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))

(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]

(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]

(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0

(7) Scan parquet
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>

(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))

(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]

(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]

(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0

(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None

(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]

(14) Scan parquet
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>

(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))

(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]

(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]

(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None

(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]

(20) Scan parquet
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>

(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))

(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]

(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None

(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]

(25) Scan parquet
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>

(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))

(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]

(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None

(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]

(30) Scan parquet
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>

(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))

(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]

(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None

(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]

(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]

(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]

(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]

(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]

(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]

(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]

(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))

(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true
{code}
After I manually reverted SPARK-35442, the problem no longer occurred.
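As an additional check on the AQE dependency, the same query could be run once with `spark.sql.adaptive.enabled` set to `false` and once with it set to `true` in the same `spark-shell` session, printing the formatted physical plan after each run so that the `Final Plan` sections can be compared. The helper below is a minimal sketch under those assumptions and is not part of Spark or spark-sql-perf: `runAndExplain` is a hypothetical name, and it relies only on `RuntimeConfig.get`/`set`, `Dataset.collect` and `Dataset.explain(mode)` (the `"formatted"` mode exists since Spark 3.0).

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Hypothetical helper (not part of Spark or spark-sql-perf): runs `sqlText` with AQE
// switched on or off, prints the formatted physical plan of the executed query, and
// restores the previous AQE setting afterwards, even if the query fails.
def runAndExplain(spark: SparkSession, sqlText: String, aqeEnabled: Boolean): Array[Row] = {
  val key = "spark.sql.adaptive.enabled"
  // This SQL conf has a built-in default, so get(key) returns a value even if never set.
  val previous = spark.conf.get(key)
  spark.conf.set(key, aqeEnabled)
  try {
    val df = spark.sql(sqlText)
    val rows = df.collect()
    // Explaining after collect() prints the plan that was actually executed; when AQE
    // is applied to the query, the AdaptiveSparkPlan node should report isFinalPlan=true.
    df.explain("formatted")
    rows
  } finally {
    spark.conf.set(key, previous)
  }
}
```

For example, with a hypothetical `q24Sql` string holding the query text from the test code, calling `runAndExplain(spark, q24Sql, aqeEnabled = false)` and then `runAndExplain(spark, q24Sql, aqeEnabled = true)` would print both plans. Restoring the previous value in `finally` keeps the session's AQE setting unchanged for later queries.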
The DAG corresponding to sql is as follows: The details as follows: > Used databricks spark-sql-pref with Spark 3.3 to run 3TB tpcds test failed > -------------------------------------------------------------------------- > > Key: SPARK-40278 > URL: https://issues.apache.org/jira/browse/SPARK-40278 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.3.0 > Reporter: Yang Jie > Priority: Major > > I used databricks spark-sql-pref + Spark 3.3 to run 3TB TPCDS q24a or q24b, > the test code as follows: > {code:java} > val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T" > val databaseName = "tpcds_database" > val scaleFactor = "3072" > val format = "parquet" > import com.databricks.spark.sql.perf.tpcds.TPCDSTables > val tables = new TPCDSTables( > spark.sqlContext,dsdgenDir = "./tpcds-kit/tools", > scaleFactor = scaleFactor, > useDoubleForDecimal = false,useStringForDate = false) > spark.sql(s"create database $databaseName") > tables.createTemporaryTables(rootDir, format) > spark.sql(s"use $databaseName")// TPCDS 24a or 24b > val result = spark.sql(""" with ssales as > (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, > i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) > netpaid > from store_sales, store_returns, store, item, customer, customer_address > where ss_ticket_number = sr_ticket_number > and ss_item_sk = sr_item_sk > and ss_customer_sk = c_customer_sk > and ss_item_sk = i_item_sk > and ss_store_sk = s_store_sk > and c_birth_country = upper(ca_country) > and s_zip = ca_zip > and s_market_id = 8 > group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color, > i_current_price, i_manager_id, i_units, i_size) > select c_last_name, c_first_name, s_store_name, sum(netpaid) paid > from ssales > where i_color = 'pale' > group by c_last_name, c_first_name, s_store_name > having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect() > sc.stop() {code} > The above test 
may failed due to `Stage cancelled because SparkContext was > shut down` of stage 31 and stage 36 when AQE enabled as follows: > > !image-2022-08-30-21-09-48-763.png! > !image-2022-08-30-21-10-24-862.png! > !image-2022-08-30-21-10-57-128.png! > > The DAG corresponding to sql is as follows: > !image-2022-08-30-21-11-50-895.png! > The details as follows: > > > {code:java} > == Physical Plan == > AdaptiveSparkPlan (42) > +- == Final Plan == > LocalTableScan (1) > +- == Initial Plan == > Filter (41) > +- HashAggregate (40) > +- Exchange (39) > +- HashAggregate (38) > +- HashAggregate (37) > +- Exchange (36) > +- HashAggregate (35) > +- Project (34) > +- BroadcastHashJoin Inner BuildRight (33) > :- Project (29) > : +- BroadcastHashJoin Inner BuildRight (28) > : :- Project (24) > : : +- BroadcastHashJoin Inner BuildRight (23) > : : :- Project (19) > : : : +- BroadcastHashJoin Inner > BuildRight (18) > : : : :- Project (13) > : : : : +- SortMergeJoin Inner (12) > : : : : :- Sort (6) > : : : : : +- Exchange (5) > : : : : : +- Project (4) > : : : : : +- Filter (3) > : : : : : +- Scan > parquet (2) > : : : : +- Sort (11) > : : : : +- Exchange (10) > : : : : +- Project (9) > : : : : +- Filter (8) > : : : : +- Scan > parquet (7) > : : : +- BroadcastExchange (17) > : : : +- Project (16) > : : : +- Filter (15) > : : : +- Scan parquet (14) > : : +- BroadcastExchange (22) > : : +- Filter (21) > : : +- Scan parquet (20) > : +- BroadcastExchange (27) > : +- Filter (26) > : +- Scan parquet (25) > +- BroadcastExchange (32) > +- Filter (31) > +- Scan parquet (30) > (1) LocalTableScan > Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850] > Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, > paid#850] > (2) Scan parquet > Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, > ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152] > Batched: true > Location: InMemoryFileIndex > 
> [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
> PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
> ReadSchema: struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>
>
> (3) Filter
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
> Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))
>
> (4) Project
> Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
>
> (5) Exchange
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), ENSURE_REQUIREMENTS, [id=#309]
>
> (6) Sort
> Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149]
> Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS FIRST], false, 0
>
> (7) Scan parquet
> Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
> PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
> ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>
>
> (8) Filter
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
> Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))
>
> (9) Project
> Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
>
> (10) Exchange
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), ENSURE_REQUIREMENTS, [id=#310]
>
> (11) Sort
> Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
> Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS FIRST], false, 0
>
> (12) SortMergeJoin
> Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
> Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
> Join condition: None
>
> (13) Project
> Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149]
> Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]
>
> (14) Scan parquet
> Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
> PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), IsNotNull(s_store_sk), IsNotNull(s_zip)]
> ReadSchema: struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>
>
> (15) Filter
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
> Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))
>
> (16) Project
> Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, s_zip#689]
>
> (17) BroadcastExchange
> Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#316]
>
> (18) BroadcastHashJoin
> Left keys [1]: [ss_store_sk#136]
> Right keys [1]: [s_store_sk#664]
> Join condition: None
>
> (19) Project
> Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689]
> Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
>
> (20) Scan parquet
> Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
> PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
> ReadSchema: struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>
>
> (21) Filter
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND isnotnull(i_item_sk#564))
>
> (22) BroadcastExchange
> Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#320]
>
> (23) BroadcastHashJoin
> Left keys [1]: [ss_item_sk#131]
> Right keys [1]: [i_item_sk#564]
> Join condition: None
>
> (24) Project
> Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
> Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584]
>
> (25) Scan parquet
> Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
> PushedFilters: [IsNotNull(c_customer_sk),
> IsNotNull(c_birth_country)]
> ReadSchema: struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>
>
> (26) Filter
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))
>
> (27) BroadcastExchange
> Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
> Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#324]
>
> (28) BroadcastHashJoin
> Left keys [1]: [ss_customer_sk#132]
> Right keys [1]: [c_customer_sk#412]
> Join condition: None
>
> (29) Project
> Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426]
> Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, c_last_name#421, c_birth_country#426]
>
> (30) Scan parquet
> Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Batched: true
> Location: InMemoryFileIndex [afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
> PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
> ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>
>
> (31) Filter
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))
>
> (32) BroadcastExchange
> Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
> Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), input[1, string, false]),false), [id=#328]
>
> (33) BroadcastHashJoin
> Left keys [2]: [c_birth_country#426, s_zip#689]
> Right keys [2]: [upper(ca_country#458), ca_zip#457]
> Join condition: None
>
> (34) Project
> Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, ca_zip#457, ca_country#458]
>
> (35) HashAggregate
> Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, c_first_name#420, c_last_name#421, ca_state#456]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum#870L]
> Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
>
> (36) Exchange
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]
>
> (37) HashAggregate
> Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579, sum#871L]
> Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, i_size#579]
> Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
> Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]
>
> (38) HashAggregate
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [partial_sum(netpaid#852)]
> Aggregate Attributes [2]: [sum#866, isEmpty#867]
> Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
>
> (39) Exchange
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Arguments: hashpartitioning(c_last_name#421, c_first_name#420, s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]
>
> (40) HashAggregate
> Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, isEmpty#869]
> Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
> Functions [1]: [sum(netpaid#852)]
> Aggregate Attributes [1]: [sum(netpaid#852)#854]
> Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, sum(netpaid#852)#854 AS paid#850]
>
> (41) Filter
> Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > cast(Subquery subquery#851, [id=#294] as decimal(33,8))))
>
> (42) AdaptiveSparkPlan
> Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
> Arguments: isFinalPlan=true
> {code}
>
> After manually reverting SPARK-35442, the problem no longer occurs.
>
> The DAG corresponding to the SQL is as follows:
> The details are as follows:
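Nodes (35) and (37) of the plan above sum `ss_net_paid` (read as `decimal(7,2)` in node (2)) through `UnscaledValue`, that is, as `Long` values, and only turn the total back into a decimal with `MakeDecimal(..., 17, 2)`. The following is a minimal plain-Scala sketch of that arithmetic for readers unfamiliar with the pattern. It does not use Spark and is not Spark's implementation; the object and method names (`DecimalSumSketch`, `unscaledValue`, `makeDecimal`, `netpaid`) are hypothetical.

```scala
// Plain-Scala sketch (hypothetical names, not Spark code) of the decimal
// arithmetic shown in plan nodes (35)-(37): ss_net_paid is decimal(7,2),
// it is summed as unscaled Longs, and MakeDecimal(..., 17, 2) rebuilds it.
object DecimalSumSketch {
  val InputScale = 2        // scale of ss_net_paid: decimal(7,2) in node (2)
  val ResultPrecision = 17  // precision passed to MakeDecimal in node (37)

  // Mimics UnscaledValue: 12.34 at scale 2 becomes the Long 1234.
  // setScale throws ArithmeticException if the value needs rounding.
  def unscaledValue(d: BigDecimal): Long =
    d.setScale(InputScale).bigDecimal.unscaledValue.longValueExact

  // Mimics MakeDecimal: rebuilds a decimal from the Long sum and checks
  // that it fits into decimal(ResultPrecision, InputScale).
  def makeDecimal(unscaledSum: Long): BigDecimal = {
    val d = BigDecimal(unscaledSum, InputScale)
    require(d.precision <= ResultPrecision,
      s"$d does not fit decimal($ResultPrecision,$InputScale)")
    d
  }

  // Both the partial sum and the final sum only add Longs; the decimal
  // value is reconstructed once, at the end.
  def netpaid(values: Seq[BigDecimal]): BigDecimal =
    makeDecimal(values.map(unscaledValue).sum)
}
```

For example, `DecimalSumSketch.netpaid(Seq(BigDecimal("1.25"), BigDecimal("2.50")))` adds the Longs 125 and 250 and rebuilds the result as 3.75.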