This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 15885f2  [SPARK-37652][SQL] Add test for optimize skewed join through 
union
15885f2 is described below

commit 15885f2f8aea7905a3ecdf08906fa72355186030
Author: mcdull-zhang <[email protected]>
AuthorDate: Wed Feb 9 21:54:33 2022 +0800

    [SPARK-37652][SQL] Add test for optimize skewed join through union
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/34974, solved most scenarios of data 
skew in union.
    add test for it.
    
    ### Why are the changes needed?
    
    Added tests for the following scenarios:
    
    <b>scenes 1</b>
    ```
    Union
        SMJ
            ShuffleQueryStage
            ShuffleQueryStage
        SMJ
            ShuffleQueryStage
            ShuffleQueryStage
    ```
    
    <b>scenes 2</b>
    ```
    Union
        SMJ
            ShuffleQueryStage
            ShuffleQueryStage
        HashAggregate
    ```
    
    <b>scenes 3: not yet supported, SMJ-3 will introduce a new shuffle, so 
SMJ-1 cannot be optimized</b>
    ```
    Union
        SMJ-1
            ShuffleQueryStage
            ShuffleQueryStage
        SMJ-2
           SMJ-3
             ShuffleQueryStage
             ShuffleQueryStage
           HashAggregate
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass the added test
    
    Closes #34908 from mcdull-zhang/skewed_union.
    
    Authored-by: mcdull-zhang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../adaptive/AdaptiveQueryExecSuite.scala          | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1bd8ad9..d1c7064 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2441,6 +2441,49 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-37652: optimize skewed join through union") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100") {
+      withTempView("skewData1", "skewData2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 3 as key1", "id as value1")
+          .createOrReplaceTempView("skewData1")
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 1 as key2", "id as value2")
+          .createOrReplaceTempView("skewData2")
+
+        def checkSkewJoin(query: String, joinNums: Int, optimizeSkewJoinNums: 
Int): Unit = {
+          val (_, innerAdaptivePlan) = runAdaptiveAndVerifyResult(query)
+          val joins = findTopLevelSortMergeJoin(innerAdaptivePlan)
+          val optimizeSkewJoins = joins.filter(_.isSkewJoin)
+          assert(joins.size == joinNums && optimizeSkewJoins.size == 
optimizeSkewJoinNums)
+        }
+
+        // skewJoin union skewJoin
+        checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
+            "UNION ALL SELECT key2 FROM skewData1 JOIN skewData2 ON key1 = 
key2", 2, 2)
+
+        // skewJoin union aggregate
+        checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
+            "UNION ALL SELECT key2 FROM skewData2 GROUP BY key2", 1, 1)
+
+        // skewJoin1 union (skewJoin2 join aggregate)
+        // skewJoin2 will lead to extra shuffles, but skew1 cannot be optimized
+         checkSkewJoin(
+          "SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 UNION ALL 
" +
+            "SELECT key1 from (SELECT key1 FROM skewData1 JOIN skewData2 ON 
key1 = key2) tmp1 " +
+            "JOIN (SELECT key2 FROM skewData2 GROUP BY key2) tmp2 ON key1 = 
key2", 3, 0)
+      }
+    }
+  }
 }
 
 /**

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to