This is an automated email from the ASF dual-hosted git repository. lixiao pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 3f23529 [SPARK-31070][SQL] make skew join split skewed partitions more evenly 3f23529 is described below commit 3f23529cac3a306afe0ed175b8034d4f24b08acb Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Mar 10 21:50:44 2020 -0700 [SPARK-31070][SQL] make skew join split skewed partitions more evenly <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html 2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html 3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'. 4. Be sure to keep the PR description updated to reflect all changes. 5. Please write your PR title to summarize what this PR proposes. 6. If possible, provide a concise example to reproduce the issue for a faster review. 7. If you want to add a new configuration, please read the guideline first for naming configurations in 'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'. --> ### What changes were proposed in this pull request? <!-- Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below. 1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers. 2. If you fix some SQL features, you can provide some references of other DBMSes. 3. If there is design documentation, please add the link. 4. If there is a discussion in the mailing list, please add the link. --> There are two problems when splitting skewed partitions: 1. It's possible that we can't split the skewed partition; in that case we shouldn't create a skew join. 2.
When splitting, it's possible that we create a partition for a very small amount of data. This PR fixes them: 1. don't create `PartialReducerPartitionSpec` if we can't split. 2. merge small partitions into the previous partition. ### Why are the changes needed? <!-- Please clarify why the changes are needed. For instance, 1. If you propose a new API, clarify the use case for a new API. 2. If you fix a bug, you can clarify why it is a bug. --> make skew join split skewed partitions more evenly ### Does this PR introduce any user-facing change? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If no, write 'No'. --> no ### How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future. If tests were not added, please describe why they were not added and/or why it was difficult to add. --> updated test Closes #27833 from cloud-fan/aqe.
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: gatorsmile <gatorsm...@gmail.com> (cherry picked from commit d5f5720efa7232f1339976462d462a7360978ab5) Signed-off-by: gatorsmile <gatorsm...@gmail.com> --- .../adaptive/CoalesceShufflePartitions.scala | 2 +- .../execution/adaptive/OptimizeSkewedJoin.scala | 44 +++++++------------ ...Coalescer.scala => ShufflePartitionsUtil.scala} | 50 +++++++++++++++++++++- ...uite.scala => ShufflePartitionsUtilSuite.scala} | 32 ++++++++++++-- .../adaptive/AdaptiveQueryExecSuite.scala | 14 +++--- 5 files changed, 102 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index a8e2d8e..d779a20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -66,7 +66,7 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { - val partitionSpecs = ShufflePartitionsCoalescer.coalescePartitions( + val partitionSpecs = ShufflePartitionsUtil.coalescePartitions( validMetrics.toArray, firstPartitionIndex = 0, lastPartitionIndex = distinctNumPreShufflePartitions.head, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala index 4387409..7f52393 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -18,7 +18,6 @@ 
package org.apache.spark.sql.execution.adaptive import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.commons.io.FileUtils @@ -111,22 +110,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { targetSize: Long): Seq[Int] = { val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId) - val partitionStartIndices = ArrayBuffer[Int]() - partitionStartIndices += 0 - var i = 0 - var postMapPartitionSize = 0L - while (i < mapPartitionSizes.length) { - val nextMapPartitionSize = mapPartitionSizes(i) - if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) { - partitionStartIndices += i - postMapPartitionSize = nextMapPartitionSize - } else { - postMapPartitionSize += nextMapPartitionSize - } - i += 1 - } - - partitionStartIndices + ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize) } private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = { @@ -211,21 +195,25 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { } val leftParts = if (isLeftSkew) { - leftSkewDesc.addPartitionSize(leftSize) - createSkewPartitions( - partitionIndex, - getMapStartIndices(left, partitionIndex, leftTargetSize), - getNumMappers(left)) + val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize) + if (mapStartIndices.length > 1) { + leftSkewDesc.addPartitionSize(leftSize) + createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left)) + } else { + Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) + } } else { Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) } val rightParts = if (isRightSkew) { - rightSkewDesc.addPartitionSize(rightSize) - createSkewPartitions( - partitionIndex, - getMapStartIndices(right, partitionIndex, rightTargetSize), - getNumMappers(right)) + val mapStartIndices = 
getMapStartIndices(right, partitionIndex, rightTargetSize) + if (mapStartIndices.length > 1) { + rightSkewDesc.addPartitionSize(rightSize) + createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right)) + } else { + Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) + } } else { Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1)) } @@ -273,7 +261,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) { nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1)) } else { - ShufflePartitionsCoalescer.coalescePartitions( + ShufflePartitionsUtil.coalescePartitions( Array(leftStats, rightStats), firstPartitionIndex = nonSkewPartitionIndices.head, // `lastPartitionIndex` is exclusive. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala similarity index 73% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala index 8c58241..32f5fd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala @@ -23,7 +23,9 @@ import org.apache.spark.MapOutputStatistics import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec} -object ShufflePartitionsCoalescer extends Logging { +object ShufflePartitionsUtil extends Logging { + final val SMALL_PARTITION_FACTOR = 0.2 + final val MERGED_PARTITION_FACTOR = 1.2 /** * Coalesce the same range of partitions (`firstPartitionIndex` to `lastPartitionIndex`, the @@ -114,4 +116,50 @@ object 
ShufflePartitionsCoalescer extends Logging { partitionSpecs } + + /** + * Given a list of size, return an array of indices to split the list into multiple partitions, + * so that the size sum of each partition is close to the target size. Each index indicates the + * start of a partition. + */ + def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = { + val partitionStartIndices = ArrayBuffer[Int]() + partitionStartIndices += 0 + var i = 0 + var currentPartitionSize = 0L + var lastPartitionSize = -1L + + def tryMergePartitions() = { + // When we are going to start a new partition, it's possible that the current partition or + // the previous partition is very small and it's better to merge the current partition into + // the previous partition. + val shouldMergePartitions = lastPartitionSize > -1 && + ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR || + (currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR || + lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR)) + if (shouldMergePartitions) { + // We decide to merge the current partition into the previous one, so the start index of + // the current partition should be removed. + partitionStartIndices.remove(partitionStartIndices.length - 1) + lastPartitionSize += currentPartitionSize + } else { + lastPartitionSize = currentPartitionSize + } + } + + while (i < sizes.length) { + // If including the next size in the current partition exceeds the target size, package the + // current partition and start a new partition. 
+ if (i > 0 && currentPartitionSize + sizes(i) > targetSize) { + tryMergePartitions() + partitionStartIndices += i + currentPartitionSize = sizes(i) + } else { + currentPartitionSize += sizes(i) + } + i += 1 + } + tryMergePartitions() + partitionStartIndices.toArray + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala similarity index 88% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala index 8aab299..2cd4c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala @@ -18,9 +18,9 @@ package org.apache.spark.sql.execution import org.apache.spark.{MapOutputStatistics, SparkFunSuite} -import org.apache.spark.sql.execution.adaptive.ShufflePartitionsCoalescer +import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil -class ShufflePartitionsCoalescerSuite extends SparkFunSuite { +class ShufflePartitionsUtilSuite extends SparkFunSuite { private def checkEstimation( bytesByPartitionIdArray: Array[Array[Long]], @@ -31,7 +31,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite { case (bytesByPartitionId, index) => new MapOutputStatistics(index, bytesByPartitionId) } - val estimatedPartitionStartIndices = ShufflePartitionsCoalescer.coalescePartitions( + val estimatedPartitionStartIndices = ShufflePartitionsUtil.coalescePartitions( mapOutputStatistics, 0, bytesByPartitionIdArray.head.length, @@ -252,4 +252,30 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite { targetSize, minNumPartitions) } } + + test("splitSizeListByTargetSize") { + val targetSize = 100 + + // merge the small partitions at the 
beginning/end + val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15) + assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList1, targetSize).toSeq == + Seq(0, 2, 5)) + + // merge the small partitions in the middle + val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30) + assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize).toSeq == + Seq(0, 2, 4, 5)) + + // merge small partitions if the partition itself is smaller than + // targetSize * SMALL_PARTITION_FACTOR + val sizeList3 = Seq[Long](15, 1000, 15, 1000) + assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize).toSeq == + Seq(0, 3)) + + // merge small partitions if the combined size is smaller than + // targetSize * MERGED_PARTITION_FACTOR + val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35) + assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq == + Seq(0, 2, 3)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 94947a8..fcca23d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -645,11 +645,11 @@ class AdaptiveQueryExecSuite // into 2 splits and right side is divided into 4 splits, so // 2 x 4 sub-partitions. // Partition 1, 2, 3: not skewed, and coalesced into 1 partition. - // Partition 4: only left side is skewed, and divide into 3 splits, so - // 3 sub-partitions. + // Partition 4: only left side is skewed, and divide into 2 splits, so + // 2 sub-partitions. // So total (8 + 1 + 3) partitions. 
val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan) - checkSkewJoin(innerSmj, 8 + 1 + 3) + checkSkewJoin(innerSmj, 8 + 1 + 2) // skewed left outer join optimization val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult( @@ -659,11 +659,11 @@ class AdaptiveQueryExecSuite // Partition 0: both left and right sides are skewed, but left join can't split right side, // so only left side is divided into 2 splits, and thus 2 sub-partitions. // Partition 1, 2, 3: not skewed, and coalesced into 1 partition. - // Partition 4: only left side is skewed, and divide into 3 splits, so - // 3 sub-partitions. - // So total (2 + 1 + 3) partitions. + // Partition 4: only left side is skewed, and divide into 2 splits, so + // 2 sub-partitions. + // So total (2 + 1 + 2) partitions. val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan) - checkSkewJoin(leftSmj, 2 + 1 + 3) + checkSkewJoin(leftSmj, 2 + 1 + 2) // skewed right outer join optimization val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org