This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 94f3e4113ef [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors 94f3e4113ef is described below commit 94f3e4113ef6fbf0940578bcb279f233e43c27f1 Author: Max Gekk <max.g...@gmail.com> AuthorDate: Wed Jun 8 21:20:55 2022 +0300 [SPARK-39412][SQL] Exclude IllegalStateException from Spark's internal errors ### What changes were proposed in this pull request? In the PR, I propose to exclude `IllegalStateException` from the list of exceptions that are wrapped by `SparkException` with the `INTERNAL_ERROR` error class. ### Why are the changes needed? See the explanation in SPARK-39412. ### Does this PR introduce _any_ user-facing change? No, the reverted changes haven't been released yet. ### How was this patch tested? By running the modified test suites: ``` $ build/sbt "test:testOnly *ContinuousSuite" $ build/sbt "test:testOnly *MicroBatchExecutionSuite" $ build/sbt "test:testOnly *KafkaMicroBatchV1SourceSuite" $ build/sbt "test:testOnly *KafkaMicroBatchV2SourceSuite" $ build/sbt "test:testOnly *.WholeStageCodegenSuite" ``` Closes #36804 from MaxGekk/exclude-IllegalStateException. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> (cherry picked from commit 19afe1341d277bc2d7dd47175d142a8c71141138) Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala | 11 ++++------- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +- .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 7 +++---- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 11 ++++------- .../sql/execution/streaming/MicroBatchExecutionSuite.scala | 6 ++---- .../spark/sql/streaming/continuous/ContinuousSuite.scala | 7 +++---- 6 files changed, 17 insertions(+), 27 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 41277a535f5..db71f0fd918 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -34,7 +34,6 @@ import org.apache.kafka.common.TopicPartition import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkException, SparkThrowable} import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession} import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.read.streaming.SparkDataStream @@ -667,10 +666,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testUtils.sendMessages(topic2, Array("6")) }, StartStream(), - ExpectFailure[SparkException](e => { - assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR") + ExpectFailure[IllegalStateException](e => { // The offset of `topic2` should be changed from 2 to 1 - 
assert(e.getCause.getMessage.contains("was changed from 2 to 1")) + assert(e.getMessage.contains("was changed from 2 to 1")) }) ) } @@ -766,13 +764,12 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { testStream(df)( StartStream(checkpointLocation = metadataPath.getAbsolutePath), - ExpectFailure[SparkException](e => { - assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR") + ExpectFailure[IllegalStateException](e => { Seq( s"maximum supported log version is v1, but encountered v99999", "produced by a newer version of Spark and cannot be read by this version" ).foreach { message => - assert(e.getCause.toString.contains(message)) + assert(e.toString.contains(message)) } })) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index a4a40cc0e69..6ef9bc2a703 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -3848,7 +3848,7 @@ class Dataset[T] private[sql]( /** * Wrap a Dataset action to track the QueryExecution and time cost, then report to the - * user-registered callback functions, and also to convert asserts/illegal states to + * user-registered callback functions, and also to convert asserts/NPE to * the internal error exception. */ private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index ab9b9861c03..840bd436266 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -489,11 +489,10 @@ object QueryExecution { } /** - * Converts asserts, null pointer, illegal state exceptions to internal errors. 
+ * Converts asserts, null pointer exceptions to internal errors. */ private[sql] def toInternalError(msg: String, e: Throwable): Throwable = e match { - case e @ (_: java.lang.IllegalStateException | _: java.lang.NullPointerException | - _: java.lang.AssertionError) => + case e @ (_: java.lang.NullPointerException | _: java.lang.AssertionError) => new SparkException( errorClass = "INTERNAL_ERROR", messageParameters = Array(msg + @@ -504,7 +503,7 @@ object QueryExecution { } /** - * Catches asserts, null pointer, illegal state exceptions, and converts them to internal errors. + * Catches asserts, null pointer exceptions, and converts them to internal errors. */ private[sql] def withInternalError[T](msg: String)(block: => T): T = { try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 27689bb4d45..2be915f0002 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import org.apache.spark.SparkException import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats, CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite @@ -763,11 +762,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession "SELECT AVG(v) FROM VALUES(1) t(v)", // Tet case with keys "SELECT k, AVG(v) FROM VALUES((1, 1)) t(k, v) GROUP BY k").foreach { query => - val e = intercept[SparkException] { + val e = intercept[IllegalStateException] { sql(query).collect } - assert(e.getErrorClass === "INTERNAL_ERROR") - assert(e.getCause.getMessage.contains(expectedErrMsg)) + assert(e.getMessage.contains(expectedErrMsg)) } } } @@ -786,11 
+784,10 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession // Tet case with keys "SELECT k, AVG(a + b), SUM(a + b + c) FROM VALUES((1, 1, 1, 1)) t(k, a, b, c) " + "GROUP BY k").foreach { query => - val e = intercept[SparkException] { + val e = intercept[IllegalStateException] { sql(query).collect } - assert(e.getErrorClass === "INTERNAL_ERROR") - assert(e.getCause.getMessage.contains(expectedErrMsg)) + assert(e.getMessage.contains(expectedErrMsg)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala index 9d731248ad4..f06e62b33b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala @@ -22,7 +22,6 @@ import java.io.File import org.apache.commons.io.FileUtils import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkException, SparkThrowable} import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.connector.read.streaming @@ -94,9 +93,8 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { testStream(streamEvent) ( AddData(inputData, 1, 2, 3, 4, 5, 6), StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), - ExpectFailure[SparkException] { e => - assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR") - assert(e.getCause.getMessage.contains("batch 3 doesn't exist")) + ExpectFailure[IllegalStateException] { e => + assert(e.getMessage.contains("batch 3 doesn't exist")) } ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 
a28d44caab0..5893c3da098 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.streaming.continuous import java.sql.Timestamp -import org.apache.spark.{SparkContext, SparkException, SparkThrowable} +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart} import org.apache.spark.sql._ import org.apache.spark.sql.execution.streaming._ @@ -440,9 +440,8 @@ class ContinuousEpochBacklogSuite extends ContinuousSuiteBase { testStream(df)( StartStream(Trigger.Continuous(1)), - ExpectFailure[SparkException] { e => - assert(e.asInstanceOf[SparkThrowable].getErrorClass === "INTERNAL_ERROR") - e.getCause.getMessage.contains("queue has exceeded its maximum") + ExpectFailure[IllegalStateException] { e => + e.getMessage.contains("queue has exceeded its maximum") } ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org