This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 339e4dd [SPARK-31144][SQL] Wrap Error with QueryExecutionException to notify QueryExecutionListener 339e4dd is described below commit 339e4dd3a3daf6c11670e5ca7786c54f68a86bfa Author: Shixiong Zhu <zsxw...@gmail.com> AuthorDate: Fri Mar 13 15:55:29 2020 -0700 [SPARK-31144][SQL] Wrap Error with QueryExecutionException to notify QueryExecutionListener ### What changes were proposed in this pull request? This PR manually reverts the changes in #25292 and then wraps `java.lang.Error` in a `QueryExecutionException` so that it can be passed to `QueryExecutionListener.onFailure`, which only accepts `Exception`. The bug fix PR for 2.4 is #27904. It needs a separate PR because the affected code has changed a lot. ### Why are the changes needed? Avoid API changes and fix a bug. ### Does this PR introduce any user-facing change? Yes. This reverts an API change made in 3.0. The `QueryExecutionListener` APIs will be the same as in 2.4. ### How was this patch tested? The newly added test. Closes #27907 from zsxwing/SPARK-31144. 
Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 1ddf44dfcaff53e870a3c9608e31a60805e50c29) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- project/MimaExcludes.scala | 4 -- .../spark/sql/util/QueryExecutionListener.scala | 18 +++++-- .../apache/spark/sql/DataFrameWriterV2Suite.scala | 2 +- .../org/apache/spark/sql/SessionStateSuite.scala | 2 +- .../spark/sql/TestQueryExecutionListener.scala | 2 +- .../test/scala/org/apache/spark/sql/UDFSuite.scala | 2 +- .../sql/connector/DataSourceV2DataFrameSuite.scala | 2 +- .../connector/FileDataSourceV2FallBackSuite.scala | 6 +-- .../connector/SupportsCatalogOptionsSuite.scala | 2 +- .../sql/test/DataFrameReaderWriterSuite.scala | 2 +- .../spark/sql/util/DataFrameCallbackSuite.scala | 62 ++++++++++++++++------ .../sql/util/ExecutionListenerManagerSuite.scala | 2 +- .../sql/hive/thriftserver/DummyListeners.scala | 2 +- 13 files changed, 71 insertions(+), 37 deletions(-) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 7f66577..f8ad60b 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -419,10 +419,6 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"), - // [SPARK-28556][SQL] QueryExecutionListener should also notify Error - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), - // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala index 01f8182..0b5951e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException} import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.util.{ListenerBus, Utils} @@ -55,12 +55,13 @@ trait QueryExecutionListener { * @param funcName the name of the action that triggered this query. * @param qe the QueryExecution object that carries detail information like logical plan, * physical plan, etc. - * @param error the error that failed this query. - * + * @param exception the exception that failed this query. If `java.lang.Error` is thrown during + * execution, it will be wrapped with an `Exception` and it can be accessed by + * `exception.getCause`. * @note This can be invoked by multiple different threads. 
*/ @DeveloperApi - def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit + def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit } @@ -140,7 +141,14 @@ private[sql] class ExecutionListenerBus(session: SparkSession) val funcName = event.executionName.get event.executionFailure match { case Some(ex) => - listener.onFailure(funcName, event.qe, ex) + val exception = ex match { + case e: Exception => e + case other: Throwable => + val message = "Hit an error when executing a query" + + (if (other.getMessage == null) "" else s": ${other.getMessage}") + new QueryExecutionException(message, other) + } + listener.onFailure(funcName, event.qe, exception) case _ => listener.onSuccess(funcName, event.qe, event.duration) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index cd15708..ac2ebd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -71,7 +71,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala index 31957a9..003f5bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SessionStateSuite.scala @@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite { test("fork new session and inherit listener manager") { class CommandCollector extends QueryExecutionListener { val commands: 
ArrayBuffer[String] = ArrayBuffer.empty[String] - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { commands += funcName } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala index fd6bc96..d2a6358 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestQueryExecutionListener.scala @@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener { OnSuccessCall.isOnSuccessCalled.set(true) } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { } + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index cbe2e91..e0857ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -341,7 +341,7 @@ class UDFSuite extends QueryTest with SharedSparkSession { withTempPath { path => var numTotalCachedHit = 0 val listener = new QueryExecutionListener { - override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {} + override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { qe.withCachedData match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 0a6897b..7c7afa9 100644 
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -141,7 +141,7 @@ class DataSourceV2DataFrameSuite override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index b0da2eb..51d7342 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -157,13 +157,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format => withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) { val commands = ArrayBuffer.empty[(String, LogicalPlan)] - val errors = ArrayBuffer.empty[(String, Throwable)] + val exceptions = ArrayBuffer.empty[(String, Exception)] val listener = new QueryExecutionListener { override def onFailure( funcName: String, qe: QueryExecution, - error: Throwable): Unit = { - errors += funcName -> error + exception: Exception): Unit = { + exceptions += funcName -> exception } override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala index 9693a10..550bec7 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/SupportsCatalogOptionsSuite.scala @@ -216,7 +216,7 @@ class SupportsCatalogOptionsSuite extends QueryTest with SharedSparkSession with override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index fb93900..9747840 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -282,7 +282,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with plan = qe.analyzed } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } spark.listenerManager.register(listener) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index 6881812..b17c935 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -20,15 +20,17 @@ package org.apache.spark.sql.util import scala.collection.mutable.ArrayBuffer import org.apache.spark._ -import org.apache.spark.sql.{functions, AnalysisException, QueryTest, Row} +import org.apache.spark.sql.{functions, 
AnalysisException, Dataset, QueryTest, Row, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, InsertIntoStatement, LogicalPlan, Project} -import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, WholeStageCodegenExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.{CreateTable, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.json.JsonFileFormat -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StringType class DataFrameCallbackSuite extends QueryTest with SharedSparkSession @@ -40,7 +42,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metrics += ((funcName, qe, duration)) @@ -67,10 +69,10 @@ class DataFrameCallbackSuite extends QueryTest } testQuietly("execute callback functions when a DataFrame action failed") { - val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)] + val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)] val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { - metrics 
+= ((funcName, qe, error)) + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + metrics += ((funcName, qe, exception)) } // Only test failed case here, so no need to implement `onSuccess` @@ -96,7 +98,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { val metric = stripAQEPlan(qe.executedPlan) match { @@ -136,7 +138,7 @@ class DataFrameCallbackSuite extends QueryTest val metrics = ArrayBuffer.empty[Long] val listener = new QueryExecutionListener { // Only test successful case here, so no need to implement `onFailure` - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { metrics += qe.executedPlan.longMetric("dataSize").value @@ -176,10 +178,10 @@ class DataFrameCallbackSuite extends QueryTest test("execute callback functions for DataFrameWriter") { val commands = ArrayBuffer.empty[(String, LogicalPlan)] - val errors = ArrayBuffer.empty[(String, Throwable)] + val exceptions = ArrayBuffer.empty[(String, Exception)] val listener = new QueryExecutionListener { - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { - errors += funcName -> error + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + exceptions += funcName -> exception } override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): 
Unit = { @@ -225,9 +227,9 @@ class DataFrameCallbackSuite extends QueryTest spark.range(10).select($"id", $"id").write.insertInto("tab") } sparkContext.listenerBus.waitUntilEmpty() - assert(errors.length == 1) - assert(errors.head._1 == "insertInto") - assert(errors.head._2 == e) + assert(exceptions.length == 1) + assert(exceptions.head._1 == "insertInto") + assert(exceptions.head._2 == e) } } @@ -238,7 +240,7 @@ class DataFrameCallbackSuite extends QueryTest metricMaps += qe.observedMetrics } - override def onFailure(funcName: String, qe: QueryExecution, exception: Throwable): Unit = { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { // No-op } } @@ -278,4 +280,32 @@ class DataFrameCallbackSuite extends QueryTest spark.listenerManager.unregister(listener) } } + + testQuietly("SPARK-31144: QueryExecutionListener should receive `java.lang.Error`") { + var e: Exception = null + val listener = new QueryExecutionListener { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { + e = exception + } + override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {} + } + spark.listenerManager.register(listener) + + intercept[Error] { + Dataset.ofRows(spark, ErrorTestCommand("foo")).collect() + } + sparkContext.listenerBus.waitUntilEmpty() + assert(e != null && e.isInstanceOf[QueryExecutionException] + && e.getCause.isInstanceOf[Error] && e.getCause.getMessage == "foo") + spark.listenerManager.unregister(listener) + } +} + +/** A test command that throws `java.lang.Error` during execution. 
*/ +case class ErrorTestCommand(foo: String) extends RunnableCommand { + + override val output: Seq[Attribute] = Seq(AttributeReference("foo", StringType)()) + + override def run(sparkSession: SparkSession): Seq[Row] = + throw new java.lang.Error(foo) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala index 2fd6cb2..ab854a0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ExecutionListenerManagerSuite.scala @@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener { CALLBACK_COUNT.incrementAndGet() } - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { CALLBACK_COUNT.incrementAndGet() } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala index d056b3b..4564c22 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/DummyListeners.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.util.QueryExecutionListener class DummyQueryExecutionListener extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {} - override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {} + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} } class DummyStreamingQueryListener extends StreamingQueryListener { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org