This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 157de2f6292 [SPARK-43107][SQL] Coalesce buckets in join applied on broadcast join stream side 157de2f6292 is described below commit 157de2f6292353dc1f55e523f36b2d5d07e93389 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Sat Apr 15 09:07:01 2023 +0800 [SPARK-43107][SQL] Coalesce buckets in join applied on broadcast join stream side ### What changes were proposed in this pull request? This PR adds support for coalescing buckets in joins applied on the broadcast join stream side. ### Why are the changes needed? Reduce shuffles to improve query performance. Before | After -- | -- <img src="https://user-images.githubusercontent.com/5399861/231473104-4d9bbe4e-9cfe-4473-ba17-75d09f2eb1b9.png" width="400" height="630"> | <img src="https://user-images.githubusercontent.com/5399861/231470141-bd15031f-facb-4c59-84f2-e60f054257d6.png" width="400" height="630"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #40756 from wangyum/SPARK-43107. 
Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../bucketing/CoalesceBucketsInJoin.scala | 14 +++++++----- .../spark/sql/sources/BucketedReadSuite.scala | 26 +++++++++++++++++++++- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index 7a31e343766..bd79aca8e64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} -import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec} /** * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin` @@ -42,7 +42,7 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] { plan: SparkPlan, numCoalescedBuckets: Int): SparkPlan = { plan transformUp { - case f: FileSourceScanExec => + case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty => f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) } } @@ -115,7 +115,11 @@ object ExtractJoinWithBuckets { private def hasScanOperation(plan: SparkPlan): Boolean = plan match { case f: FilterExec => hasScanOperation(f.child) case p: ProjectExec => hasScanOperation(p.child) - case _: FileSourceScanExec => true + case j: BroadcastHashJoinExec => + if (j.buildSide == 
BuildLeft) hasScanOperation(j.right) else hasScanOperation(j.left) + case j: BroadcastNestedLoopJoinExec => + if (j.buildSide == BuildLeft) hasScanOperation(j.right) else hasScanOperation(j.left) + case f: FileSourceScanExec => f.relation.bucketSpec.nonEmpty case _ => false } @@ -142,9 +146,7 @@ object ExtractJoinWithBuckets { } private def isApplicable(j: ShuffledJoin): Boolean = { - (j.isInstanceOf[SortMergeJoinExec] || - j.isInstanceOf[ShuffledHashJoinExec]) && - hasScanOperation(j.left) && + hasScanOperation(j.left) && hasScanOperation(j.right) && satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) && satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index a18c681e0fe..0a3824e0298 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -1011,9 +1011,10 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti } test("bucket coalescing is applied when join expressions match with partitioning expressions") { - withTable("t1", "t2") { + withTable("t1", "t2", "t3") { df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1") df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2") + df2.write.format("parquet").saveAsTable("t3") withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", @@ -1050,6 +1051,29 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti Some(4)) // Coalescing is not applied when join expressions do not match with bucket columns. verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None) + // Coalescing applied on broadcast join stream side. 
+ verify( + """ + |SELECT * + |FROM (SELECT /*+ BROADCAST(t3) */ t1.i, t1.j + | FROM t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t + | LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j + |""".stripMargin, 0, Some(4)) + verify( + """ + |SELECT * + |FROM (SELECT /*+ BROADCAST(t3) */ t1.i, t1.j + | FROM t1 JOIN t3 ON t1.i > t3.i AND t1.j < t3.j) t + | JOIN t2 ON t.i = t2.i AND t.j = t2.j + |""".stripMargin, 0, Some(4)) + // Coalescing is not applied on broadcast join build side. + verify( + """ + |SELECT * + |FROM (SELECT /*+ BROADCAST(t1) */ t1.i, t1.j + | FROM t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t + | LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j + |""".stripMargin, 2, None) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org