This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 157de2f6292 [SPARK-43107][SQL] Coalesce buckets in join applied on 
broadcast join stream side
157de2f6292 is described below

commit 157de2f6292353dc1f55e523f36b2d5d07e93389
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Sat Apr 15 09:07:01 2023 +0800

    [SPARK-43107][SQL] Coalesce buckets in join applied on broadcast join 
stream side
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for coalescing buckets in a join when the bucketed
    scan is on the stream side of a broadcast join.
    
    ### Why are the changes needed?
    
    Reduce shuffle to improve query performance.
    
    Before | After
    -- | --
    <img src="https://user-images.githubusercontent.com/5399861/231473104-4d9bbe4e-9cfe-4473-ba17-75d09f2eb1b9.png" width="400" height="630"> | <img src="https://user-images.githubusercontent.com/5399861/231470141-bd15031f-facb-4c59-84f2-e60f054257d6.png" width="400" height="630">
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #40756 from wangyum/SPARK-43107.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../bucketing/CoalesceBucketsInJoin.scala          | 14 +++++++-----
 .../spark/sql/sources/BucketedReadSuite.scala      | 26 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index 7a31e343766..bd79aca8e64 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, 
BuildRight}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
-import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, 
ShuffledJoin, SortMergeJoinExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, 
SortMergeJoinExec}
 
 /**
  * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin`
@@ -42,7 +42,7 @@ object CoalesceBucketsInJoin extends Rule[SparkPlan] {
       plan: SparkPlan,
       numCoalescedBuckets: Int): SparkPlan = {
     plan transformUp {
-      case f: FileSourceScanExec =>
+      case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty =>
         f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets))
     }
   }
@@ -115,7 +115,11 @@ object ExtractJoinWithBuckets {
   private def hasScanOperation(plan: SparkPlan): Boolean = plan match {
     case f: FilterExec => hasScanOperation(f.child)
     case p: ProjectExec => hasScanOperation(p.child)
-    case _: FileSourceScanExec => true
+    case j: BroadcastHashJoinExec =>
+      if (j.buildSide == BuildLeft) hasScanOperation(j.right) else 
hasScanOperation(j.left)
+    case j: BroadcastNestedLoopJoinExec =>
+      if (j.buildSide == BuildLeft) hasScanOperation(j.right) else 
hasScanOperation(j.left)
+    case f: FileSourceScanExec => f.relation.bucketSpec.nonEmpty
     case _ => false
   }
 
@@ -142,9 +146,7 @@ object ExtractJoinWithBuckets {
   }
 
   private def isApplicable(j: ShuffledJoin): Boolean = {
-    (j.isInstanceOf[SortMergeJoinExec] ||
-      j.isInstanceOf[ShuffledHashJoinExec]) &&
-      hasScanOperation(j.left) &&
+    hasScanOperation(j.left) &&
       hasScanOperation(j.right) &&
       satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) &&
       satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index a18c681e0fe..0a3824e0298 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -1011,9 +1011,10 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
   }
 
   test("bucket coalescing is applied when join expressions match with 
partitioning expressions") {
-    withTable("t1", "t2") {
+    withTable("t1", "t2", "t3") {
       df1.write.format("parquet").bucketBy(8, "i", "j").saveAsTable("t1")
       df2.write.format("parquet").bucketBy(4, "i", "j").saveAsTable("t2")
+      df2.write.format("parquet").saveAsTable("t3")
 
       withSQLConf(
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0",
@@ -1050,6 +1051,29 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
           Some(4))
         // Coalescing is not applied when join expressions do not match with 
bucket columns.
         verify("SELECT * FROM t1 JOIN t2 ON t1.i = t2.i", 2, None)
+        // Coalescing applied on broadcast join stream side.
+        verify(
+          """
+            |SELECT *
+            |FROM   (SELECT /*+ BROADCAST(t3) */ t1.i, t1.j
+            |        FROM   t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
+            |       LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j
+            |""".stripMargin, 0, Some(4))
+        verify(
+          """
+            |SELECT *
+            |FROM   (SELECT /*+ BROADCAST(t3) */ t1.i, t1.j
+            |        FROM   t1 JOIN t3 ON t1.i > t3.i AND t1.j < t3.j) t
+            |       JOIN t2 ON t.i = t2.i AND t.j = t2.j
+            |""".stripMargin, 0, Some(4))
+        // Coalescing is not applied on broadcast join build side.
+        verify(
+          """
+            |SELECT *
+            |FROM   (SELECT /*+ BROADCAST(t1) */ t1.i, t1.j
+            |        FROM   t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
+            |       LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j
+            |""".stripMargin, 2, None)
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to