This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:

new dc2fd57 [SPARK-38013][SQL][TEST] AQE can change bhj to smj if no extra shuffle introduce

dc2fd57 is described below

commit dc2fd57352a18ebf55c1ffe33898d51f8f408597
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Tue Feb 1 23:12:27 2022 -0800

[SPARK-38013][SQL][TEST] AQE can change bhj to smj if no extra shuffle introduce

### What changes were proposed in this pull request?

Add a test case in `AdaptiveQueryExecSuite`.

### Why are the changes needed?

AQE can change a broadcast hash join (BHJ) into a sort-merge join (SMJ), but only when two conditions hold:
- No extra shuffle is introduced; otherwise the built-in cost evaluator rejects the new plan.
- AQE does not consider the join eligible to be planned as a broadcast join. In other words, the statistics used by the normal planner were not accurate.

This is counterintuitive, but it is the expected behavior by AQE's design.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #35353 from ulysses-you/bhj-smj.
Authored-by: ulysses-you <ulyssesyo...@gmail.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../adaptive/AdaptiveQueryExecSuite.scala | 23 ++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index de41b88..1bd8ad9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -187,6 +187,29 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("Change broadcast join to merge join") {
+    withTable("t1", "t2") {
+      withSQLConf(
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000",
+          SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
+        sql("CREATE TABLE t1 USING PARQUET AS SELECT 1 c1")
+        sql("CREATE TABLE t2 USING PARQUET AS SELECT 1 c1")
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT * FROM (
+            | SELECT distinct c1 from t1
+            | ) tmp1 JOIN (
+            | SELECT distinct c1 from t2
+            | ) tmp2 ON tmp1.c1 = tmp2.c1
+            |""".stripMargin)
+        assert(findTopLevelBroadcastHashJoin(plan).size == 1)
+        assert(findTopLevelBroadcastHashJoin(adaptivePlan).isEmpty)
+        assert(findTopLevelSortMergeJoin(adaptivePlan).size == 1)
+      }
+    }
+  }
+
   test("Reuse the parallelism of coalesced shuffle in local shuffle read") {
     withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
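The two conditions from the commit message can be illustrated with a small toy model. This is a sketch under stated assumptions, not Spark's implementation: every type and function below (`Plan`, `ShuffleExchange`, `AqeJoinModel.replan`, and so on) is hypothetical. It assumes a plan's cost is simply its number of shuffles, which is roughly what AQE's default cost evaluator measures as far as we understand it. It also ignores sorts, broadcast exchanges, and the logical re-planning AQE really performs. `ensureHashPartitioned` loosely mimics how Spark's `EnsureRequirements` rule adds a shuffle only where a child's partitioning does not already satisfy the join.

```scala
// Hypothetical toy model of AQE's BHJ -> SMJ decision; none of these types are Spark classes.
sealed trait Plan
final case class Scan(table: String, sizeInBytes: Long) extends Plan
// A shuffle that hash-partitions its child on the join key (c1).
final case class ShuffleExchange(child: Plan) extends Plan
// A DISTINCT aggregate; in this model it preserves its child's partitioning.
final case class Aggregate(child: Plan) extends Plan
final case class BroadcastHashJoin(left: Plan, right: Plan) extends Plan
final case class SortMergeJoin(left: Plan, right: Plan) extends Plan

object AqeJoinModel {
  // Assumed cost: the number of shuffles in the plan.
  def shuffleCount(plan: Plan): Int = plan match {
    case Scan(_, _)              => 0
    case ShuffleExchange(child)  => 1 + shuffleCount(child)
    case Aggregate(child)        => shuffleCount(child)
    case BroadcastHashJoin(l, r) => shuffleCount(l) + shuffleCount(r)
    case SortMergeJoin(l, r)     => shuffleCount(l) + shuffleCount(r)
  }

  // A negative threshold such as -1 disables broadcasting.
  def canBroadcast(buildSideBytes: Long, threshold: Long): Boolean =
    threshold >= 0 && buildSideBytes <= threshold

  // True if the subtree's output is already hash-partitioned on the join key.
  def isHashPartitioned(plan: Plan): Boolean = plan match {
    case ShuffleExchange(_) => true
    case Aggregate(child)   => isHashPartitioned(child)
    case _                  => false
  }

  // Add a shuffle only where the SMJ's required distribution is not yet satisfied.
  def ensureHashPartitioned(plan: Plan): Plan =
    if (isHashPartitioned(plan)) plan else ShuffleExchange(plan)

  // Condition 1: keep the re-optimized plan only if it adds no shuffle.
  def acceptsNewPlan(current: Plan, candidate: Plan): Boolean =
    shuffleCount(candidate) <= shuffleCount(current)

  // Re-plan a BHJ once the runtime size of the build side is known.
  def replan(current: BroadcastHashJoin, runtimeBuildBytes: Long, aqeThreshold: Long): Plan =
    if (canBroadcast(runtimeBuildBytes, aqeThreshold)) {
      current // Condition 2 fails: AQE still considers the join broadcastable.
    } else {
      val candidate = SortMergeJoin(
        ensureHashPartitioned(current.left),
        ensureHashPartitioned(current.right))
      if (acceptsNewPlan(current, candidate)) candidate else current
    }
}
```

In this model, both `DISTINCT` subqueries of the test's query already end in a shuffle on `c1`. A sort-merge join over them therefore has the same two shuffles as the broadcast join and is accepted once the adaptive threshold is `-1`. Joining the raw scans directly would need two new shuffles, so the model keeps the broadcast join in that case.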