This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 7879634  [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced
7879634 is described below

commit 78796349d9246d36a248e645ea303270cace14ee
Author: Eugene Koifman <eugene.koif...@workday.com>
AuthorDate: Wed Jul 14 15:48:02 2021 +0800

    [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced
    
    ### What changes were proposed in this pull request?
    Fix `CustomShuffleReaderExec.hasCoalescedPartition` so that it returns true only if some original partitions got combined
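    
    To illustrate the new predicate, here is a minimal self-contained sketch; the `ShufflePartitionSpec` and `CoalescedPartitionSpec` below are simplified stand-ins for the real Spark classes (which live in `org.apache.spark.sql.execution` and carry more fields):
    ```scala
    // Simplified stand-ins for the Spark classes of the same name.
    sealed trait ShufflePartitionSpec
    case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
      extends ShufflePartitionSpec
    
    // A spec spanning a single reducer (end - start == 1) leaves that
    // partition unchanged, so only a wider span counts as coalesced.
    def hasCoalescedPartition(specs: Seq[ShufflePartitionSpec]): Boolean =
      specs.exists {
        case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
        case _ => false
      }
    
    hasCoalescedPartition(Seq(CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 2))) // false: nothing combined
    hasCoalescedPartition(Seq(CoalescedPartitionSpec(0, 3)))                               // true: reducers 0-2 combined
    ```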
    
    ### Why are the changes needed?
    Without this change, the `CustomShuffleReaderExec` description can report `coalesced` even though the partitions are unchanged
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the `Arguments` in the node description is now accurate:
    ```
    (16) CustomShuffleReader
    Input [3]: [registration#4, sum#85, count#86L]
    Arguments: coalesced
    ```
    
    ### How was this patch tested?
    Existing tests
    
    Closes #32872 from ekoifman/PRISM-77023-fix-hasCoalescedPartition.
    
    Authored-by: Eugene Koifman <eugene.koif...@workday.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 4033b2a3f4faa48bfdc0802d5637b7ec726b06e7)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../adaptive/CustomShuffleReaderExec.scala          | 11 +++++++++--
 .../execution/CoalesceShufflePartitionsSuite.scala  | 21 +++++++++++++++++----
 .../execution/adaptive/AdaptiveQueryExecSuite.scala |  8 ++++----
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 176fff6..61318a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -87,8 +87,15 @@ case class CustomShuffleReaderExec private(
     Iterator(desc)
   }
 
-  def hasCoalescedPartition: Boolean =
-    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  /**
+   * Returns true iff some non-empty partitions were combined
+   */
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists {
+      case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
+      case _ => false
+    }
+  }
 
   def hasSkewedPartition: Boolean =
     partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index e368868..fbae929 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -315,7 +315,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
   test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")
 
       // test case 1: a query stage has 3 child stages but they are the same stage.
       // Final Stage 1
@@ -323,7 +323,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       //   ReusedQueryStage 0
       //   ReusedQueryStage 0
       val resultDf = df.join(df, "key").join(df, "key")
-      QueryTest.checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
       val finalPlan = resultDf.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(finalPlan.collect {
@@ -344,7 +344,9 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       val grouped = df.groupBy("key").agg(max("value").as("value"))
       val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
         .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: Row(3, 1) ::
+        Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: Row(6, 4) :: Row(6, 5) ::
+        Row(7, 5) :: Nil)
 
       val finalPlan2 = resultDf2.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -353,6 +355,17 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
       assert(level1Stages.length == 2)
 
+      assert(
+        finalPlan2.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 2, "finalPlan2")
+
+      level1Stages.foreach(qs =>
+        assert(qs.plan.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 1,
+          "Wrong CoalescedShuffleReader below " + qs.simpleString(3)))
+
       val leafStages = level1Stages.flatMap { stage =>
         // All of the child stages of result stage have only one child stage.
         val children = stage.plan.collect { case q: QueryStageExec => q }
@@ -368,7 +381,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       }
       assert(reusedStages.length == 1)
     }
-    withSparkSession(test, 4, None)
+    withSparkSession(test, 400, None)
   }
 
   test("Do not reduce the number of shuffle partition for repartition") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 000cdfb..5000abc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -953,7 +953,7 @@ class AdaptiveQueryExecSuite
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
         SQLConf.SHUFFLE_PARTITIONS.key -> "100",
         SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+        SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "1000") {
         withTempView("skewData1", "skewData2") {
           spark
             .range(0, 1000, 1, 10)
@@ -982,9 +982,9 @@ class AdaptiveQueryExecSuite
             assert(reader.metrics.contains("numSkewedPartitions"))
           }
           assert(readers(0).metrics("numSkewedPartitions").value == 2)
-          assert(readers(0).metrics("numSkewedSplits").value == 15)
+          assert(readers(0).metrics("numSkewedSplits").value == 11)
           assert(readers(1).metrics("numSkewedPartitions").value == 1)
-          assert(readers(1).metrics("numSkewedSplits").value == 12)
+          assert(readers(1).metrics("numSkewedSplits").value == 9)
         }
       }
     }
@@ -1582,7 +1582,7 @@ class AdaptiveQueryExecSuite
         // Skew join can apply as the repartition is not optimized out.
         assert(smjWithNum.head.isSkewJoin)
         val customReadersWithNum = collect(smjWithNum.head) {
-          case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c
+          case c: CustomShuffleReaderExec => c
         }
         assert(customReadersWithNum.nonEmpty)
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
