This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 3f23529  [SPARK-31070][SQL] make skew join split skewed partitions more evenly
3f23529 is described below

commit 3f23529cac3a306afe0ed175b8034d4f24b08acb
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Mar 10 21:50:44 2020 -0700

    [SPARK-31070][SQL] make skew join split skewed partitions more evenly
    
    ### What changes were proposed in this pull request?
    There are two problems when splitting skewed partitions:
    1. It's possible that we can't split a skewed partition at all; in that case we shouldn't create a skew join for it.
    2. When splitting, we may create a partition that holds only a very small amount of data.

    This PR fixes both:
    1. Don't create `PartialReducerPartitionSpec` if we can't split.
    2. Merge small partitions into the previous partition.
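    For context, a minimal, self-contained sketch of the new split-and-merge logic is below. It mirrors the `splitSizeListByTargetSize` method added to `ShufflePartitionsUtil` in this diff; the `SplitSketch` wrapper and the sample sizes are illustrative only, not part of the patch:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    object SplitSketch {
      val SMALL_PARTITION_FACTOR = 0.2
      val MERGED_PARTITION_FACTOR = 1.2

      def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
        val partitionStartIndices = ArrayBuffer[Int](0)
        var currentPartitionSize = 0L
        var lastPartitionSize = -1L // size of the previously closed partition, -1 if none

        // Fix 2: when closing a partition, merge it into the previous one if either
        // partition is tiny or their combined size still stays near the target.
        def tryMergePartitions(): Unit = {
          val shouldMerge = lastPartitionSize > -1 &&
            (currentPartitionSize + lastPartitionSize < targetSize * MERGED_PARTITION_FACTOR ||
              currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR ||
              lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR)
          if (shouldMerge) {
            // Drop the start index of the current partition so it joins the previous one.
            partitionStartIndices.remove(partitionStartIndices.length - 1)
            lastPartitionSize += currentPartitionSize
          } else {
            lastPartitionSize = currentPartitionSize
          }
        }

        for (i <- sizes.indices) {
          if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
            tryMergePartitions() // close the current partition before opening a new one
            partitionStartIndices += i
            currentPartitionSize = sizes(i)
          } else {
            currentPartitionSize += sizes(i)
          }
        }
        tryMergePartitions() // the trailing partition may also need merging
        partitionStartIndices.toArray
      }

      def main(args: Array[String]): Unit = {
        // Groups as [15, 90], [15, 15, 15], [90, 15]: the leading and trailing 15s
        // are merged with their neighboring 90s instead of forming tiny partitions.
        val starts = splitSizeListByTargetSize(Seq[Long](15, 90, 15, 15, 15, 90, 15), 100)
        println(starts.mkString(", ")) // prints: 0, 2, 5
      }
    }
    ```
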
    ### Why are the changes needed?
    Make skew join split skewed partitions more evenly.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Updated tests.
    
    Closes #27833 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
    (cherry picked from commit d5f5720efa7232f1339976462d462a7360978ab5)
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../adaptive/CoalesceShufflePartitions.scala       |  2 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 44 +++++++------------
 ...Coalescer.scala => ShufflePartitionsUtil.scala} | 50 +++++++++++++++++++++-
 ...uite.scala => ShufflePartitionsUtilSuite.scala} | 32 ++++++++++++--
 .../adaptive/AdaptiveQueryExecSuite.scala          | 14 +++---
 5 files changed, 102 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index a8e2d8e..d779a20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -66,7 +66,7 @@ case class CoalesceShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] {
       val distinctNumPreShufflePartitions =
         validMetrics.map(stats => stats.bytesByPartitionId.length).distinct
       if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) {
-        val partitionSpecs = ShufflePartitionsCoalescer.coalescePartitions(
+        val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
           validMetrics.toArray,
           firstPartitionIndex = 0,
           lastPartitionIndex = distinctNumPreShufflePartitions.head,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 4387409..7f52393 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.adaptive
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.io.FileUtils
 
@@ -111,22 +110,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       targetSize: Long): Seq[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val partitionStartIndices = ArrayBuffer[Int]()
-    partitionStartIndices += 0
-    var i = 0
-    var postMapPartitionSize = 0L
-    while (i < mapPartitionSizes.length) {
-      val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
-        partitionStartIndices += i
-        postMapPartitionSize = nextMapPartitionSize
-      } else {
-        postMapPartitionSize += nextMapPartitionSize
-      }
-      i += 1
-    }
-
-    partitionStartIndices
+    ShufflePartitionsUtil.splitSizeListByTargetSize(mapPartitionSizes, targetSize)
   }
 
   private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = {
@@ -211,21 +195,25 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
           }
 
           val leftParts = if (isLeftSkew) {
-            leftSkewDesc.addPartitionSize(leftSize)
-            createSkewPartitions(
-              partitionIndex,
-              getMapStartIndices(left, partitionIndex, leftTargetSize),
-              getNumMappers(left))
+            val mapStartIndices = getMapStartIndices(left, partitionIndex, leftTargetSize)
+            if (mapStartIndices.length > 1) {
+              leftSkewDesc.addPartitionSize(leftSize)
+              createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(left))
+            } else {
+              Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+            }
           } else {
             Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
           }
 
           val rightParts = if (isRightSkew) {
-            rightSkewDesc.addPartitionSize(rightSize)
-            createSkewPartitions(
-              partitionIndex,
-              getMapStartIndices(right, partitionIndex, rightTargetSize),
-              getNumMappers(right))
+            val mapStartIndices = getMapStartIndices(right, partitionIndex, rightTargetSize)
+            if (mapStartIndices.length > 1) {
+              rightSkewDesc.addPartitionSize(rightSize)
+              createSkewPartitions(partitionIndex, mapStartIndices, getNumMappers(right))
+            } else {
+              Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
+            }
           } else {
             Seq(CoalescedPartitionSpec(partitionIndex, partitionIndex + 1))
           }
@@ -273,7 +261,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
     if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       nonSkewPartitionIndices.map(i => CoalescedPartitionSpec(i, i + 1))
     } else {
-      ShufflePartitionsCoalescer.coalescePartitions(
+      ShufflePartitionsUtil.coalescePartitions(
         Array(leftStats, rightStats),
         firstPartitionIndex = nonSkewPartitionIndices.head,
         // `lastPartitionIndex` is exclusive.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
similarity index 73%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 8c58241..32f5fd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsCoalescer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -23,7 +23,9 @@ import org.apache.spark.MapOutputStatistics
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}
 
-object ShufflePartitionsCoalescer extends Logging {
+object ShufflePartitionsUtil extends Logging {
+  final val SMALL_PARTITION_FACTOR = 0.2
+  final val MERGED_PARTITION_FACTOR = 1.2
 
   /**
   * Coalesce the same range of partitions (`firstPartitionIndex` to `lastPartitionIndex`, the
@@ -114,4 +116,50 @@ object ShufflePartitionsCoalescer extends Logging {
 
     partitionSpecs
   }
+
+  /**
+   * Given a list of size, return an array of indices to split the list into multiple partitions,
+   * so that the size sum of each partition is close to the target size. Each index indicates the
+   * start of a partition.
+   */
+  def splitSizeListByTargetSize(sizes: Seq[Long], targetSize: Long): Array[Int] = {
+    val partitionStartIndices = ArrayBuffer[Int]()
+    partitionStartIndices += 0
+    var i = 0
+    var currentPartitionSize = 0L
+    var lastPartitionSize = -1L
+
+    def tryMergePartitions() = {
+      // When we are going to start a new partition, it's possible that the current partition or
+      // the previous partition is very small and it's better to merge the current partition into
+      // the previous partition.
+      val shouldMergePartitions = lastPartitionSize > -1 &&
+        ((currentPartitionSize + lastPartitionSize) < targetSize * MERGED_PARTITION_FACTOR ||
+        (currentPartitionSize < targetSize * SMALL_PARTITION_FACTOR ||
+          lastPartitionSize < targetSize * SMALL_PARTITION_FACTOR))
+      if (shouldMergePartitions) {
+        // We decide to merge the current partition into the previous one, so the start index of
+        // the current partition should be removed.
+        partitionStartIndices.remove(partitionStartIndices.length - 1)
+        lastPartitionSize += currentPartitionSize
+      } else {
+        lastPartitionSize = currentPartitionSize
+      }
+    }
+
+    while (i < sizes.length) {
+      // If including the next size in the current partition exceeds the target size, package the
+      // current partition and start a new partition.
+      if (i > 0 && currentPartitionSize + sizes(i) > targetSize) {
+        tryMergePartitions()
+        partitionStartIndices += i
+        currentPartitionSize = sizes(i)
+      } else {
+        currentPartitionSize += sizes(i)
+      }
+      i += 1
+    }
+    tryMergePartitions()
+    partitionStartIndices.toArray
+  }
 }
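(To make the two constants above concrete: with a target size of 100 bytes, `SMALL_PARTITION_FACTOR` marks any partition under 20 bytes as too small to stand alone, while `MERGED_PARTITION_FACTOR` lets two adjacent partitions merge whenever their combined size stays under 120 bytes. That is why, in the tests below, a 90-byte slice followed by a trailing 15-byte slice ends up as one 105-byte partition rather than leaving the 15 bytes on their own.)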
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
similarity index 88%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index 8aab299..2cd4c98 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsCoalescerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.{MapOutputStatistics, SparkFunSuite}
-import org.apache.spark.sql.execution.adaptive.ShufflePartitionsCoalescer
+import org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil
 
-class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
+class ShufflePartitionsUtilSuite extends SparkFunSuite {
 
   private def checkEstimation(
       bytesByPartitionIdArray: Array[Array[Long]],
@@ -31,7 +31,7 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
       case (bytesByPartitionId, index) =>
         new MapOutputStatistics(index, bytesByPartitionId)
     }
-    val estimatedPartitionStartIndices = ShufflePartitionsCoalescer.coalescePartitions(
+    val estimatedPartitionStartIndices = ShufflePartitionsUtil.coalescePartitions(
       mapOutputStatistics,
       0,
       bytesByPartitionIdArray.head.length,
@@ -252,4 +252,30 @@ class ShufflePartitionsCoalescerSuite extends SparkFunSuite {
         targetSize, minNumPartitions)
     }
   }
+
+  test("splitSizeListByTargetSize") {
+    val targetSize = 100
+
+    // merge the small partitions at the beginning/end
+    val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList1, targetSize).toSeq ==
+      Seq(0, 2, 5))
+
+    // merge the small partitions in the middle
+    val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList2, targetSize).toSeq ==
+      Seq(0, 2, 4, 5))
+
+    // merge small partitions if the partition itself is smaller than
+    // targetSize * SMALL_PARTITION_FACTOR
+    val sizeList3 = Seq[Long](15, 1000, 15, 1000)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList3, targetSize).toSeq ==
+      Seq(0, 3))
+
+    // merge small partitions if the combined size is smaller than
+    // targetSize * MERGED_PARTITION_FACTOR
+    val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35)
+    assert(ShufflePartitionsUtil.splitSizeListByTargetSize(sizeList4, targetSize).toSeq ==
+      Seq(0, 2, 3))
+  }
 }
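(A worked reading of the last case above: with a target of 100, a plain greedy split of Seq(35, 75, 90, 20, 35, 25, 35) would yield start indices 0, 1, 2, 3, 6, but the merge step folds 35 + 75 into one 110-byte partition (110 < 120, the merged-partition threshold) and folds the trailing 35 into the preceding 80 bytes, giving partitions of 110, 90 and 115 bytes at start indices 0, 2, 3.)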
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 94947a8..fcca23d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -645,11 +645,11 @@ class AdaptiveQueryExecSuite
         //              into 2 splits and right side is divided into 4 splits, so
         //              2 x 4 sub-partitions.
         // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 3 splits, so
-        //              3 sub-partitions.
+        // Partition 4: only left side is skewed, and divide into 2 splits, so
+        //              2 sub-partitions.
         // So total (8 + 1 + 3) partitions.
         val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 8 + 1 + 3)
+        checkSkewJoin(innerSmj, 8 + 1 + 2)
 
         // skewed left outer join optimization
         val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -659,11 +659,11 @@ class AdaptiveQueryExecSuite
         // Partition 0: both left and right sides are skewed, but left join can't split right side,
         //              so only left side is divided into 2 splits, and thus 2 sub-partitions.
         // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
-        // Partition 4: only left side is skewed, and divide into 3 splits, so
-        //              3 sub-partitions.
-        // So total (2 + 1 + 3) partitions.
+        // Partition 4: only left side is skewed, and divide into 2 splits, so
+        //              2 sub-partitions.
+        // So total (2 + 1 + 2) partitions.
         val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2 + 1 + 3)
+        checkSkewJoin(leftSmj, 2 + 1 + 2)
 
         // skewed right outer join optimization
         val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(

