This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new adac454  [SPARK-34682][SQL] Fix regression in canonicalization error check in CustomShuffleReaderExec
adac454 is described below

commit adac45400de87ceda91429c4ac857ab02b54e19d
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Wed Mar 10 20:48:00 2021 +0900

    [SPARK-34682][SQL] Fix regression in canonicalization error check in CustomShuffleReaderExec
    
    ### What changes were proposed in this pull request?
    There is a regression in 3.1.1 compared to 3.0.2 in the check for a canonicalized plan that runs when executing `CustomShuffleReaderExec`.
    
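    The regression was caused by the call to `sendDriverMetrics`, which happens before the check and always fails if the plan is canonicalized, so the intended `IllegalStateException` is never reached. This PR moves that call into the branch where the shuffle stage is defined, so a canonicalized plan hits the error check first.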
    
    ### Why are the changes needed?
    This is a regression in a useful error check.
    
    ### Does this PR introduce _any_ user-facing change?
    No. This is not an error that a user would typically see, as far as I know.
    
    ### How was this patch tested?
    I tested this change locally by making a distribution from this PR branch. Before fixing the regression I saw:
    
    ```
    java.util.NoSuchElementException: key not found: numPartitions
    ```
    
    After fixing this regression I saw:
    
    ```
    java.lang.IllegalStateException: operating on canonicalized plan
    ```
    
    Closes #31793 from andygrove/SPARK-34682.
    
    Lead-authored-by: Andy Grove <andygrov...@gmail.com>
    Co-authored-by: Andy Grove <andygr...@nvidia.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit fd4843803c4670c656a94c1af652fb4b945bc82c)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../adaptive/CustomShuffleReaderExec.scala          | 12 ++++++------
 .../execution/adaptive/AdaptiveQueryExecSuite.scala | 21 +++++++++++++++++++++
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 49a4c25..2319c9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -179,12 +179,12 @@ case class CustomShuffleReaderExec private(
   }
 
   private lazy val shuffleRDD: RDD[_] = {
-    sendDriverMetrics()
-
-    shuffleStage.map { stage =>
-      stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
-    }.getOrElse {
-      throw new IllegalStateException("operating on canonicalized plan")
+    shuffleStage match {
+      case Some(stage) =>
+        sendDriverMetrics()
+        stage.shuffle.getShuffleRDD(partitionSpecs.toArray)
+      case _ =>
+        throw new IllegalStateException("operating on canonicalized plan")
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 92f7f40..cdd1901 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import java.io.File
+import java.lang.reflect.InvocationTargetException
 import java.net.URI
 
 import org.apache.log4j.Level
@@ -869,6 +870,26 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-34682: CustomShuffleReaderExec operating on canonicalized plan") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM testData GROUP BY key")
+      val readers = collect(adaptivePlan) {
+        case r: CustomShuffleReaderExec => r
+      }
+      assert(readers.length == 1)
+      val reader = readers.head
+      val c = reader.canonicalized.asInstanceOf[CustomShuffleReaderExec]
+      // we can't just call execute() because that has separate checks for canonicalized plans
+      val doExecute = c.getClass.getMethod("doExecute")
+      doExecute.setAccessible(true)
+      val ex = intercept[InvocationTargetException] {
+        doExecute.invoke(c)
+      }
+      assert(ex.getCause.getMessage === "operating on canonicalized plan")
+    }
+  }
+
   test("metrics of the shuffle reader") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to