This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 0b5266b  [SPARK-32767][SQL][3.0] Bucket join should work if spark.sql.shuffle.partitions larger than bucket number
0b5266b is described below

commit 0b5266b425c8a2b6539b845df878fd992a17a8c2
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Fri Sep 4 08:01:04 2020 +0800

    [SPARK-32767][SQL][3.0] Bucket join should work if spark.sql.shuffle.partitions larger than bucket number
    
    This backports #29612 to branch-3.0. Original PR description:
    
    ### What changes were proposed in this pull request?
    
    Bucket join should work if `spark.sql.shuffle.partitions` is larger than the bucket number, for example:
    ```scala
    spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1")
    spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2")
    sql("set spark.sql.shuffle.partitions=600")
    sql("set spark.sql.autoBroadcastJoinThreshold=-1")
    sql("select * from t1 join t2 on t1.id = t2.id").explain()
    ```
    
    Before this PR:
    ```
    == Physical Plan ==
    *(5) SortMergeJoin [id#26L], [id#27L], Inner
    :- *(2) Sort [id#26L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(id#26L, 600), true
    :     +- *(1) Filter isnotnull(id#26L)
    :        +- *(1) ColumnarToRow
    :           +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
    +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id#27L, 600), true
          +- *(3) Filter isnotnull(id#27L)
             +- *(3) ColumnarToRow
                +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
    ```
    
    After this PR:
    ```
    == Physical Plan ==
    *(4) SortMergeJoin [id#26L], [id#27L], Inner
    :- *(1) Sort [id#26L ASC NULLS FIRST], false, 0
    :  +- *(1) Filter isnotnull(id#26L)
    :     +- *(1) ColumnarToRow
    :        +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432
    +- *(3) Sort [id#27L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(id#27L, 432), true
          +- *(2) Filter isnotnull(id#27L)
             +- *(2) ColumnarToRow
                +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34
    ```
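    
    One way to see the effect at runtime (a minimal sketch, not part of this commit; it assumes the `t1`/`t2` tables and the settings from the snippet above, run in a spark-shell session) is to count the shuffle exchanges in the executed plan:
    
    ```scala
    // Count ShuffleExchangeExec nodes in the join's physical plan.
    import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
    
    val plan = sql("select * from t1 join t2 on t1.id = t2.id").queryExecution.executedPlan
    val numShuffles = plan.collect { case e: ShuffleExchangeExec => e }.size
    // Before this change both sides are shuffled to 600 partitions (numShuffles == 2);
    // after it, only the 34-bucket side is shuffled, to 432 partitions (numShuffles == 1).
    ```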
    
    ### Why are the changes needed?
    
    Spark 2.4 supports this.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #29624 from wangyum/SPARK-32767-3.0.
    
    Authored-by: Yuming Wang <yumw...@ebay.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
---
 .../execution/exchange/EnsureRequirements.scala    | 15 ++++++++----
 .../spark/sql/sources/BucketedReadSuite.scala      | 28 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 3242ac2..c242320 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -85,11 +85,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec])
           .map(_.outputPartitioning.numPartitions)
       val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) {
-        // Here we pick the max number of partitions among these non-shuffle children as the
-        // expected number of shuffle partitions. However, if it's smaller than
-        // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
-        // expected number of shuffle partitions.
-        math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
+        if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) {
+          // Here we pick the max number of partitions among these non-shuffle children.
+          nonShuffleChildrenNumPartitions.max
+        } else {
+          // Here we pick the max number of partitions among these non-shuffle children as the
+          // expected number of shuffle partitions. However, if it's smaller than
+          // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
+          // expected number of shuffle partitions.
+          math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
+        }
       } else {
         childrenNumPartitions.max
       }
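
The hunk above changes how `EnsureRequirements` picks the number of shuffle partitions when some children already have a usable partitioning. The following standalone sketch (illustrative only; the real rule operates on `SparkPlan` children and `conf`, and this parameter list is hypothetical) shows the decision it implements:

```scala
// Sketch of the partition-count choice in EnsureRequirements
// (parameter names are illustrative, not the actual method signature).
def expectedNumPartitions(
    childrenNumPartitions: Seq[Int],           // partition counts of all children needing the distribution
    nonShuffleChildrenNumPartitions: Seq[Int], // counts of the children that are not already shuffles
    defaultNumShufflePartitions: Int): Int = {
  if (nonShuffleChildrenNumPartitions.nonEmpty) {
    if (nonShuffleChildrenNumPartitions.length == childrenNumPartitions.length) {
      // No child is an existing shuffle: reuse the largest existing partitioning
      // (e.g. the 432-bucket scan) instead of forcing spark.sql.shuffle.partitions.
      nonShuffleChildrenNumPartitions.max
    } else {
      // Some children are already shuffles: keep the old behaviour and never go
      // below spark.sql.shuffle.partitions.
      math.max(nonShuffleChildrenNumPartitions.max, defaultNumShufflePartitions)
    }
  } else {
    childrenNumPartitions.max
  }
}

// With the example from the description (432 and 34 buckets, shuffle.partitions = 600):
// expectedNumPartitions(Seq(432, 34), Seq(432, 34), 600) == 432
```

This matches the new plan above, where only the 34-bucket side is exchanged, to `hashpartitioning(id#27L, 432)`.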
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 14ba008..558bfd7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -843,4 +843,32 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") {
+    withSQLConf(
+      SQLConf.SHUFFLE_PARTITIONS.key -> "9",
+      SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10")  {
+
+      val testSpec1 = BucketedTableTestSpec(
+        Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = false,
+        expectedSort = false)
+      val testSpec2 = BucketedTableTestSpec(
+        Some(BucketSpec(6, Seq("i", "j"), Seq("i", "j"))),
+        numPartitions = 1,
+        expectedShuffle = true,
+        expectedSort = true)
+      Seq(false, true).foreach { enableAdaptive =>
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") {
+          Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs =>
+            testBucketing(
+              bucketedTableTestSpecLeft = specs._1,
+              bucketedTableTestSpecRight = specs._2,
+              joinCondition = joinCondition(Seq("i", "j")))
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
