This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 69f9ee1  [SPARK-31452][SQL] Do not create partition spec for 0-size partitions in AQE
69f9ee1 is described below

commit 69f9ee18b66991f8d6989fa20f93cd7343be28a9
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Apr 20 13:50:07 2020 -0700

    [SPARK-31452][SQL] Do not create partition spec for 0-size partitions in AQE
    
    ### What changes were proposed in this pull request?
    
    This PR skips creating partition specs in `ShufflePartitionsUtil` for 0-size partitions, which avoids launching unnecessary tasks that do nothing.
    
    ### Why are the changes needed?
    
    Launching tasks that do nothing is a waste of resources.
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated the existing tests and added new test cases.
    
    Closes #28226 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  7 ++-
 .../adaptive/CustomShuffleReaderExec.scala         |  5 +-
 .../adaptive/OptimizeLocalShuffleReader.scala      |  4 +-
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 10 ++--
 .../execution/adaptive/ShufflePartitionsUtil.scala | 13 ++++-
 .../sql/execution/ShufflePartitionsUtilSuite.scala | 65 ++++++++++++++-------
 .../adaptive/AdaptiveQueryExecSuite.scala          | 67 ++++++++++++++++------
 7 files changed, 120 insertions(+), 51 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e6a3966..a257668 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -434,9 +434,10 @@ object SQLConf {
 
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionNum")
-      .doc("The minimum number of shuffle partitions after coalescing. If not 
set, the default " +
-        "value is the default parallelism of the Spark cluster. This 
configuration only " +
-        s"has an effect when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
+      .doc("The suggested (not guaranteed) minimum number of shuffle 
partitions after " +
+        "coalescing. If not set, the default value is the default parallelism 
of the " +
+        "Spark cluster. This configuration only has an effect when " +
+        s"'${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
         s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true.")
       .version("3.0.0")
       .intConf
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 6450d49..e7f3bf1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -49,7 +49,8 @@ case class CustomShuffleReaderExec private(
     // If it is a local shuffle reader with one mapper per task, then the 
output partitioning is
     // the same as the plan before shuffle.
     // TODO this check is based on assumptions of callers' behavior but is 
sufficient for now.
-    if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+    if (partitionSpecs.nonEmpty &&
+        partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
         
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size
 ==
           partitionSpecs.length) {
       child match {
@@ -98,7 +99,7 @@ case class CustomShuffleReaderExec private(
   }
 
   @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
-    if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+    if (partitionSpecs.nonEmpty && !isLocalReader && 
shuffleStage.get.mapStats.isDefined) {
       val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
       Some(partitionSpecs.map {
         case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index a5b3cac..5416fde 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -141,8 +141,8 @@ object OptimizeLocalShuffleReader {
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
       s.shuffle.canChangeNumPartitions
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
-      s.shuffle.canChangeNumPartitions
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
+      s.shuffle.canChangeNumPartitions && partitionSpecs.nonEmpty
     case _ => false
   }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 58e07fa..396c9c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -88,9 +88,11 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends 
Rule[SparkPlan] {
   private def targetSize(sizes: Seq[Long], medianSize: Long): Long = {
     val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
     val nonSkewSizes = sizes.filterNot(isSkewed(_, medianSize))
-    // It's impossible that all the partitions are skewed, as we use median 
size to define skew.
-    assert(nonSkewSizes.nonEmpty)
-    math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
+    if (nonSkewSizes.isEmpty) {
+      advisorySize
+    } else {
+      math.max(advisorySize, nonSkewSizes.sum / nonSkewSizes.length)
+    }
   }
 
   /**
@@ -297,7 +299,7 @@ private object ShuffleStage {
       Some(ShuffleStageInfo(s, mapStats, partitions))
 
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs)
-      if s.mapStats.isDefined =>
+      if s.mapStats.isDefined && partitionSpecs.nonEmpty =>
       val mapStats = s.mapStats.get
       val sizes = mapStats.bytesByPartitionId
       val partitions = partitionSpecs.map {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index e10ed4f..d6e44b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -91,6 +91,14 @@ object ShufflePartitionsUtil extends Logging {
     var latestSplitPoint = 0
     var coalescedSize = 0L
     var i = 0
+
+    def createPartitionSpec(): Unit = {
+      // Skip empty inputs, as it is a waste to launch an empty task.
+      if (coalescedSize > 0) {
+        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
+      }
+    }
+
     while (i < numPartitions) {
       // We calculate the total size of i-th shuffle partitions from all 
shuffles.
       var totalSizeOfCurrentPartition = 0L
@@ -103,7 +111,7 @@ object ShufflePartitionsUtil extends Logging {
       // If including the `totalSizeOfCurrentPartition` would exceed the 
target size, then start a
       // new coalesced partition.
       if (i > latestSplitPoint && coalescedSize + totalSizeOfCurrentPartition 
> targetSize) {
-        partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, i)
+        createPartitionSpec()
         latestSplitPoint = i
         // reset postShuffleInputSize.
         coalescedSize = totalSizeOfCurrentPartition
@@ -112,8 +120,7 @@ object ShufflePartitionsUtil extends Logging {
       }
       i += 1
     }
-    partitionSpecs += CoalescedPartitionSpec(latestSplitPoint, numPartitions)
-
+    createPartitionSpec()
     partitionSpecs
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index 7acc33c..f5c3b78 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -42,13 +42,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     val targetSize = 100
 
     {
-      // All bytes per partition are 0.
-      val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
-      checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
-    }
-
-    {
       // Some bytes per partition are 0 and total size is less than the target 
size.
       // 1 coalesced partition is expected.
       val bytesByPartitionId = Array[Long](10, 0, 20, 0, 0)
@@ -70,8 +63,7 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
         CoalescedPartitionSpec(0, 1),
         CoalescedPartitionSpec(1, 2),
         CoalescedPartitionSpec(2, 3),
-        CoalescedPartitionSpec(3, 4),
-        CoalescedPartitionSpec(4, 5))
+        CoalescedPartitionSpec(3, 4))
       checkEstimation(Array(bytesByPartitionId), expectedPartitionSpecs, 
targetSize)
     }
 
@@ -109,17 +101,6 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     }
 
     {
-      // All bytes per partition are 0.
-      val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
-      val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
-      checkEstimation(
-        Array(bytesByPartitionId1, bytesByPartitionId2),
-        expectedPartitionSpecs,
-        targetSize)
-    }
-
-    {
       // Some bytes per partition are 0.
       // 1 coalesced partition is expected.
       val bytesByPartitionId1 = Array[Long](0, 10, 0, 20, 0)
@@ -217,7 +198,18 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
       // the size of data is 0.
       val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
       val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
-      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 5))
+      checkEstimation(
+        Array(bytesByPartitionId1, bytesByPartitionId2),
+        Seq.empty, targetSize, minNumPartitions)
+    }
+
+
+    {
+      // The minimal number of coalesced partitions is not enforced because
+      // there are too many 0-size partitions.
+      val bytesByPartitionId1 = Array[Long](200, 0, 0)
+      val bytesByPartitionId2 = Array[Long](100, 0, 0)
+      val expectedPartitionSpecs = Seq(CoalescedPartitionSpec(0, 1))
       checkEstimation(
         Array(bytesByPartitionId1, bytesByPartitionId2),
         expectedPartitionSpecs,
@@ -251,6 +243,37 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite {
     }
   }
 
+  test("do not create partition spec for 0-size partitions") {
+    val targetSize = 100
+    val minNumPartitions = 2
+
+    {
+      // 1 shuffle: All bytes per partition are 0, no partition spec created.
+      val bytesByPartitionId = Array[Long](0, 0, 0, 0, 0)
+      checkEstimation(Array(bytesByPartitionId), Seq.empty, targetSize)
+    }
+
+    {
+      // 2 shuffles: All bytes per partition are 0, no partition spec created.
+      val bytesByPartitionId1 = Array[Long](0, 0, 0, 0, 0)
+      val bytesByPartitionId2 = Array[Long](0, 0, 0, 0, 0)
+      checkEstimation(Array(bytesByPartitionId1, bytesByPartitionId2), 
Seq.empty, targetSize)
+    }
+
+    {
+      // No partition spec created for the 0-size partitions.
+      val bytesByPartitionId1 = Array[Long](200, 0, 0, 0, 0)
+      val bytesByPartitionId2 = Array[Long](100, 0, 300, 0, 0)
+      val expectedPartitionSpecs = Seq(
+        CoalescedPartitionSpec(0, 1),
+        CoalescedPartitionSpec(2, 3))
+      checkEstimation(
+        Array(bytesByPartitionId1, bytesByPartitionId2),
+        expectedPartitionSpecs,
+        targetSize, minNumPartitions)
+    }
+  }
+
   test("splitSizeListByTargetSize") {
     val targetSize = 100
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 694be98..e82ccda 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -157,17 +157,17 @@ class AdaptiveQueryExecSuite
       val localShuffleRDD0 = 
localReaders(0).execute().asInstanceOf[ShuffledRowRDD]
       val localShuffleRDD1 = 
localReaders(1).execute().asInstanceOf[ShuffledRowRDD]
       // The pre-shuffle partition size is [0, 0, 0, 72, 0]
-      // And the partitionStartIndices is [0, 3, 4], so advisoryParallelism = 
3.
+      // We exclude the 0-size partitions, so only one partition, 
advisoryParallelism = 1
       // the final parallelism is
-      // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
+      // math.max(1, advisoryParallelism / numMappers): math.max(1, 1/2) = 1
       // and the partitions length is 1 * numMappers = 2
       assert(localShuffleRDD0.getPartitions.length == 2)
       // The pre-shuffle partition size is [0, 72, 0, 72, 126]
-      // And the partitionStartIndices is [0, 1, 2, 3, 4], so 
advisoryParallelism = 5.
+      // We exclude the 0-size partitions, so only 3 partitions, advisoryParallelism = 3
       // the final parallelism is
-      // math.max(1, advisoryParallelism / numMappers): math.max(1, 5/2) = 2
-      // and the partitions length is 2 * numMappers = 4
-      assert(localShuffleRDD1.getPartitions.length == 4)
+      // math.max(1, advisoryParallelism / numMappers): math.max(1, 3/2) = 1
+      // and the partitions length is 1 * numMappers = 2
+      assert(localShuffleRDD1.getPartitions.length == 2)
     }
   }
 
@@ -197,6 +197,38 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("Empty stage coalesced to 0-partition RDD") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
+      val df1 = spark.range(10).withColumn("a", 'id)
+      val df2 = spark.range(10).withColumn("b", 'id)
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+        val testDf = df1.where('a > 10).join(df2.where('b > 10), 
"id").groupBy('a).count()
+        checkAnswer(testDf, Seq())
+        val plan = testDf.queryExecution.executedPlan
+        assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
+        val coalescedReaders = collect(plan) {
+          case r: CustomShuffleReaderExec => r
+        }
+        assert(coalescedReaders.length == 2)
+        coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+      }
+
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
+        val testDf = df1.where('a > 10).join(df2.where('b > 10), 
"id").groupBy('a).count()
+        checkAnswer(testDf, Seq())
+        val plan = testDf.queryExecution.executedPlan
+        assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)
+        val coalescedReaders = collect(plan) {
+          case r: CustomShuffleReaderExec => r
+        }
+        assert(coalescedReaders.length == 2, s"$plan")
+        coalescedReaders.foreach(r => assert(r.partitionSpecs.isEmpty))
+      }
+    }
+  }
+
   test("Scalar subquery") {
     withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -647,12 +679,13 @@ class AdaptiveQueryExecSuite
         // Partition 0: both left and right sides are skewed, left side is 
divided
         //              into 2 splits and right side is divided into 4 splits, 
so
         //              2 x 4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but 
it's ignored as the
+        //                    size is 0.
         // Partition 4: only left side is skewed, and divide into 2 splits, so
         //              2 sub-partitions.
-        // So total (8 + 1 + 3) partitions.
+        // So total (8 + 0 + 2) partitions.
         val innerSmj = findTopLevelSortMergeJoin(innerAdaptivePlan)
-        checkSkewJoin(innerSmj, 8 + 1 + 2)
+        checkSkewJoin(innerSmj, 8 + 0 + 2)
 
         // skewed left outer join optimization
         val (_, leftAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -661,12 +694,13 @@ class AdaptiveQueryExecSuite
         // right stats:[6292, 0, 0, 0, 0]
         // Partition 0: both left and right sides are skewed, but left join 
can't split right side,
         //              so only left side is divided into 2 splits, and thus 2 
sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but 
it's ignored as the
+        //                    size is 0.
         // Partition 4: only left side is skewed, and divide into 2 splits, so
         //              2 sub-partitions.
-        // So total (2 + 1 + 2) partitions.
+        // So total (2 + 0 + 2) partitions.
         val leftSmj = findTopLevelSortMergeJoin(leftAdaptivePlan)
-        checkSkewJoin(leftSmj, 2 + 1 + 2)
+        checkSkewJoin(leftSmj, 2 + 0 + 2)
 
         // skewed right outer join optimization
         val (_, rightAdaptivePlan) = runAdaptiveAndVerifyResult(
@@ -675,12 +709,13 @@ class AdaptiveQueryExecSuite
         // right stats:[6292, 0, 0, 0, 0]
         // Partition 0: both left and right sides are skewed, but right join 
can't split left side,
         //              so only right side is divided into 4 splits, and thus 
4 sub-partitions.
-        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition.
+        // Partition 1, 2, 3: not skewed, and coalesced into 1 partition, but 
it's ignored as the
+        //                    size is 0.
         // Partition 4: only left side is skewed, but right join can't split 
left side, so just
         //              1 partition.
-        // So total (4 + 1 + 1) partitions.
+        // So total (4 + 0 + 1) partitions.
         val rightSmj = findTopLevelSortMergeJoin(rightAdaptivePlan)
-        checkSkewJoin(rightSmj, 4 + 1 + 1)
+        checkSkewJoin(rightSmj, 4 + 0 + 1)
       }
     }
   }
@@ -852,7 +887,7 @@ class AdaptiveQueryExecSuite
           }.head
           assert(!reader.isLocalReader)
           assert(reader.hasSkewedPartition)
-          assert(reader.hasCoalescedPartition)
+          assert(!reader.hasCoalescedPartition) // 0-size partitions are 
ignored.
           assert(reader.metrics.contains("numSkewedPartitions"))
           assert(reader.metrics("numSkewedPartitions").value > 0)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to