This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new edb970b8a73e [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible edb970b8a73e is described below commit edb970b8a73e5b1e08b01f9370dadb05a3e231e3 Author: micheal-o <micheal.okut...@gmail.com> AuthorDate: Mon Mar 11 08:44:30 2024 +0900 [SPARK-47200][SS] Make Foreach batch sink user function error handling backward compatible ### What changes were proposed in this pull request? A previous PR that I checked in (https://github.com/apache/spark/pull/45299) handles and classifies exceptions thrown in user-provided functions for the foreach batch sink. This change makes that handling backward compatible so as not to break current users, since users may be depending on getting the user code error from `StreamingQueryException.cause` instead of `StreamingQueryException.cause.cause`. ### Why are the changes needed? To avoid breaking the existing usage pattern. ### Does this PR introduce _any_ user-facing change? Yes, a better error message with an error class for ForeachBatchSink user function failures. ### How was this patch tested? Updated existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45449 from micheal-o/ForeachBatchExBackwardCompat. 
Authored-by: micheal-o <micheal.okut...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../src/main/resources/error/error-classes.json | 2 +- docs/sql-error-conditions.md | 2 +- .../sql/execution/streaming/StreamExecution.scala | 29 +++++++++++++++------- .../streaming/sources/ForeachBatchSink.scala | 14 ++++++++--- .../sql/errors/QueryExecutionErrorsSuite.scala | 2 +- .../streaming/sources/ForeachBatchSinkSuite.scala | 17 +++++++------ 6 files changed, 43 insertions(+), 23 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 57746d6dbf1e..9717ff2ed49c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1297,7 +1297,7 @@ }, "FOREACH_BATCH_USER_FUNCTION_ERROR" : { "message" : [ - "An error occurred in the user provided function in foreach batch sink." + "An error occurred in the user provided function in foreach batch sink. Reason: <reason>" ], "sqlState" : "39000" }, diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md index 7be01f8cb513..0be75cde968f 100644 --- a/docs/sql-error-conditions.md +++ b/docs/sql-error-conditions.md @@ -778,7 +778,7 @@ The operation `<statement>` is not allowed on the `<objectType>`: `<objectName>` [SQLSTATE: 39000](sql-error-conditions-sqlstates.html#class-39-external-routine-invocation-exception) -An error occurred in the user provided function in foreach batch sink. +An error occurred in the user provided function in foreach batch sink. 
Reason: `<reason>` ### FOUND_MULTIPLE_DATA_SOURCES diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 859fce8b1154..50a73082a8c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table} import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, ReadLimit, SparkDataStream} import org.apache.spark.sql.connector.write.{LogicalWriteInfoImpl, SupportsTruncate, Write} import org.apache.spark.sql.execution.command.StreamingExplainCommand +import org.apache.spark.sql.execution.streaming.sources.ForeachBatchUserFuncException import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.connector.SupportsStreamingUpdateAsAppend import org.apache.spark.sql.streaming._ @@ -279,6 +280,7 @@ abstract class StreamExecution( * `start()` method returns. */ private def runStream(): Unit = { + var errorClassOpt: Option[String] = None try { sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, interruptOnCancel = true) @@ -330,9 +332,17 @@ abstract class StreamExecution( getLatestExecutionContext().updateStatusMessage("Stopped") case e: Throwable => val message = if (e.getMessage == null) "" else e.getMessage + val cause = if (e.isInstanceOf[ForeachBatchUserFuncException]) { + // We want to maintain the current way users get the causing exception + // from the StreamingQueryException. Hence the ForeachBatch exception is unwrapped here. 
+ e.getCause + } else { + e + } + streamDeathCause = new StreamingQueryException( toDebugString(includeLogicalPlan = isInitialized), - cause = e, + cause = cause, getLatestExecutionContext().startOffsets .toOffsetSeq(sources.toSeq, getLatestExecutionContext().offsetSeqMetadata) .toString, @@ -350,12 +360,18 @@ abstract class StreamExecution( "endOffset" -> getLatestExecutionContext().endOffsets.toOffsetSeq( sources.toSeq, getLatestExecutionContext().offsetSeqMetadata).toString )) + + errorClassOpt = e match { + case t: SparkThrowable => Option(t.getErrorClass) + case _ => None + } + logError(s"Query $prettyIdString terminated with error", e) getLatestExecutionContext().updateStatusMessage(s"Terminated with exception: $message") // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to // handle them - if (!NonFatal(e)) { - throw e + if (!NonFatal(cause)) { + throw cause } } finally queryExecutionThread.runUninterruptibly { // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted @@ -379,12 +395,6 @@ abstract class StreamExecution( // Notify others sparkSession.streams.notifyQueryTermination(StreamExecution.this) - val errorClassOpt = exception.flatMap { - _.cause match { - case t: SparkThrowable => Some(t.getErrorClass) - case _ => None - } - } postEvent( new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString), errorClassOpt)) @@ -691,6 +701,7 @@ object StreamExecution { case e2 @ (_: UncheckedIOException | _: ExecutionException | _: UncheckedExecutionException) if e2.getCause != null => isInterruptionException(e2.getCause, sc) + case fe: ForeachBatchUserFuncException => isInterruptionException(fe.getCause, sc) case se: SparkException => val jobGroup = sc.getLocalProperty("spark.jobGroup.id") if (jobGroup == null) return false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala index 5cf98dab21eb..1262731790be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSink.scala @@ -45,14 +45,20 @@ class ForeachBatchSink[T](batchWriter: (Dataset[T], Long) => Unit, encoder: Expr } catch { // The user code can throw any type of exception. case NonFatal(e) if !e.isInstanceOf[SparkThrowable] => - throw new SparkException( - errorClass = "FOREACH_BATCH_USER_FUNCTION_ERROR", - messageParameters = Map.empty, - cause = e) + throw ForeachBatchUserFuncException(e) } } } +/** + * Exception that wraps the exception thrown in the user provided function in ForeachBatch sink. + */ +private[streaming] case class ForeachBatchUserFuncException(cause: Throwable) + extends SparkException( + errorClass = "FOREACH_BATCH_USER_FUNCTION_ERROR", + messageParameters = Map("reason" -> Option(cause.getMessage).getOrElse("")), + cause = cause) + /** * Interface that is meant to be extended by Python classes via Py4J. 
* Py4J allows Python classes to implement Java interfaces so that the JVM can call back diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala index 9f7224315b0e..d4697773742f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala @@ -889,7 +889,7 @@ class QueryExecutionErrorsSuite query.awaitTermination() } assert(e.getErrorClass === "STREAM_FAILED") - assert(e.getCause.getCause.isInstanceOf[NullPointerException]) + assert(e.getCause.isInstanceOf[NullPointerException]) } test("CONCURRENT_QUERY: streaming query is resumed from many sessions") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala index 7d04d14a17ab..5304ea3b69dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming.sources import scala.collection.mutable import scala.language.implicitConversions -import org.apache.spark.{ExecutorDeadException, SparkException} +import org.apache.spark.ExecutorDeadException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.execution.SerializeFromObjectExec @@ -193,23 +193,26 @@ class ForeachBatchSinkSuite extends StreamTest { mem.addData(1, 2, 3, 4, 5) val funcEx = new IllegalAccessException("access error") - val wrapperEx = intercept[StreamingQueryException] { + val queryEx = intercept[StreamingQueryException] { val query = 
ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => throw funcEx).start() query.awaitTermination() - }.getCause + } + + val errClass = "FOREACH_BATCH_USER_FUNCTION_ERROR" // verify that we classified the exception - checkError(wrapperEx.asInstanceOf[SparkException], "FOREACH_BATCH_USER_FUNCTION_ERROR") - assert(wrapperEx.getCause == funcEx) + assert(queryEx.getMessage.contains(errClass)) + assert(queryEx.getCause == funcEx) val sparkEx = ExecutorDeadException("network error") val ex = intercept[StreamingQueryException] { val query = ds.writeStream.foreachBatch((_: Dataset[Int], _: Long) => throw sparkEx).start() query.awaitTermination() - }.getCause + } // we didn't wrap the spark exception - assert(ex == sparkEx) + assert(!ex.getMessage.contains(errClass)) + assert(ex.getCause == sparkEx) } // ============== Helper classes and methods ================= --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org