This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0ddf4fdb99cb [SPARK-43228][SQL] Join keys also match PartitioningCollection in CoalesceBucketsInJoin 0ddf4fdb99cb is described below commit 0ddf4fdb99cbff5c40653c646096ffa845744a83 Author: wankunde <wanku...@163.com> AuthorDate: Sun Dec 3 11:55:59 2023 +0800 [SPARK-43228][SQL] Join keys also match PartitioningCollection in CoalesceBucketsInJoin ### What changes were proposed in this pull request? This PR updates `ExtractJoinWithBuckets.satisfiesOutputPartitioning` (used by `CoalesceBucketsInJoin`) to also match a `PartitioningCollection`: the join keys satisfy the output partitioning if they satisfy any of the partitionings in the collection. A common case is adding an alias on a join key. For example: ```sql SELECT * FROM (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.* FROM t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j ``` The outputPartitioning of the left side is a `PartitioningCollection`: ``` (hashpartitioning(t1i#41, t1j#42, 8) or hashpartitioning(i#46, t1j#42, 8) or hashpartitioning(t1i#41, j#47, 8) or hashpartitioning(i#46, j#47, 8)) ``` ### Why are the changes needed? Previously, `satisfiesOutputPartitioning` returned false for any `PartitioningCollection`. As a result, `CoalesceBucketsInJoin` did not coalesce buckets when a join side reported such a partitioning (for example, when a join key is aliased). This change extends the rule to cover those cases. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added a unit test to `BucketedReadSuite`. Closes #44128 from wankunde/coalesce_partitions. 
Authored-by: wankunde <wanku...@163.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala | 4 +++- .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala index bd79aca8e647..d1464b4ac4ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -22,7 +22,7 @@ import scala.annotation.tailrec import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, SortMergeJoinExec} @@ -141,6 +141,8 @@ object ExtractJoinWithBuckets { partitioning match { case HashPartitioning(exprs, _) if exprs.length == keys.length => exprs.forall(e => keys.exists(_.semanticEquals(e))) + case PartitioningCollection(partitionings) => + partitionings.exists(satisfiesOutputPartitioning(keys, _)) case _ => false } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 898e80df0207..3573bafe482c 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -1077,6 +1077,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti | FROM t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t | LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j |""".stripMargin, 2, None) + // join keys also match PartitioningCollection + verify( + """ + |SELECT * + |FROM (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.* + | FROM t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t + | JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j + |""".stripMargin, 0, Some(4)) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org