This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new adac454 [SPARK-34682][SQL] Fix regression in canonicalization error check in CustomShuffleReaderExec adac454 is described below commit adac45400de87ceda91429c4ac857ab02b54e19d Author: Andy Grove <andygrov...@gmail.com> AuthorDate: Wed Mar 10 20:48:00 2021 +0900 [SPARK-34682][SQL] Fix regression in canonicalization error check in CustomShuffleReaderExec ### What changes were proposed in this pull request? There is a regression in 3.1.1 compared to 3.0.2 in the check for a canonicalized plan when executing CustomShuffleReaderExec. The regression was caused by the call to `sendDriverMetrics`, which now happens before the check and always fails if the plan is canonicalized, so the check itself is never reached. This PR moves the `sendDriverMetrics` call after the check. ### Why are the changes needed? It restores a useful error check: operating on a canonicalized plan should fail with a clear `IllegalStateException` rather than an obscure `NoSuchElementException`. ### Does this PR introduce _any_ user-facing change? No. As far as I know, this is not an error that a user would typically see. ### How was this patch tested? I tested this change locally by building a distribution from this PR branch, and a regression test was added to `AdaptiveQueryExecSuite`. Before fixing the regression I saw: ``` java.util.NoSuchElementException: key not found: numPartitions ``` After fixing the regression I saw: ``` java.lang.IllegalStateException: operating on canonicalized plan ``` Closes #31793 from andygrove/SPARK-34682. 
Lead-authored-by: Andy Grove <andygrov...@gmail.com> Co-authored-by: Andy Grove <andygr...@nvidia.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> (cherry picked from commit fd4843803c4670c656a94c1af652fb4b945bc82c) Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../adaptive/CustomShuffleReaderExec.scala | 12 ++++++------ .../execution/adaptive/AdaptiveQueryExecSuite.scala | 21 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala index 49a4c25..2319c9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala @@ -179,12 +179,12 @@ case class CustomShuffleReaderExec private( } private lazy val shuffleRDD: RDD[_] = { - sendDriverMetrics() - - shuffleStage.map { stage => - stage.shuffle.getShuffleRDD(partitionSpecs.toArray) - }.getOrElse { - throw new IllegalStateException("operating on canonicalized plan") + shuffleStage match { + case Some(stage) => + sendDriverMetrics() + stage.shuffle.getShuffleRDD(partitionSpecs.toArray) + case _ => + throw new IllegalStateException("operating on canonicalized plan") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 92f7f40..cdd1901 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.adaptive import java.io.File +import java.lang.reflect.InvocationTargetException import java.net.URI import 
org.apache.log4j.Level @@ -869,6 +870,26 @@ class AdaptiveQueryExecSuite } } + test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val (_, adaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT key FROM testData GROUP BY key") + val readers = collect(adaptivePlan) { + case r: CustomShuffleReaderExec => r + } + assert(readers.length == 1) + val reader = readers.head + val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec] + // we can't just call execute() because that has separate checks for canonicalized plans + val doExecute = c.getClass.getMethod("doExecute") + doExecute.setAccessible(true) + val ex = intercept[InvocationTargetException] { + doExecute.invoke(c) + } + assert(ex.getCause.getMessage === "operating on canonicalized plan") + } + } + test("metrics of the shuffle reader") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { val (_, adaptivePlan) = runAdaptiveAndVerifyResult( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org