This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 0b5266b [SPARK-32767][SQL][3.0] Bucket join should work if spark.sql.shuffle.partitions larger than bucket number 0b5266b is described below commit 0b5266b425c8a2b6539b845df878fd992a17a8c2 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Fri Sep 4 08:01:04 2020 +0800 [SPARK-32767][SQL][3.0] Bucket join should work if spark.sql.shuffle.partitions larger than bucket number This backports #29612 to branch-3.0. Original PR description: ### What changes were proposed in this pull request? Bucket join should work if `spark.sql.shuffle.partitions` larger than bucket number, such as: ```scala spark.range(1000).write.bucketBy(432, "id").saveAsTable("t1") spark.range(1000).write.bucketBy(34, "id").saveAsTable("t2") sql("set spark.sql.shuffle.partitions=600") sql("set spark.sql.autoBroadcastJoinThreshold=-1") sql("select * from t1 join t2 on t1.id = t2.id").explain() ``` Before this pr: ``` == Physical Plan == *(5) SortMergeJoin [id#26L], [id#27L], Inner :- *(2) Sort [id#26L ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(id#26L, 600), true : +- *(1) Filter isnotnull(id#26L) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432 +- *(4) Sort [id#27L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#27L, 600), true +- *(3) Filter isnotnull(id#27L) +- *(3) ColumnarToRow +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34 ``` After this pr: ``` == Physical Plan == *(4) SortMergeJoin [id#26L], [id#27L], Inner :- *(1) Sort [id#26L ASC NULLS FIRST], false, 0 : +- *(1) Filter isnotnull(id#26L) : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[id#26L] Batched: true, DataFilters: [isnotnull(id#26L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 432 out of 432 +- *(3) Sort [id#27L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(id#27L, 432), true +- *(2) Filter isnotnull(id#27L) +- *(2) ColumnarToRow +- FileScan parquet default.t2[id#27L] Batched: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 34 out of 34 ``` ### Why are the changes needed? Spark 2.4 support this. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #29624 from wangyum/SPARK-32767-3.0. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../execution/exchange/EnsureRequirements.scala | 15 ++++++++---- .../spark/sql/sources/BucketedReadSuite.scala | 28 ++++++++++++++++++++++ 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 3242ac2..c242320 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -85,11 +85,16 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { childrenIndexes.map(children).filterNot(_.isInstanceOf[ShuffleExchangeExec]) .map(_.outputPartitioning.numPartitions) val expectedChildrenNumPartitions = if (nonShuffleChildrenNumPartitions.nonEmpty) { - // Here we pick the max number of partitions among these non-shuffle children as the - // expected number of shuffle partitions. However, if it's smaller than - // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the - // expected number of shuffle partitions. - math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions) + if (nonShuffleChildrenNumPartitions.length == childrenIndexes.length) { + // Here we pick the max number of partitions among these non-shuffle children. + nonShuffleChildrenNumPartitions.max + } else { + // Here we pick the max number of partitions among these non-shuffle children as the + // expected number of shuffle partitions. However, if it's smaller than + // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the + // expected number of shuffle partitions. + math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions) + } } else { childrenNumPartitions.max } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 14ba008..558bfd7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -843,4 +843,32 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } } + + test("SPARK-32767 Bucket join should work if SHUFFLE_PARTITIONS larger than bucket number") { + withSQLConf( + SQLConf.SHUFFLE_PARTITIONS.key -> "9", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10") { + + val testSpec1 = BucketedTableTestSpec( + Some(BucketSpec(8, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = false, + expectedSort = false) + val testSpec2 = BucketedTableTestSpec( + Some(BucketSpec(6, Seq("i", "j"), Seq("i", "j"))), + numPartitions = 1, + expectedShuffle = true, + expectedSort = true) + Seq(false, true).foreach { enableAdaptive => + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$enableAdaptive") { + Seq((testSpec1, testSpec2), (testSpec2, testSpec1)).foreach { specs => + testBucketing( + bucketedTableTestSpecLeft = specs._1, + bucketedTableTestSpecRight = specs._2, + joinCondition = joinCondition(Seq("i", "j"))) + } + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org