This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b968cd3  [SPARK-30918][SQL] improve the splitting of skewed partitions
b968cd3 is described below

commit b968cd37796a5730fe5c2318d23a38416f550957
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Feb 25 14:10:29 2020 -0800

    [SPARK-30918][SQL] improve the splitting of skewed partitions
    
    ### What changes were proposed in this pull request?
    
    Use the average size of the non-skewed partitions as the target size when 
splitting skewed partitions, instead of 
ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD
    
    ### Why are the changes needed?
    
    The goal of skew join optimization is to make the data distribution more 
even. So it makes more sense to use the average size of the non-skewed 
partitions as the target size.
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #27669 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>
    (cherry picked from commit 8f247e5d3682ad765bdbb9ea5a4315862c5a383c)
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 +---
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 62 ++++++++++++++++++----
 .../adaptive/AdaptiveQueryExecSuite.scala          |  4 +-
 3 files changed, 54 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 674c6df..e6f7cfd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -432,19 +432,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD =
-    
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionSizeThreshold")
-      .doc("Configures the minimum size in bytes for a partition that is 
considered as a skewed " +
-        "partition in adaptive skewed join.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
   val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
     
buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
       .doc("A partition is considered as a skewed partition if its size is 
larger than" +
         " this factor multiple the median partition size and also larger than 
" +
-        s" ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}")
+        s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
       .intConf
+      .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)
 
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 578d2d7..d3cb864 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -34,6 +34,30 @@ import 
org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExcha
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf
 
+/**
+ * A rule to optimize skewed joins to avoid straggler tasks whose share of 
data is significantly
+ * larger than that of the rest of the tasks.
+ *
+ * The general idea is to divide each skew partition into smaller partitions 
and replicate its
+ * matching partition on the other side of the join so that they can run in 
parallel tasks.
+ * Note that when matching partitions from the left side and the right side 
both have skew,
+ * it will become a cartesian product of splits from left and right joining 
together.
+ *
+ * For example, assume the Sort-Merge join has 4 partitions:
+ * left:  [L1, L2, L3, L4]
+ * right: [R1, R2, R3, R4]
+ *
+ * Let's say L2, L4 and R3, R4 are skewed, and each of them gets split into 2 
sub-partitions. This
+ * is scheduled to run 4 tasks at the beginning: (L1, R1), (L2, R2), (L3, R3), 
(L4, R4).
+ * This rule expands it to 9 tasks to increase parallelism:
+ * (L1, R1),
+ * (L2-1, R2), (L2-2, R2),
+ * (L3, R3-1), (L3, R3-2),
+ * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
+ *
+ * Note that, when this rule is enabled, it also coalesces non-skewed 
partitions like
+ * `ReduceNumShufflePartitions` does.
+ */
 case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
   private val ensureRequirements = EnsureRequirements(conf)
@@ -43,12 +67,12 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
 
   /**
    * A partition is considered as a skewed partition if its size is larger 
than the median
-   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger 
than
-   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also 
larger than
+   * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
     size > medianSize * 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
-      size > 
conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+      size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
   }
 
   private def medianSize(stats: MapOutputStatistics): Long = {
@@ -62,6 +86,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   }
 
   /**
+   * The goal of skew join optimization is to make the data distribution more 
even. The target size
+   * to split skewed partitions is the average size of non-skewed partitions, 
or the
+   * target post-shuffle partition size if avg size is smaller than it.
+   */
+  private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = 
{
+    val targetPostShuffleSize = 
conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, 
medianSize))
+    // It's impossible that all the partitions are skewed, as we use median 
size to define skew.
+    assert(nonSkewSizes.nonEmpty)
+    math.max(targetPostShuffleSize, nonSkewSizes.sum / nonSkewSizes.length)
+  }
+
+  /**
    * Get the map size of the specific reduce shuffle Id.
    */
   private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): 
Array[Long] = {
@@ -72,19 +109,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   /**
    * Split the skewed partition based on the map size and the max split number.
    */
-  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: 
Int): Array[Int] = {
+  private def getMapStartIndices(
+      stage: ShuffleQueryStageExec,
+      partitionId: Int,
+      targetSize: Long): Array[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
-    val advisoryPartitionSize = math.max(avgPartitionSize,
-      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
     val partitionStartIndices = ArrayBuffer[Int]()
     partitionStartIndices += 0
     var i = 0
     var postMapPartitionSize = 0L
     while (i < mapPartitionSizes.length) {
       val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > 
advisoryPartitionSize) {
+      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
         partitionStartIndices += i
         postMapPartitionSize = nextMapPartitionSize
       } else {
@@ -152,6 +189,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
         """.stripMargin)
       val canSplitLeft = canSplitLeftSide(joinType)
       val canSplitRight = canSplitRightSide(joinType)
+      val leftTargetSize = targetSize(leftStats, leftMedSize)
+      val rightTargetSize = targetSize(rightStats, rightMedSize)
 
       val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
       val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
@@ -179,7 +218,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
             leftSkewDesc.addPartitionSize(leftSize)
             createSkewPartitions(
               partitionIndex,
-              getMapStartIndices(left, partitionIndex),
+              getMapStartIndices(left, partitionIndex, leftTargetSize),
               getNumMappers(left))
           } else {
             Seq(SinglePartitionSpec(partitionIndex))
@@ -189,7 +228,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
             rightSkewDesc.addPartitionSize(rightSize)
             createSkewPartitions(
               partitionIndex,
-              getMapStartIndices(right, partitionIndex),
+              getMapStartIndices(right, partitionIndex, rightTargetSize),
               getNumMappers(right))
           } else {
             Seq(SinglePartitionSpec(partitionIndex))
@@ -236,7 +275,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
       rightStats: MapOutputStatistics,
       nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
     assert(nonSkewPartitionIndices.nonEmpty)
-    if (nonSkewPartitionIndices.length == 1) {
+    val shouldCoalesce = 
conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+    if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       Seq(SinglePartitionSpec(nonSkewPartitionIndices.head))
     } else {
       val startIndices = ShufflePartitionsCoalescer.coalescePartitions(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 379e17f..64566af 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -583,7 +583,6 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
       SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
@@ -609,8 +608,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to