This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddf4fdb99cb [SPARK-43228][SQL] Join keys also match 
PartitioningCollection in CoalesceBucketsInJoin
0ddf4fdb99cb is described below

commit 0ddf4fdb99cbff5c40653c646096ffa845744a83
Author: wankunde <wanku...@163.com>
AuthorDate: Sun Dec 3 11:55:59 2023 +0800

    [SPARK-43228][SQL] Join keys also match PartitioningCollection in 
CoalesceBucketsInJoin
    
    ### What changes were proposed in this pull request?
    This PR updates `CoalesceBucketsInJoin.satisfiesOutputPartitioning` to 
support matching `PartitioningCollection`. A common case is that we add an 
alias on the join key. For example:
    
    ```sql
    SELECT *
    FROM   (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.*
            FROM   t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
           JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j
    ```
    
    The left side outputPartitioning is:
    
    ```
    (hashpartitioning(t1i#41, t1j#42, 8) or hashpartitioning(i#46, t1j#42, 8) 
or hashpartitioning(t1i#41, j#47, 8) or hashpartitioning(i#46, j#47, 8))
    ```
    
    ### Why are the changes needed?
    Enhance `CoalesceBucketsInJoin` to support more cases.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test.
    
    Closes #44128 from wankunde/coalesce_partitions.
    
    Authored-by: wankunde <wanku...@163.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala     | 4 +++-
 .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala    | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index bd79aca8e647..d1464b4ac4ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -22,7 +22,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, 
SortMergeJoinExec}
@@ -141,6 +141,8 @@ object ExtractJoinWithBuckets {
     partitioning match {
       case HashPartitioning(exprs, _) if exprs.length == keys.length =>
         exprs.forall(e => keys.exists(_.semanticEquals(e)))
+      case PartitioningCollection(partitionings) =>
+        partitionings.exists(satisfiesOutputPartitioning(keys, _))
       case _ => false
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 898e80df0207..3573bafe482c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -1077,6 +1077,14 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
             |        FROM   t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
             |       LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j
             |""".stripMargin, 2, None)
+        // join keys also match PartitioningCollection
+        verify(
+          """
+            |SELECT *
+            |FROM   (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.*
+            |        FROM   t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
+            |       JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j
+            |""".stripMargin, 0, Some(4))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to