[ 
https://issues.apache.org/jira/browse/SPARK-40278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Jie updated SPARK-40278:
-----------------------------
    Description: 
I used Databricks spark-sql-perf with Spark 3.3 to run the 3TB TPCDS q24a or q24b query; the test code is as follows:
{code:java}
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)
spark.sql(s"create database $databaseName")
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a or 24b
val result = spark.sql(""" with ssales as
 (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
        i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
 from store_sales, store_returns, store, item, customer, customer_address
 where ss_ticket_number = sr_ticket_number
   and ss_item_sk = sr_item_sk
   and ss_customer_sk = c_customer_sk
   and ss_item_sk = i_item_sk
   and ss_store_sk = s_store_sk
   and c_birth_country = upper(ca_country)
   and s_zip = ca_zip
 and s_market_id = 8
 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size)
 select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
 from ssales
 where i_color = 'pale'
 group by c_last_name, c_first_name, s_store_name
 having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
 sc.stop() {code}
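Since the failure only shows up with AQE enabled, a quick way to isolate the AQE dependency is to rerun the same query with adaptive execution toggled off. This is a sketch under the assumption that the q24a SQL string above is bound to a value named `query`; `spark.sql.adaptive.enabled` is the standard AQE switch:
{code:java}
// Hypothetical sanity check: rerun the same query with AQE disabled.
// spark.sql.adaptive.enabled is the AQE master switch (default true since 3.2).
spark.conf.set("spark.sql.adaptive.enabled", "false")
val resultNoAqe = spark.sql(query).collect()

// Re-enable AQE to reproduce the cancelled stages.
spark.conf.set("spark.sql.adaptive.enabled", "true")
val resultAqe = spark.sql(query).collect() {code}
In my runs the failure was only observed on the AQE path, which is consistent with the regression pointing at SPARK-35442 below.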
The above test may fail with `Stage cancelled because SparkContext was shut down` on stage 31 and stage 36 when AQE is enabled, as follows:

 

!image-2022-08-30-21-09-48-763.png!

!image-2022-08-30-21-10-24-862.png!

!image-2022-08-30-21-10-57-128.png!

 

The DAG corresponding to the SQL is as follows:

!image-2022-08-30-21-11-50-895.png!

The details are as follows:
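The plan below is in `explain` "formatted" style; it can be reproduced with something like the following (a sketch, assuming `df` is the DataFrame for the q24a query above; `Dataset.explain(mode)` accepts "formatted" in Spark 3.x):
{code:java}
// Print the physical plan in the same "formatted" style as below.
// With AQE enabled the root node is AdaptiveSparkPlan; after an action has
// run, it reports isFinalPlan=true and shows both the initial and final plans.
df.explain("formatted") {code}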

 

 
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight 
(18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet  
(2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet  
(7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet  (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet  (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet  (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet  (30)


(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, 
paid#850]

(2) Scan parquet 
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), 
IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: 
struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>

(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) 
AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))

(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]

(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), 
ENSURE_REQUIREMENTS, [id=#309]

(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS 
FIRST], false, 0

(7) Scan parquet 
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>

(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))

(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]

(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), 
ENSURE_REQUIREMENTS, [id=#310]

(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS 
FIRST], false, 0

(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None

(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]

(14) Scan parquet 
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), 
IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: 
struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>

(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND 
isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))

(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]

(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#316]

(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None

(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]

(20) Scan parquet 
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: 
struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>

(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND 
isnotnull(i_item_sk#564))

(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#320]

(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None

(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, 
i_size#579, i_color#581, i_units#582, i_manager_id#584]

(25) Scan parquet 
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: 
struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>

(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))

(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#324]

(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None

(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, 
c_last_name#421, c_birth_country#426]

(30) Scan parquet 
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>

(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))

(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), 
input[1, string, false]),false), [id=#328]

(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None

(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, 
ca_zip#457, ca_country#458]

(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]

(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, 
i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]

(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]

(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]

(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]

(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
sum(netpaid#852)#854 AS paid#850]

(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > 
cast(Subquery subquery#851, [id=#294] as decimal(33,8))))

(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true {code}
 

 

After manually reverting SPARK-35442, the problem no longer occurs.

 

  was:
I used databricks spark-sql-pref + Spark 3.3 to run 3TB TPCDS q24a or q24b, the 
test code as follows:

```scala
val rootDir = "hdfs://${clusterName}/tpcds-data/POCGenData3T"
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(
      spark.sqlContext,dsdgenDir = "./tpcds-kit/tools",
      scaleFactor = scaleFactor,
      useDoubleForDecimal = false,useStringForDate = false)
spark.sql(s"create database $databaseName")
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a or 24b
val result = spark.sql(""" with ssales as
 (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
        i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
 from store_sales, store_returns, store, item, customer, customer_address
 where ss_ticket_number = sr_ticket_number
   and ss_item_sk = sr_item_sk
   and ss_customer_sk = c_customer_sk
   and ss_item_sk = i_item_sk
   and ss_store_sk = s_store_sk
   and c_birth_country = upper(ca_country)
   and s_zip = ca_zip
 and s_market_id = 8
 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size)
 select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
 from ssales
 where i_color = 'pale'
 group by c_last_name, c_first_name, s_store_name
 having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
 sc.stop()
```

The above test may failed due to `Stage cancelled because SparkContext was shut 
down` of stage 31 and stage 36 when AQE enabled as follows:

 

!image-2022-08-30-21-09-48-763.png!

!image-2022-08-30-21-10-24-862.png!

!image-2022-08-30-21-10-57-128.png!

 

The DAG corresponding to sql is as follows:

!image-2022-08-30-21-11-50-895.png!

The details as follows:

 

 
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight 
(18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet  
(2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet  
(7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet  (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet  (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet  (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet  (30)


(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, 
paid#850]

(2) Scan parquet 
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), 
IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: 
struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>

(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) 
AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))

(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]

(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), 
ENSURE_REQUIREMENTS, [id=#309]

(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS 
FIRST], false, 0

(7) Scan parquet 
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>

(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))

(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]

(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), 
ENSURE_REQUIREMENTS, [id=#310]

(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS 
FIRST], false, 0

(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None

(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]

(14) Scan parquet 
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), 
IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: 
struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>

(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND 
isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))

(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]

(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#316]

(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None

(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]

(20) Scan parquet 
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: 
struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>

(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND 
isnotnull(i_item_sk#564))

(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#320]

(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None

(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, 
i_size#579, i_color#581, i_units#582, i_manager_id#584]

(25) Scan parquet 
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: 
struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>

(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))

(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#324]

(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None

(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, 
c_last_name#421, c_birth_country#426]

(30) Scan parquet 
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>

(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))

(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), 
input[1, string, false]),false), [id=#328]

(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None

(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, 
ca_zip#457, ca_country#458]

(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]

(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, 
i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]

(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]

(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]

(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]

(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
sum(netpaid#852)#854 AS paid#850]

(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > 
cast(Subquery subquery#851, [id=#294] as decimal(33,8))))

(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true {code}
 

 

After manually reverting SPARK-35442, the problem no longer exists.
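Since reverting SPARK-35442 avoids the failure, a possible workaround (an assumption on my side, not verified in this ticket) is to disable AQE for this query, or to keep AQE but exclude the rule that propagates empty relations through the adaptive plan. The rule class name below is assumed for Spark 3.2+ and may differ between versions:

```scala
// Hypothetical workarounds (assumptions, not verified in this ticket):

// 1) Disable AQE entirely for this query.
spark.conf.set("spark.sql.adaptive.enabled", "false")

// 2) Or keep AQE but exclude only the empty-relation propagation rule
//    (class name assumed for Spark 3.2+; adjust if it differs).
spark.conf.set(
  "spark.sql.adaptive.optimizer.excludedRules",
  "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")
```

If the query succeeds with either setting, that would further point at the AQE empty-relation handling changed around SPARK-35442 rather than at the join planning itself.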
> Used databricks spark-sql-pref with Spark 3.3 to run 3TB tpcds test failed
> --------------------------------------------------------------------------
>
>                 Key: SPARK-40278
>                 URL: https://issues.apache.org/jira/browse/SPARK-40278
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Yang Jie
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
