This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4033b2a  [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced
4033b2a is described below

commit 4033b2a3f4faa48bfdc0802d5637b7ec726b06e7
Author: Eugene Koifman <eugene.koif...@workday.com>
AuthorDate: Wed Jul 14 15:48:02 2021 +0800

    [SPARK-35639][SQL] Make hasCoalescedPartition return true if something was actually coalesced

    ### What changes were proposed in this pull request?
    Fix `CustomShuffleReaderExec.hasCoalescedPartition` so that it returns true only if some original partitions were actually combined.

    ### Why are the changes needed?
    Without this change, the `CustomShuffleReaderExec` description can report `coalesced` even though the partitions are unchanged.

    ### Does this PR introduce _any_ user-facing change?
    Yes, the `Arguments` entry in the node description is now accurate:
    ```
    (16) CustomShuffleReader
    Input [3]: [registration#4, sum#85, count#86L]
    Arguments: coalesced
    ```

    ### How was this patch tested?
    Existing tests.

    Closes #32872 from ekoifman/PRISM-77023-fix-hasCoalescedPartition.

    Authored-by: Eugene Koifman <eugene.koif...@workday.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
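Concretely, the new predicate treats a `CoalescedPartitionSpec` as evidence of coalescing only when it spans more than one reducer (the spec covers the half-open reducer range `[startReducerIndex, endReducerIndex)`). Below is a minimal standalone sketch of the before/after behavior; the spec types here are simplified stand-ins for Spark's internal classes, and the demo names are illustrative only:

```
// Simplified stand-ins for Spark's internal partition specs (illustrative only).
sealed trait ShufflePartitionSpec
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
  extends ShufflePartitionSpec
case class PartialReducerPartitionSpec(reducerIndex: Int) extends ShufflePartitionSpec

object HasCoalescedPartitionDemo extends App {
  // Before the fix: any CoalescedPartitionSpec made the reader report "coalesced".
  def hasCoalescedPartitionOld(specs: Seq[ShufflePartitionSpec]): Boolean =
    specs.exists(_.isInstanceOf[CoalescedPartitionSpec])

  // After the fix: only a spec spanning more than one reducer counts.
  def hasCoalescedPartitionNew(specs: Seq[ShufflePartitionSpec]): Boolean =
    specs.exists {
      case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
      case _ => false
    }

  // Each spec covers exactly one reducer: the partitioning is unchanged.
  val unchanged = Seq(CoalescedPartitionSpec(0, 1), CoalescedPartitionSpec(1, 2))
  // The first spec combines reducers 0 and 1 into a single partition.
  val combined = Seq(CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 3))

  assert(hasCoalescedPartitionOld(unchanged))  // misreported "coalesced"
  assert(!hasCoalescedPartitionNew(unchanged)) // fixed by this commit's check
  assert(hasCoalescedPartitionNew(combined))   // real coalescing still detected
}
```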
---
 .../adaptive/CustomShuffleReaderExec.scala          | 11 +++++++++--
 .../execution/CoalesceShufflePartitionsSuite.scala  | 21 +++++++++++++++++----
 .../execution/adaptive/AdaptiveQueryExecSuite.scala |  8 ++++----
 3 files changed, 30 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 176fff6..61318a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -87,8 +87,15 @@ case class CustomShuffleReaderExec private(
     Iterator(desc)
   }

-  def hasCoalescedPartition: Boolean =
-    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  /**
+   * Returns true iff some non-empty partitions were combined
+   */
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists {
+      case s: CoalescedPartitionSpec => s.endReducerIndex - s.startReducerIndex > 1
+      case _ => false
+    }
+  }

   def hasSkewedPartition: Boolean =
     partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
index e368868..fbae929 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CoalesceShufflePartitionsSuite.scala
@@ -315,7 +315,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
   test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
     val test: SparkSession => Unit = { spark: SparkSession =>
       spark.sql("SET spark.sql.exchange.reuse=true")
-      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val df = spark.range(0, 6, 1).selectExpr("id AS key", "id AS value")

       // test case 1: a query stage has 3 child stages but they are the same stage.
       // Final Stage 1
@@ -323,7 +323,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       //     ReusedQueryStage 0
       //     ReusedQueryStage 0
       val resultDf = df.join(df, "key").join(df, "key")
-      QueryTest.checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf, (0 to 5).map(i => Row(i, i, i, i)))
       val finalPlan = resultDf.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
       assert(finalPlan.collect {
@@ -344,7 +344,9 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       val grouped = df.groupBy("key").agg(max("value").as("value"))
       val resultDf2 = grouped.groupBy(col("key") + 1).max("value")
         .union(grouped.groupBy(col("key") + 2).max("value"))
-      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Nil)
+      QueryTest.checkAnswer(resultDf2, Row(1, 0) :: Row(2, 0) :: Row(2, 1) :: Row(3, 1) ::
+        Row(3, 2) :: Row(4, 2) :: Row(4, 3) :: Row(5, 3) :: Row(5, 4) :: Row(6, 4) :: Row(6, 5) ::
+        Row(7, 5) :: Nil)

       val finalPlan2 = resultDf2.queryExecution.executedPlan
         .asInstanceOf[AdaptiveSparkPlanExec].executedPlan
@@ -353,6 +355,17 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       val level1Stages = finalPlan2.collect { case q: QueryStageExec => q }
       assert(level1Stages.length == 2)

+      assert(
+        finalPlan2.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 2, "finalPlan2")
+
+      level1Stages.foreach(qs =>
+        assert(qs.plan.collect {
+          case r @ CoalescedShuffleReader() => r
+        }.length == 1,
+          "Wrong CoalescedShuffleReader below " + qs.simpleString(3)))
+
       val leafStages = level1Stages.flatMap { stage =>
         // All of the child stages of result stage have only one child stage.
         val children = stage.plan.collect { case q: QueryStageExec => q }
@@ -368,7 +381,7 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
       }
       assert(reusedStages.length == 1)
     }
-    withSparkSession(test, 4, None)
+    withSparkSession(test, 400, None)
   }

   test("Do not reduce the number of shuffle partition for repartition") {
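The assertions added above locate coalesced readers with a boolean extractor pattern. For readers unfamiliar with that idiom, here is a small self-contained sketch of how a `collect { case r @ CoalescedShuffleReader() => r }` match works; the `Reader` type and its reducer-range encoding below are hypothetical stand-ins, not Spark's actual classes:

```
// Illustrative model of matching plan nodes with a boolean extractor, as in
// the test's `case r @ CoalescedShuffleReader() => r` pattern. `Reader` and
// its (startReducerIndex, endReducerIndex) pairs are hypothetical stand-ins.
case class Reader(partitionSpecs: Seq[(Int, Int)])

object CoalescedShuffleReader {
  // Matches iff at least one spec spans more than one reducer.
  def unapply(r: Reader): Boolean =
    r.partitionSpecs.exists { case (start, end) => end - start > 1 }
}

object ExtractorDemo extends App {
  val plan = Seq(Reader(Seq((0, 2), (2, 3))), Reader(Seq((0, 1))))
  val coalesced = plan.collect { case r @ CoalescedShuffleReader() => r }
  assert(coalesced.length == 1) // only the first reader actually coalesced
}
```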
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 000cdfb..5000abc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -953,7 +953,7 @@ class AdaptiveQueryExecSuite
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "100",
       SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "1000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
@@ -982,9 +982,9 @@ class AdaptiveQueryExecSuite
           assert(reader.metrics.contains("numSkewedPartitions"))
         }
         assert(readers(0).metrics("numSkewedPartitions").value == 2)
-        assert(readers(0).metrics("numSkewedSplits").value == 15)
+        assert(readers(0).metrics("numSkewedSplits").value == 11)
         assert(readers(1).metrics("numSkewedPartitions").value == 1)
-        assert(readers(1).metrics("numSkewedSplits").value == 12)
+        assert(readers(1).metrics("numSkewedSplits").value == 9)
       }
     }
   }
@@ -1582,7 +1582,7 @@ class AdaptiveQueryExecSuite
       // Skew join can apply as the repartition is not optimized out.
       assert(smjWithNum.head.isSkewJoin)
       val customReadersWithNum = collect(smjWithNum.head) {
-        case c: CustomShuffleReaderExec if c.hasCoalescedPartition => c
+        case c: CustomShuffleReaderExec => c
       }
       assert(customReadersWithNum.nonEmpty)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org