This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 339e4dd  [SPARK-31144][SQL] Wrap Error with QueryExecutionException to notify QueryExecutionListener
339e4dd is described below

commit 339e4dd3a3daf6c11670e5ca7786c54f68a86bfa
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Fri Mar 13 15:55:29 2020 -0700

    [SPARK-31144][SQL] Wrap Error with QueryExecutionException to notify QueryExecutionListener
    
    ### What changes were proposed in this pull request?
    
    This PR manually reverts the changes in #25292 and then wraps `java.lang.Error` with `QueryExecutionException`, so that it can be passed to `QueryExecutionListener.onFailure`, which only accepts `Exception`.
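
    For illustration, a minimal sketch of a listener that detects the wrapped error via `getCause` (the listener name and log messages below are hypothetical, not part of this patch):

    ```scala
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    // Hypothetical listener: after this patch, a java.lang.Error thrown during
    // execution reaches onFailure wrapped in a QueryExecutionException, so the
    // original Error is available as exception.getCause.
    class ErrorAwareListener extends QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {}

      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
        exception.getCause match {
          case fatal: Error => println(s"$funcName hit a fatal error: ${fatal.getMessage}")
          case _            => println(s"$funcName failed: ${exception.getMessage}")
        }
      }
    }
    ```

    Such a listener is registered as usual via `spark.listenerManager.register(new ErrorAwareListener)`.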
    
    The bug fix PR for 2.4 is #27904. It needs a separate PR because the affected code has changed significantly between the branches.
    
    ### Why are the changes needed?
    
    Avoid API changes and fix a bug.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. This reverts an API change introduced in 3.0, so the `QueryExecutionListener` API will be the same as in 2.4.
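
    For reference, a listener written against the 2.4 signature, such as the minimal no-op sketch below (the class name is illustrative), compiles unchanged after this revert:

    ```scala
    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.QueryExecutionListener

    // A 2.4-style listener: onFailure takes an Exception again, so this
    // compiles against both 2.4 and 3.0 with this patch applied.
    class NoOpListener extends QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {}
      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
    }
    ```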
    
    ### How was this patch tested?
    
    The newly added test.
    
    Closes #27907 from zsxwing/SPARK-31144.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 1ddf44dfcaff53e870a3c9608e31a60805e50c29)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 project/MimaExcludes.scala                         |  4 --
 .../spark/sql/util/QueryExecutionListener.scala    | 18 +++++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  |  2 +-
 .../org/apache/spark/sql/SessionStateSuite.scala   |  2 +-
 .../spark/sql/TestQueryExecutionListener.scala     |  2 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  2 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala |  2 +-
 .../connector/FileDataSourceV2FallBackSuite.scala  |  6 +--
 .../connector/SupportsCatalogOptionsSuite.scala    |  2 +-
 .../sql/test/DataFrameReaderWriterSuite.scala      |  2 +-
 .../spark/sql/util/DataFrameCallbackSuite.scala    | 62 ++++++++++++++++------
 .../sql/util/ExecutionListenerManagerSuite.scala   |  2 +-
 .../sql/hive/thriftserver/DummyListeners.scala     |  2 +-
 13 files changed, 71 insertions(+), 37 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7f66577..f8ad60b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -419,10 +419,6 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"),
 
-    // [SPARK-28556][SQL] QueryExecutionListener should also notify Error
-    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
-
     // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"),
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 01f8182..0b5951e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.util.{ListenerBus, Utils}
@@ -55,12 +55,13 @@ trait QueryExecutionListener {
    * @param funcName the name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
    *           physical plan, etc.
-   * @param error the error that failed this query.
-   *
+   * @param exception the exception that failed this query. If `java.lang.Error` is thrown during
+   *                  execution, it will be wrapped with an `Exception` and it can be accessed by
+   *                  `exception.getCause`.
    * @note This can be invoked by multiple different threads.
    */
   @DeveloperApi
-  def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit
+  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
 }
 
 
@@ -140,7 +141,14 @@ private[sql] class ExecutionListenerBus(session: SparkSession)
       val funcName = event.executionName.get
       event.executionFailure match {
         case Some(ex) =>
-          listener.onFailure(funcName, event.qe, ex)
+          val exception = ex match {
+            case e: Exception => e
+            case other: Throwable =>
+              val message = "Hit an error when executing a query" +
+                (if (other.getMessage == null) "" else s": ${other.getMessage}")
+              new QueryExecutionException(message, other)
+          }
+          listener.onFailure(funcName, event.qe, exception)
         case _ =>
           listener.onSuccess(funcName, event.qe, event.duration)
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index cd15708..ac2ebd8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -71,7 +71,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
         plan = qe.analyzed
 
       }
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
     }
     spark.listenerManager.register(listener)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
index 31957a9..003f5bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala
@@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite {
   test("fork new session and inherit listener manager") {
     class CommandCollector extends QueryExecutionListener {
       val commands: ArrayBuffer[String] = ArrayBuffer.empty[String]
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {}
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         commands += funcName
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
index fd6bc96..d2a6358 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala
@@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener {
     OnSuccessCall.isOnSuccessCalled.set(true)
   }
 
-  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { }
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index cbe2e91..e0857ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -341,7 +341,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       withTempPath { path =>
         var numTotalCachedHit = 0
         val listener = new QueryExecutionListener {
-          override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {}
+          override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
 
           override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
             qe.withCachedData match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 0a6897b..7c7afa9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -141,7 +141,7 @@ class DataSourceV2DataFrameSuite
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
         plan = qe.analyzed
       }
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
     }
 
     try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
index b0da2eb..51d7342 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala
@@ -157,13 +157,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
     Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-        val errors = ArrayBuffer.empty[(String, Throwable)]
+        val exceptions = ArrayBuffer.empty[(String, Exception)]
         val listener = new QueryExecutionListener {
           override def onFailure(
               funcName: String,
               qe: QueryExecution,
-              error: Throwable): Unit = {
-            errors += funcName -> error
+              exception: Exception): Unit = {
+            exceptions += funcName -> exception
           }
 
           override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
index 9693a10..550bec7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala
@@ -216,7 +216,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with
       override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
         plan = qe.analyzed
       }
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
     }
 
     spark.listenerManager.register(listener)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index fb93900..9747840 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -282,7 +282,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
         plan = qe.analyzed
 
       }
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
     }
 
     spark.listenerManager.register(listener)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
index 6881812..b17c935 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
@@ -20,15 +20,17 @@ package org.apache.spark.sql.util
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark._
-import org.apache.spark.sql.{functions, AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{functions, AnalysisException, Dataset, QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, InsertIntoStatement, LogicalPlan, Project}
-import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, InsertIntoHadoopFsRelationCommand}
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.StringType
 
 class DataFrameCallbackSuite extends QueryTest
   with SharedSparkSession
@@ -40,7 +42,7 @@ class DataFrameCallbackSuite extends QueryTest
     val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         metrics += ((funcName, qe, duration))
@@ -67,10 +69,10 @@ class DataFrameCallbackSuite extends QueryTest
   }
 
   testQuietly("execute callback functions when a DataFrame action failed") {
-    val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)]
+    val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
     val listener = new QueryExecutionListener {
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
-        metrics += ((funcName, qe, error))
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+        metrics += ((funcName, qe, exception))
       }
 
       // Only test failed case here, so no need to implement `onSuccess`
@@ -96,7 +98,7 @@ class DataFrameCallbackSuite extends QueryTest
     val metrics = ArrayBuffer.empty[Long]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         val metric = stripAQEPlan(qe.executedPlan) match {
@@ -136,7 +138,7 @@ class DataFrameCallbackSuite extends QueryTest
     val metrics = ArrayBuffer.empty[Long]
     val listener = new QueryExecutionListener {
       // Only test successful case here, so no need to implement `onFailure`
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         metrics += qe.executedPlan.longMetric("dataSize").value
@@ -176,10 +178,10 @@ class DataFrameCallbackSuite extends QueryTest
 
   test("execute callback functions for DataFrameWriter") {
     val commands = ArrayBuffer.empty[(String, LogicalPlan)]
-    val errors = ArrayBuffer.empty[(String, Throwable)]
+    val exceptions = ArrayBuffer.empty[(String, Exception)]
     val listener = new QueryExecutionListener {
-      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
-        errors += funcName -> error
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+        exceptions += funcName -> exception
       }
 
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -225,9 +227,9 @@ class DataFrameCallbackSuite extends QueryTest
         spark.range(10).select($"id", $"id").write.insertInto("tab")
       }
       sparkContext.listenerBus.waitUntilEmpty()
-      assert(errors.length == 1)
-      assert(errors.head._1 == "insertInto")
-      assert(errors.head._2 == e)
+      assert(exceptions.length == 1)
+      assert(exceptions.head._1 == "insertInto")
+      assert(exceptions.head._2 == e)
     }
   }
 
@@ -238,7 +240,7 @@ class DataFrameCallbackSuite extends QueryTest
         metricMaps += qe.observedMetrics
       }
 
-      override def onFailure(funcName: String, qe: QueryExecution, exception: Throwable): Unit = {
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
         // No-op
       }
     }
@@ -278,4 +280,32 @@ class DataFrameCallbackSuite extends QueryTest
       spark.listenerManager.unregister(listener)
     }
   }
+
+  testQuietly("SPARK-31144: QueryExecutionListener should receive 
`java.lang.Error`") {
+    var e: Exception = null
+    val listener = new QueryExecutionListener {
+      override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+        e = exception
+      }
+      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {}
+    }
+    spark.listenerManager.register(listener)
+
+    intercept[Error] {
+      Dataset.ofRows(spark, ErrorTestCommand("foo")).collect()
+    }
+    sparkContext.listenerBus.waitUntilEmpty()
+    assert(e != null && e.isInstanceOf[QueryExecutionException]
+      && e.getCause.isInstanceOf[Error] && e.getCause.getMessage == "foo")
+    spark.listenerManager.unregister(listener)
+  }
+}
+
+/** A test command that throws `java.lang.Error` during execution. */
+case class ErrorTestCommand(foo: String) extends RunnableCommand {
+
+  override val output: Seq[Attribute] = Seq(AttributeReference("foo", StringType)())
+
+  override def run(sparkSession: SparkSession): Seq[Row] =
+    throw new java.lang.Error(foo)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
index 2fd6cb2..ab854a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala
@@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener {
     CALLBACK_COUNT.incrementAndGet()
   }
 
-  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
     CALLBACK_COUNT.incrementAndGet()
   }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala
index d056b3b..4564c22 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.util.QueryExecutionListener
 
 class DummyQueryExecutionListener extends QueryExecutionListener {
   override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {}
-  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
 }
 
 class DummyStreamingQueryListener extends StreamingQueryListener {

