This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b3398e695d92 [SPARK-46381][SQL] Migrate sub-classes of `AnalysisException` to error classes b3398e695d92 is described below commit b3398e695d929c7f867d408c28fb274509c9854c Author: Max Gekk <max.g...@gmail.com> AuthorDate: Wed Dec 13 12:28:31 2023 +0300 [SPARK-46381][SQL] Migrate sub-classes of `AnalysisException` to error classes ### What changes were proposed in this pull request? In the PR, I propose to migrate the remaining two sub-classes of `AnalysisException` to error classes: - NonEmptyNamespaceException - ExtendedAnalysisException and forbid raising such exceptions without an error class. ### Why are the changes needed? This is a part of the migration to the error framework, and unifying errors in Spark. ### Does this PR introduce _any_ user-facing change? Yes, if user's code depends on the format of error messages. ### How was this patch tested? By existing test suites like: ``` $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #44314 from MaxGekk/error-class-ExtendedAnalysisException. 
Authored-by: Max Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../utils/src/main/resources/error/error-classes.json | 15 +++++++++++++++ .../sql/catalyst/analysis/NonEmptyException.scala | 18 ++++++++++++------ .../spark/sql/catalyst/ExtendedAnalysisException.scala | 2 +- .../sql/catalyst/analysis/ResolveTimeWindows.scala | 7 +++++-- .../analysis/UnsupportedOperationChecker.scala | 5 ++++- .../spark/sql/errors/QueryCompilationErrors.scala | 9 +++------ .../scala/org/apache/spark/sql/jdbc/DB2Dialect.scala | 2 +- .../org/apache/spark/sql/jdbc/MsSqlServerDialect.scala | 2 +- .../org/apache/spark/sql/jdbc/PostgresDialect.scala | 2 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 ++++++-- 10 files changed, 49 insertions(+), 21 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index d52ffc011b72..2aa5420eb22c 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -6958,6 +6958,21 @@ "<message>" ] }, + "_LEGACY_ERROR_TEMP_3101" : { + "message" : [ + "The input is not a correct window column: <windowTime>" + ] + }, + "_LEGACY_ERROR_TEMP_3102" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3103" : { + "message" : [ + "Namespace '<namespace>' is non empty. 
<details>" + ] + }, "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : { "message" : [ "<errorMessage>" diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala index 2aea9bac12fe..6475ac3093fe 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala @@ -25,12 +25,18 @@ import org.apache.spark.sql.catalyst.util.QuotingUtils.quoted * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ -case class NonEmptyNamespaceException private( - override val message: String, +case class NonEmptyNamespaceException( + namespace: Array[String], + details: String, override val cause: Option[Throwable] = None) - extends AnalysisException(message, cause = cause) { + extends AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3103", + messageParameters = Map( + "namespace" -> quoted(namespace), + "details" -> details)) { - def this(namespace: Array[String]) = { - this(s"Namespace '${quoted(namespace)}' is non empty.") - } + def this(namespace: Array[String]) = this(namespace, "", None) + + def this(details: String, cause: Option[Throwable]) = + this(Array.empty, details, cause) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala index 2eb7054edceb..1565935a8739 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ExtendedAnalysisException.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan /** * Internal 
[[AnalysisException]] that also captures a [[LogicalPlan]]. */ -class ExtendedAnalysisException( +class ExtendedAnalysisException private( message: String, line: Option[Int] = None, startPosition: Option[Int] = None, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala index 1ee218f9369c..a6688f276621 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTimeWindows.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ExtendedAnalysisException import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime} import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project} @@ -309,9 +310,11 @@ object ResolveWindowTime extends Rule[LogicalPlan] { if (!metadata.contains(TimeWindow.marker) && !metadata.contains(SessionWindow.marker)) { - // FIXME: error framework? 
throw new ExtendedAnalysisException( - s"The input is not a correct window column: $windowTime", plan = Some(p)) + new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3101", + messageParameters = Map("windowTime" -> windowTime.toString)), + plan = p) } val newMetadata = new MetadataBuilder() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 0394a697c12f..68f93a1f2113 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -552,7 +552,10 @@ object UnsupportedOperationChecker extends Logging { private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing = { throw new ExtendedAnalysisException( - msg, operator.origin.line, operator.origin.startPosition, Some(operator)) + new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3102", + messageParameters = Map("msg" -> msg)), + plan = operator) } private def checkForStreamStreamJoinWatermark(join: Join): Unit = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 1195e9dd78da..c1ee9b49d8de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -21,7 +21,7 @@ import scala.collection.mutable import org.apache.hadoop.fs.Path -import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkThrowableHelper, SparkUnsupportedOperationException} +import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkUnsupportedOperationException} import 
org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex} @@ -1955,12 +1955,9 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat } def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: LogicalPlan): Throwable = { - val errorClass = "_LEGACY_ERROR_TEMP_1181" new ExtendedAnalysisException( - SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]), - errorClass = Some(errorClass), - messageParameters = Map.empty, - plan = Some(plan)) + new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_1181", messageParameters = Map.empty), + plan = plan) } def invalidPandasUDFPlacementError( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala index 189dedb60f0c..8975a015ee8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DB2Dialect.scala @@ -149,7 +149,7 @@ private object DB2Dialect extends JdbcDialect { case sqlException: SQLException => sqlException.getSQLState match { // https://www.ibm.com/docs/en/db2/11.5?topic=messages-sqlstate - case "42893" => throw NonEmptyNamespaceException(message, cause = Some(e)) + case "42893" => throw new NonEmptyNamespaceException(message, cause = Some(e)) case _ => super.classifyException(message, e) } case _ => super.classifyException(message, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala index 361da645ee4c..ee649122ca80 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala @@ -194,7 +194,7 @@ private object MsSqlServerDialect extends JdbcDialect { e match { case sqlException: SQLException => sqlException.getErrorCode match { - case 3729 => throw NonEmptyNamespaceException(message, cause = Some(e)) + case 3729 => throw new NonEmptyNamespaceException(message, cause = Some(e)) case _ => super.classifyException(message, e) } case _ => super.classifyException(message, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index e3af80743272..cff7bb5e06f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -254,7 +254,7 @@ private object PostgresDialect extends JdbcDialect with SQLConfHelper { val indexName = regex.findFirstMatchIn(message).get.group(1) val tableName = regex.findFirstMatchIn(message).get.group(2) throw new NoSuchIndexException(indexName, tableName, cause = Some(e)) - case "2BP01" => throw NonEmptyNamespaceException(message, cause = Some(e)) + case "2BP01" => throw new NonEmptyNamespaceException(message, cause = Some(e)) case _ => super.classifyException(message, e) } case unsupported: UnsupportedOperationException => throw unsupported diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 15cbd69e62e0..b603c95fb30d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2638,10 +2638,14 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark test("SPARK-20164: ExtendedAnalysisException should be tolerant to null query plan") { try { - throw new ExtendedAnalysisException("", None, None, plan = null) + throw new ExtendedAnalysisException( + new AnalysisException( + errorClass = "_LEGACY_ERROR_USER_RAISED_EXCEPTION", + messageParameters = Map("errorMessage" -> "null query plan")), + plan = null) } catch { case ae: ExtendedAnalysisException => - assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage) + assert(ae.plan == None && ae.getMessage == ae.getSimpleMessage) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org