This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc2fd57  [SPARK-38013][SQL][TEST] AQE can change bhj to smj if no extra shuffle introduce
dc2fd57 is described below

commit dc2fd57352a18ebf55c1ffe33898d51f8f408597
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Feb 1 23:12:27 2022 -0800

    [SPARK-38013][SQL][TEST] AQE can change bhj to smj if no extra shuffle introduce
    
    ### What changes were proposed in this pull request?
    
    Add a test case in `AdaptiveQueryExecSuite`.
    
    ### Why are the changes needed?
    
    AQE can change a broadcast hash join (BHJ) into a sort-merge join (SMJ). This requires two conditions:
    - No extra shuffle is introduced; otherwise, the built-in cost evaluator rejects the new plan.
    - AQE does not consider the join eligible for a broadcast join. In other words, the cost statistics used by the normal (static) planner were inaccurate.
    
    This is counterintuitive, but it is expected behavior by AQE's design.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Pass CI
    
    Closes #35353 from ulysses-you/bhj-smj.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../adaptive/AdaptiveQueryExecSuite.scala          | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index de41b88..1bd8ad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -187,6 +187,29 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("Change broadcast join to merge join") { // SPARK-38013: static BHJ becomes SMJ under AQE
+    withTable("t1", "t2") {
+      withSQLConf( // NOTE(review): a sibling test sets ADAPTIVE_EXECUTION_ENABLED explicitly
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000", // lets static planner pick BHJ
+          SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", // AQE must not pick BHJ
+          SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+        sql("CREATE TABLE t1 USING PARQUET AS SELECT 1 c1") // one-row tables
+        sql("CREATE TABLE t2 USING PARQUET AS SELECT 1 c1")
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( // (initial plan, final plan)
+          """
+            |SELECT * FROM (
+            | SELECT distinct c1 from t1
+            | ) tmp1 JOIN (
+            |  SELECT distinct c1 from t2
+            | ) tmp2 ON tmp1.c1 = tmp2.c1
+            |""".stripMargin) // presumably DISTINCT's shuffles let SMJ skip an extra shuffle
+        assert(findTopLevelBroadcastHashJoin(plan).size == 1) // static planner chose BHJ
+        assert(findTopLevelBroadcastHashJoin(adaptivePlan).isEmpty) // AQE dropped the BHJ
+        assert(findTopLevelSortMergeJoin(adaptivePlan).size == 1) // ...in favor of one SMJ
+      }
+    }
+  }
+
   test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to