Yang Jie created SPARK-40278:
--------------------------------

             Summary: 3TB TPC-DS test (q24a/q24b) with databricks spark-sql-perf and Spark 3.3 fails
                 Key: SPARK-40278
                 URL: https://issues.apache.org/jira/browse/SPARK-40278
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.3.0, 3.4.0
            Reporter: Yang Jie


I used databricks spark-sql-perf with Spark 3.3 to run the 3TB TPC-DS q24a/q24b query; the test code is as follows:

```scala
val rootDir = s"hdfs://${clusterName}/tpcds-data/POCGenData3T" // clusterName: placeholder for the HDFS cluster
val databaseName = "tpcds_database"
val scaleFactor = "3072"
val format = "parquet"
import com.databricks.spark.sql.perf.tpcds.TPCDSTables
val tables = new TPCDSTables(
  spark.sqlContext,
  dsdgenDir = "./tpcds-kit/tools",
  scaleFactor = scaleFactor,
  useDoubleForDecimal = false,
  useStringForDate = false)
spark.sql(s"create database $databaseName")
tables.createTemporaryTables(rootDir, format)
spark.sql(s"use $databaseName")

// TPCDS 24a or 24b
val result = spark.sql(""" with ssales as
 (select c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
        i_current_price, i_manager_id, i_units, i_size, sum(ss_net_paid) netpaid
 from store_sales, store_returns, store, item, customer, customer_address
 where ss_ticket_number = sr_ticket_number
   and ss_item_sk = sr_item_sk
   and ss_customer_sk = c_customer_sk
   and ss_item_sk = i_item_sk
   and ss_store_sk = s_store_sk
   and c_birth_country = upper(ca_country)
   and s_zip = ca_zip
 and s_market_id = 8
 group by c_last_name, c_first_name, s_store_name, ca_state, s_state, i_color,
          i_current_price, i_manager_id, i_units, i_size)
 select c_last_name, c_first_name, s_store_name, sum(netpaid) paid
 from ssales
 where i_color = 'pale'
 group by c_last_name, c_first_name, s_store_name
 having sum(netpaid) > (select 0.05*avg(netpaid) from ssales)""").collect()
 sc.stop()
```

The above test may fail with `Stage cancelled because SparkContext was shut down` in stages 31 and 36 when AQE is enabled, as shown below:

 

!image-2022-08-30-21-09-48-763.png!

!image-2022-08-30-21-10-24-862.png!

!image-2022-08-30-21-10-57-128.png!
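For reference, AQE is not set explicitly in the snippet above; `spark.sql.adaptive.enabled` defaults to `true` in Spark 3.3. A minimal sketch of how to get the non-AQE comparison run, assuming only that stock flag (not part of the original repro):

```scala
// Not part of the original repro: spark.sql.adaptive.enabled defaults to true in
// Spark 3.3, so the run above goes through AQE. Setting it to false before executing
// q24a/q24b gives the non-AQE baseline to compare against.
spark.conf.set("spark.sql.adaptive.enabled", "false")
```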

 

The DAG corresponding to the SQL is as follows:

!image-2022-08-30-21-11-50-895.png!

The plan details are as follows:

 

 
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (42)
+- == Final Plan ==
   LocalTableScan (1)
+- == Initial Plan ==
   Filter (41)
   +- HashAggregate (40)
      +- Exchange (39)
         +- HashAggregate (38)
            +- HashAggregate (37)
               +- Exchange (36)
                  +- HashAggregate (35)
                     +- Project (34)
                        +- BroadcastHashJoin Inner BuildRight (33)
                           :- Project (29)
                           :  +- BroadcastHashJoin Inner BuildRight (28)
                           :     :- Project (24)
                           :     :  +- BroadcastHashJoin Inner BuildRight (23)
                           :     :     :- Project (19)
                           :     :     :  +- BroadcastHashJoin Inner BuildRight (18)
                           :     :     :     :- Project (13)
                           :     :     :     :  +- SortMergeJoin Inner (12)
                           :     :     :     :     :- Sort (6)
                           :     :     :     :     :  +- Exchange (5)
                           :     :     :     :     :     +- Project (4)
                           :     :     :     :     :        +- Filter (3)
                           :     :     :     :     :           +- Scan parquet  (2)
                           :     :     :     :     +- Sort (11)
                           :     :     :     :        +- Exchange (10)
                           :     :     :     :           +- Project (9)
                           :     :     :     :              +- Filter (8)
                           :     :     :     :                 +- Scan parquet  (7)
                           :     :     :     +- BroadcastExchange (17)
                           :     :     :        +- Project (16)
                           :     :     :           +- Filter (15)
                           :     :     :              +- Scan parquet  (14)
                           :     :     +- BroadcastExchange (22)
                           :     :        +- Filter (21)
                           :     :           +- Scan parquet  (20)
                           :     +- BroadcastExchange (27)
                           :        +- Filter (26)
                           :           +- Scan parquet  (25)
                           +- BroadcastExchange (32)
                              +- Filter (31)
                                 +- Scan parquet  (30)


(1) LocalTableScan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: <empty>, [c_last_name#421, c_first_name#420, s_store_name#669, 
paid#850]

(2) Scan parquet 
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_sales]
PushedFilters: [IsNotNull(ss_ticket_number), IsNotNull(ss_item_sk), 
IsNotNull(ss_store_sk), IsNotNull(ss_customer_sk)]
ReadSchema: 
struct<ss_item_sk:int,ss_customer_sk:int,ss_store_sk:int,ss_ticket_number:bigint,ss_net_paid:decimal(7,2)>

(3) Filter
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]
Condition : (((isnotnull(ss_ticket_number#138L) AND isnotnull(ss_item_sk#131)) 
AND isnotnull(ss_store_sk#136)) AND isnotnull(ss_customer_sk#132))

(4) Project
Output [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Input [6]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, ss_sold_date_sk#152]

(5) Exchange
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: hashpartitioning(ss_ticket_number#138L, ss_item_sk#131, 300), 
ENSURE_REQUIREMENTS, [id=#309]

(6) Sort
Input [5]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149]
Arguments: [ss_ticket_number#138L ASC NULLS FIRST, ss_item_sk#131 ASC NULLS 
FIRST], false, 0

(7) Scan parquet 
Output [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store_returns]
PushedFilters: [IsNotNull(sr_ticket_number), IsNotNull(sr_item_sk)]
ReadSchema: struct<sr_item_sk:int,sr_ticket_number:bigint>

(8) Filter
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]
Condition : (isnotnull(sr_ticket_number#184L) AND isnotnull(sr_item_sk#177))

(9) Project
Output [2]: [sr_item_sk#177, sr_ticket_number#184L]
Input [3]: [sr_item_sk#177, sr_ticket_number#184L, sr_returned_date_sk#195]

(10) Exchange
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: hashpartitioning(sr_ticket_number#184L, sr_item_sk#177, 300), 
ENSURE_REQUIREMENTS, [id=#310]

(11) Sort
Input [2]: [sr_item_sk#177, sr_ticket_number#184L]
Arguments: [sr_ticket_number#184L ASC NULLS FIRST, sr_item_sk#177 ASC NULLS 
FIRST], false, 0

(12) SortMergeJoin
Left keys [2]: [ss_ticket_number#138L, ss_item_sk#131]
Right keys [2]: [sr_ticket_number#184L, sr_item_sk#177]
Join condition: None

(13) Project
Output [4]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149]
Input [7]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_ticket_number#138L, ss_net_paid#149, sr_item_sk#177, sr_ticket_number#184L]

(14) Scan parquet 
Output [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/store]
PushedFilters: [IsNotNull(s_market_id), EqualTo(s_market_id,8), 
IsNotNull(s_store_sk), IsNotNull(s_zip)]
ReadSchema: 
struct<s_store_sk:int,s_store_name:string,s_market_id:int,s_state:string,s_zip:string>

(15) Filter
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]
Condition : (((isnotnull(s_market_id#674) AND (s_market_id#674 = 8)) AND 
isnotnull(s_store_sk#664)) AND isnotnull(s_zip#689))

(16) Project
Output [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Input [5]: [s_store_sk#664, s_store_name#669, s_market_id#674, s_state#688, 
s_zip#689]

(17) BroadcastExchange
Input [4]: [s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#316]

(18) BroadcastHashJoin
Left keys [1]: [ss_store_sk#136]
Right keys [1]: [s_store_sk#664]
Join condition: None

(19) Project
Output [6]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689]
Input [8]: [ss_item_sk#131, ss_customer_sk#132, ss_store_sk#136, 
ss_net_paid#149, s_store_sk#664, s_store_name#669, s_state#688, s_zip#689]

(20) Scan parquet 
Output [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/item]
PushedFilters: [IsNotNull(i_color), EqualTo(i_color,pale), IsNotNull(i_item_sk)]
ReadSchema: 
struct<i_item_sk:int,i_current_price:decimal(7,2),i_size:string,i_color:string,i_units:string,i_manager_id:int>

(21) Filter
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Condition : ((isnotnull(i_color#581) AND (i_color#581 = pale)) AND 
isnotnull(i_item_sk#564))

(22) BroadcastExchange
Input [6]: [i_item_sk#564, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#320]

(23) BroadcastHashJoin
Left keys [1]: [ss_item_sk#131]
Right keys [1]: [i_item_sk#564]
Join condition: None

(24) Project
Output [10]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584]
Input [12]: [ss_item_sk#131, ss_customer_sk#132, ss_net_paid#149, 
s_store_name#669, s_state#688, s_zip#689, i_item_sk#564, i_current_price#569, 
i_size#579, i_color#581, i_units#582, i_manager_id#584]

(25) Scan parquet 
Output [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer]
PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_birth_country)]
ReadSchema: 
struct<c_customer_sk:int,c_first_name:string,c_last_name:string,c_birth_country:string>

(26) Filter
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Condition : (isnotnull(c_customer_sk#412) AND isnotnull(c_birth_country#426))

(27) BroadcastExchange
Input [4]: [c_customer_sk#412, c_first_name#420, c_last_name#421, 
c_birth_country#426]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#324]

(28) BroadcastHashJoin
Left keys [1]: [ss_customer_sk#132]
Right keys [1]: [c_customer_sk#412]
Join condition: None

(29) Project
Output [12]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426]
Input [14]: [ss_customer_sk#132, ss_net_paid#149, s_store_name#669, 
s_state#688, s_zip#689, i_current_price#569, i_size#579, i_color#581, 
i_units#582, i_manager_id#584, c_customer_sk#412, c_first_name#420, 
c_last_name#421, c_birth_country#426]

(30) Scan parquet 
Output [3]: [ca_state#456, ca_zip#457, ca_country#458]
Batched: true
Location: InMemoryFileIndex 
[afs://tianqi.afs.baidu.com:9902/user/emr_spark_history/tpcds-data/POCGenData3T/customer_address]
PushedFilters: [IsNotNull(ca_country), IsNotNull(ca_zip)]
ReadSchema: struct<ca_state:string,ca_zip:string,ca_country:string>

(31) Filter
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Condition : (isnotnull(ca_country#458) AND isnotnull(ca_zip#457))

(32) BroadcastExchange
Input [3]: [ca_state#456, ca_zip#457, ca_country#458]
Arguments: HashedRelationBroadcastMode(List(upper(input[2, string, false]), 
input[1, string, false]),false), [id=#328]

(33) BroadcastHashJoin
Left keys [2]: [c_birth_country#426, s_zip#689]
Right keys [2]: [upper(ca_country#458), ca_zip#457]
Join condition: None

(34) Project
Output [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Input [15]: [ss_net_paid#149, s_store_name#669, s_state#688, s_zip#689, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, c_birth_country#426, ca_state#456, 
ca_zip#457, ca_country#458]

(35) HashAggregate
Input [11]: [ss_net_paid#149, s_store_name#669, s_state#688, 
i_current_price#569, i_size#579, i_color#581, i_units#582, i_manager_id#584, 
c_first_name#420, c_last_name#421, ca_state#456]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [partial_sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum#870L]
Results [11]: [c_last_name#421, c_first_name#420, s_store_name#669, 
ca_state#456, s_state#688, i_color#581, i_current_price#569, i_manager_id#584, 
i_units#582, i_size#579, sum#871L]

(36) Exchange
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, ca_state#456, s_state#688, i_color#581, i_current_price#569, 
i_manager_id#584, i_units#582, i_size#579, 300), ENSURE_REQUIREMENTS, [id=#333]

(37) HashAggregate
Input [11]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579, sum#871L]
Keys [10]: [c_last_name#421, c_first_name#420, s_store_name#669, ca_state#456, 
s_state#688, i_color#581, i_current_price#569, i_manager_id#584, i_units#582, 
i_size#579]
Functions [1]: [sum(UnscaledValue(ss_net_paid#149))]
Aggregate Attributes [1]: [sum(UnscaledValue(ss_net_paid#149))#853L]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
MakeDecimal(sum(UnscaledValue(ss_net_paid#149))#853L,17,2) AS netpaid#852]

(38) HashAggregate
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, netpaid#852]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [partial_sum(netpaid#852)]
Aggregate Attributes [2]: [sum#866, isEmpty#867]
Results [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]

(39) Exchange
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Arguments: hashpartitioning(c_last_name#421, c_first_name#420, 
s_store_name#669, 300), ENSURE_REQUIREMENTS, [id=#337]

(40) HashAggregate
Input [5]: [c_last_name#421, c_first_name#420, s_store_name#669, sum#868, 
isEmpty#869]
Keys [3]: [c_last_name#421, c_first_name#420, s_store_name#669]
Functions [1]: [sum(netpaid#852)]
Aggregate Attributes [1]: [sum(netpaid#852)#854]
Results [4]: [c_last_name#421, c_first_name#420, s_store_name#669, 
sum(netpaid#852)#854 AS paid#850]

(41) Filter
Input [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Condition : (isnotnull(paid#850) AND (cast(paid#850 as decimal(33,8)) > 
cast(Subquery subquery#851, [id=#294] as decimal(33,8))))

(42) AdaptiveSparkPlan
Output [4]: [c_last_name#421, c_first_name#420, s_store_name#669, paid#850]
Arguments: isFinalPlan=true
{code}
 

 

After manually reverting SPARK-35442, the problem no longer exists.
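Note that in the plan dump above the final adaptive plan degenerates into an empty `LocalTableScan (1)`, while the initial plan still contains the full join/aggregate tree. If SPARK-35442 is the AQE empty-relation-propagation change (an assumption on my part, not verified here), a lighter-weight check than rebuilding Spark with the revert could be excluding the suspected AQE rule at runtime; the rule class name below is part of that assumption:

```scala
// Hedged sketch: spark.sql.adaptive.optimizer.excludedRules takes a comma-separated
// list of rule class names to disable in the adaptive optimizer.
// AQEPropagateEmptyRelation is assumed (not verified) to be the rule affected by SPARK-35442.
spark.conf.set(
  "spark.sql.adaptive.optimizer.excludedRules",
  "org.apache.spark.sql.execution.adaptive.AQEPropagateEmptyRelation")
```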

 


