This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1c81ad2  [SPARK-35064][SQL] Group error in spark-catalyst
1c81ad2 is described below

commit 1c81ad20296d34f137238dadd67cc6ae405944eb
Author: dgd-contributor <dgd_contribu...@viettel.com.vn>
AuthorDate: Mon Jun 28 07:21:24 2021 +0000

    [SPARK-35064][SQL] Group error in spark-catalyst

    ### What changes were proposed in this pull request?
    This PR groups exception messages in sql/catalyst/src/main/scala/org/apache/spark/sql (except catalyst).

    ### Why are the changes needed?
    It will largely help with the standardization of error messages and their maintenance.

    ### Does this PR introduce any user-facing change?
    No. Error messages remain unchanged.

    ### How was this patch tested?
    No new tests; all original tests pass to make sure this doesn't break any existing behavior.

    Closes #32916 from dgd-contributor/SPARK-35064_catalyst_group_error.

    Authored-by: dgd-contributor <dgd_contribu...@viettel.com.vn>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/Encoders.scala | 7 +-
 .../src/main/scala/org/apache/spark/sql/Row.scala | 5 +-
 .../sql/connector/catalog/CatalogManager.scala | 6 +-
 .../spark/sql/connector/catalog/Catalogs.scala | 26 +++--
 .../spark/sql/errors/QueryCompilationErrors.scala | 49 ++++++++-
 .../spark/sql/errors/QueryExecutionErrors.scala | 112 ++++++++++++++++++++-
 .../apache/spark/sql/execution/RowIterator.scala | 5 +-
 .../spark/sql/internal/ReadOnlySQLConf.scala | 13 +--
 .../org/apache/spark/sql/internal/SQLConf.scala | 9 +-
 .../apache/spark/sql/types/AbstractDataType.scala | 4 +-
 .../org/apache/spark/sql/types/DataType.scala | 6 +-
 .../org/apache/spark/sql/types/DecimalType.scala | 13 +--
 .../org/apache/spark/sql/types/ObjectType.scala | 5 +-
 .../apache/spark/sql/types/UDTRegistration.scala | 9 +-
 .../org/apache/spark/sql/util/ArrowUtils.scala | 8 +-
 .../apache/spark/sql/util/PartitioningUtils.scala | 10 +-
 .../org/apache/spark/sql/util/SchemaUtils.scala | 5 +-
 .../spark/sql/execution/arrow/ArrowWriter.scala | 2 +-
 18 files changed, 219 insertions(+), 75 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala index 4ead950..15a93a7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoders.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder} import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast} import org.apache.spark.sql.catalyst.expressions.objects.{DecodeUsingSerializer, EncodeUsingSerializer} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ /** @@ -224,16 +225,14 @@ object Encoders { /** Throws an exception if T is not a public class. */ private def validatePublicClass[T: ClassTag](): Unit = { if (!Modifier.isPublic(classTag[T].runtimeClass.getModifiers)) { - throw new UnsupportedOperationException( - s"${classTag[T].runtimeClass.getName} is not a public class. " + - "Only public classes are supported.") + throw QueryExecutionErrors.notPublicClassError(classTag[T].runtimeClass.getName) } } /** A way to construct encoders using generic serializers.
*/ private def genericSerializer[T: ClassTag](useKryo: Boolean): Encoder[T] = { if (classTag[T].runtimeClass.isPrimitive) { - throw new UnsupportedOperationException("Primitive types are not supported.") + throw QueryExecutionErrors.primitiveTypesNotSupportedError() } validatePublicClass[T]() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 558fddb..efc11ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -33,6 +33,7 @@ import org.apache.spark.annotation.{Stable, Unstable} import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.CalendarInterval @@ -375,7 +376,7 @@ trait Row extends Serializable { * @throws IllegalArgumentException when a field `name` does not exist. */ def fieldIndex(name: String): Int = { - throw new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.") + throw QueryExecutionErrors.fieldIndexOnRowWithoutSchemaError() } /** @@ -520,7 +521,7 @@ trait Row extends Serializable { * @throws NullPointerException when value is null. */ private def getAnyValAs[T <: AnyVal](i: Int): T = - if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null") + if (isNullAt(i)) throw QueryExecutionErrors.valueIsNullError(i) else getAs[T](i) /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index 0779bf5..ff0ad23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -21,8 +21,8 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf /** @@ -103,9 +103,9 @@ class CatalogManager( case _ if isSessionCatalog(currentCatalog) && namespace.length == 1 => v1SessionCatalog.setCurrentDatabase(namespace.head) case _ if isSessionCatalog(currentCatalog) => - throw new NoSuchNamespaceException(namespace) + throw QueryCompilationErrors.noSuchNamespaceError(namespace) case catalog: SupportsNamespaces if !catalog.namespaceExists(namespace) => - throw new NoSuchNamespaceException(namespace) + throw QueryCompilationErrors.noSuchNamespaceError(namespace) case _ => _currentNamespace = Some(namespace) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala index cc4395e..9949f45 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/Catalogs.scala @@ -23,6 +23,7 @@ import java.util.NoSuchElementException import java.util.regex.Pattern import 
org.apache.spark.SparkException +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils @@ -47,35 +48,32 @@ private[sql] object Catalogs { conf.getConfString("spark.sql.catalog." + name) } catch { case _: NoSuchElementException => - throw new CatalogNotFoundException( - s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined") + throw QueryExecutionErrors.catalogPluginClassNotFoundError(name) } val loader = Utils.getContextOrSparkClassLoader try { val pluginClass = loader.loadClass(pluginClassName) if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) { - throw new SparkException( - s"Plugin class for catalog '$name' does not implement CatalogPlugin: $pluginClassName") + throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName) } val plugin = pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin] plugin.initialize(name, catalogOptions(name, conf)) plugin } catch { case _: ClassNotFoundException => - throw new SparkException( - s"Cannot find catalog plugin class for catalog '$name': $pluginClassName") + throw QueryExecutionErrors.catalogPluginClassNotFoundForCatalogError(name, pluginClassName) case e: NoSuchMethodException => - throw new SparkException( - s"Failed to find public no-arg constructor for catalog '$name': $pluginClassName)", e) + throw QueryExecutionErrors.catalogFailToFindPublicNoArgConstructorError( + name, pluginClassName, e) case e: IllegalAccessException => - throw new SparkException( - s"Failed to call public no-arg constructor for catalog '$name': $pluginClassName)", e) + throw QueryExecutionErrors.catalogFailToCallPublicNoArgConstructorError( + name, pluginClassName, e) case e: InstantiationException => - throw new SparkException("Cannot instantiate abstract catalog plugin class for " + - s"catalog '$name': $pluginClassName", e.getCause) + throw QueryExecutionErrors.cannotInstantiateAbstractCatalogPluginClassError( + name, pluginClassName, e) case e: InvocationTargetException => - throw new SparkException("Failed during instantiating constructor for catalog " + - s"'$name': $pluginClassName", e.getCause) + throw QueryExecutionErrors.failedToInstantiateConstructorForCatalogError( + name, pluginClassName, e) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 6559716..bce51ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, UnboundFunction} import org.apache.spark.sql.connector.expressions.{NamedReference, Transform} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_POLICY +import org.apache.spark.sql.internal.SQLConf.{LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED, LEGACY_CTE_PRECEDENCE_POLICY} import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types._ @@ -1647,4 +1647,51 @@ private[spark] object QueryCompilationErrors { def invalidYearMonthIntervalType(startFieldName: String, endFieldName: 
String): Throwable = { new AnalysisException(s"'interval $startFieldName to $endFieldName' is invalid.") } + + def configRemovedInVersionError( + configName: String, + version: String, + comment: String): Throwable = { + new AnalysisException( + s"The SQL config '$configName' was removed in the version $version. $comment") + } + + def failedFallbackParsingError(msg: String, e1: Throwable, e2: Throwable): Throwable = { + new AnalysisException(s"$msg${e1.getMessage}\nFailed fallback parsing: ${e2.getMessage}", + cause = Some(e1.getCause)) + } + + def decimalCannotGreaterThanPrecisionError(scale: Int, precision: Int): Throwable = { + new AnalysisException(s"Decimal scale ($scale) cannot be greater than precision ($precision).") + } + + def decimalOnlySupportPrecisionUptoError(decimalType: String, precision: Int): Throwable = { + new AnalysisException(s"$decimalType can only support precision up to $precision") + } + + def negativeScaleNotAllowedError(scale: Int): Throwable = { + new AnalysisException( + s"""|Negative scale is not allowed: $scale. + |You can use ${LEGACY_ALLOW_NEGATIVE_SCALE_OF_DECIMAL_ENABLED.key}=true + |to enable legacy mode to allow it.""".stripMargin.replaceAll("\n", " ")) + } + + def invalidPartitionColumnKeyInTableError(key: String, tblName: String): Throwable = { + new AnalysisException(s"$key is not a valid partition column in table $tblName.") + } + + def invalidPartitionSpecError( + specKeys: String, + partitionColumnNames: Seq[String], + tableName: String): Throwable = { + new AnalysisException( + s"""|Partition spec is invalid. The spec ($specKeys) must match + |the partition spec (${partitionColumnNames.mkString(", ")}) defined in + |table '$tableName'""".stripMargin.replaceAll("\n", " ")) + } + + def foundDuplicateColumnError(colType: String, duplicateCol: Seq[String]): Throwable = { + new AnalysisException( + s"Found duplicate column(s) $colType: ${duplicateCol.sorted.mkString(", ")}") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 8fa41fd..4666d3e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{DomainJoin, LogicalPlan} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ValueInterval import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.{sideBySide, BadRecordException, FailFastMode} +import org.apache.spark.sql.connector.catalog.CatalogNotFoundException import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.expressions.Transform @@ -752,8 +753,8 @@ object QueryExecutionErrors { new IllegalArgumentException(s"Could not compare cost with $cost") } - def unsupportedDataTypeError(dt: DataType): Throwable = { - new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + def unsupportedDataTypeError(dt: String): Throwable = { + new UnsupportedOperationException(s"Unsupported data type: ${dt}") } def notSupportTypeError(dataType: DataType): Throwable = { @@ -1428,4 +1429,111 @@ object QueryExecutionErrors { def invalidStreamingOutputModeError(outputMode: Option[OutputMode]): Throwable = { new UnsupportedOperationException(s"Invalid output 
mode: $outputMode") } + + def catalogPluginClassNotFoundError(name: String): Throwable = { + new CatalogNotFoundException( + s"Catalog '$name' plugin class not found: spark.sql.catalog.$name is not defined") + } + + def catalogPluginClassNotImplementedError(name: String, pluginClassName: String): Throwable = { + new SparkException( + s"Plugin class for catalog '$name' does not implement CatalogPlugin: $pluginClassName") + } + + def catalogPluginClassNotFoundForCatalogError( + name: String, + pluginClassName: String): Throwable = { + new SparkException(s"Cannot find catalog plugin class for catalog '$name': $pluginClassName") + } + + def catalogFailToFindPublicNoArgConstructorError( + name: String, + pluginClassName: String, + e: Exception): Throwable = { + new SparkException( + s"Failed to find public no-arg constructor for catalog '$name': $pluginClassName)", e) + } + + def catalogFailToCallPublicNoArgConstructorError( + name: String, + pluginClassName: String, + e: Exception): Throwable = { + new SparkException( + s"Failed to call public no-arg constructor for catalog '$name': $pluginClassName)", e) + } + + def cannotInstantiateAbstractCatalogPluginClassError( + name: String, + pluginClassName: String, + e: Exception): Throwable = { + new SparkException("Cannot instantiate abstract catalog plugin class for " + + s"catalog '$name': $pluginClassName", e.getCause) + } + + def failedToInstantiateConstructorForCatalogError( + name: String, + pluginClassName: String, + e: Exception): Throwable = { + new SparkException("Failed during instantiating constructor for catalog " + + s"'$name': $pluginClassName", e.getCause) + } + + def noSuchElementExceptionError(): Throwable = { + new NoSuchElementException + } + + def noSuchElementExceptionError(key: String): Throwable = { + new NoSuchElementException(key) + } + + def cannotMutateReadOnlySQLConfError(): Throwable = { + new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.") + } + + def cannotCloneOrCopyReadOnlySQLConfError(): Throwable = { + new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.") + } + + def cannotGetSQLConfInSchedulerEventLoopThreadError(): Throwable = { + new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.") + } + + def unsupportedOperationExceptionError(): Throwable = { + new UnsupportedOperationException + } + + def nullLiteralsCannotBeCastedError(name: String): Throwable = { + new UnsupportedOperationException(s"null literals can't be casted to $name") + } + + def notUserDefinedTypeError(name: String, userClass: String): Throwable = { + new SparkException(s"$name is not an UserDefinedType. Please make sure registering " + + s"an UserDefinedType for ${userClass}") + } + + def cannotLoadUserDefinedTypeError(name: String, userClass: String): Throwable = { + new SparkException(s"Can not load in UserDefinedType ${name} for user class ${userClass}.") + } + + def timeZoneIdNotSpecifiedForTimestampTypeError(): Throwable = { + new UnsupportedOperationException( + s"${TimestampType.catalogString} must supply timeZoneId parameter") + } + + def notPublicClassError(name: String): Throwable = { + new UnsupportedOperationException( + s"$name is not a public class. 
Only public classes are supported.") + } + + def primitiveTypesNotSupportedError(): Throwable = { + new UnsupportedOperationException("Primitive types are not supported.") + } + + def fieldIndexOnRowWithoutSchemaError(): Throwable = { + new UnsupportedOperationException("fieldIndex on a Row without schema is undefined.") + } + + def valueIsNullError(index: Int): Throwable = { + new NullPointerException(s"Value at index $index is null") + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala index 717ff93..8842753 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala @@ -17,9 +17,8 @@ package org.apache.spark.sql.execution -import java.util.NoSuchElementException - import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.errors.QueryExecutionErrors /** * An internal iterator interface which presents a more restrictive API than @@ -71,7 +70,7 @@ private final class RowIteratorToScala(val rowIter: RowIterator) extends Iterato _hasNext } override def next(): InternalRow = { - if (!hasNext) throw new NoSuchElementException + if (!hasNext) throw QueryExecutionErrors.noSuchElementExceptionError() hasNextWasCalled = false rowIter.getRow } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala index ef4b339..ef26683 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala @@ -21,6 +21,7 @@ import java.util.{Map => JMap} import org.apache.spark.TaskContext import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader} +import org.apache.spark.sql.errors.QueryExecutionErrors /** * A readonly SQLConf that will be created by tasks running at the executor side. 
It reads the @@ -37,27 +38,27 @@ class ReadOnlySQLConf(context: TaskContext) extends SQLConf { } override protected def setConfWithCheck(key: String, value: String): Unit = { - throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError() } override def unsetConf(key: String): Unit = { - throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError() } override def unsetConf(entry: ConfigEntry[_]): Unit = { - throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError() } override def clear(): Unit = { - throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotMutateReadOnlySQLConfError() } override def clone(): SQLConf = { - throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotCloneOrCopyReadOnlySQLConfError() } override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = { - throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.") + throw QueryExecutionErrors.cannotCloneOrCopyReadOnlySQLConfError() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index cfd9704..7926120 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -36,7 +36,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.internal.config.{IGNORE_MISSING_FILES => SPARK_IGNORE_MISSING_FILES} import org.apache.spark.network.util.ByteUnit -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.analysis.{HintErrorLogger, Resolver} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode @@ -44,6 +43,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator import org.apache.spark.sql.catalyst.plans.logical.HintErrorHandler import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.util.Utils @@ -217,7 +217,7 @@ object SQLConf { if (conf != null) { conf } else if (Utils.isTesting) { - throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.") + throw QueryExecutionErrors.cannotGetSQLConfInSchedulerEventLoopThreadError() } else { confGetter.get()() } @@ -3999,7 +3999,7 @@ class SQLConf extends Serializable with Logging { // Try to use the default value Option(getConfigEntry(key)).map { e => e.stringConverter(e.readFrom(reader)) } }. - getOrElse(throw new NoSuchElementException(key)) + getOrElse(throw QueryExecutionErrors.noSuchElementExceptionError(key)) } /** @@ -4132,8 +4132,7 @@ class SQLConf extends Serializable with Logging { SQLConf.removedSQLConfigs.get(key).foreach { case RemovedConfig(configName, version, defaultValue, comment) => if (value != defaultValue) { - throw new AnalysisException( - s"The SQL config '$configName' was removed in the version $version. 
$comment") + throw QueryCompilationErrors.configRemovedInVersionError(configName, version, comment) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala index f7d48fd..afa091d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala @@ -21,6 +21,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.errors.QueryExecutionErrors /** * A non-concrete data type, reserved for internal uses. @@ -112,7 +113,8 @@ protected[sql] object AnyDataType extends AbstractDataType with Serializable { // Note that since AnyDataType matches any concrete types, defaultConcreteType should never // be invoked. - override private[sql] def defaultConcreteType: DataType = throw new UnsupportedOperationException + override private[sql] def defaultConcreteType: DataType = + throw QueryExecutionErrors.unsupportedOperationExceptionError() override private[sql] def simpleString: String = "any" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 22c9428..9401dc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -28,12 +28,12 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Stable -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Cast, Expression} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.util.DataTypeJsonUtils.{DataTypeJsonDeserializer, DataTypeJsonSerializer} import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT} @@ -161,9 +161,7 @@ object DataType { fallbackParser(schema) } catch { case NonFatal(e2) => - throw new AnalysisException( - message = s"$errorMsg${e1.getMessage}\nFailed fallback parsing: ${e2.getMessage}", - cause = Some(e1.getCause)) + throw QueryCompilationErrors.failedFallbackParsingError(errorMsg, e1, e2) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala index 360a140..b7fc46c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala @@ -22,8 +22,8 @@ import java.util.Locale import scala.reflect.runtime.universe.typeTag import org.apache.spark.annotation.Stable -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf /** @@ -45,13 +45,12 @@ case class DecimalType(precision: Int, scale: Int) extends FractionalType { DecimalType.checkNegativeScale(scale) if (scale > 
precision) { - throw new AnalysisException( - s"Decimal scale ($scale) cannot be greater than precision ($precision).") + throw QueryCompilationErrors.decimalCannotGreaterThanPrecisionError(scale, precision) } if (precision > DecimalType.MAX_PRECISION) { - throw new AnalysisException( - s"${DecimalType.simpleString} can only support precision up to ${DecimalType.MAX_PRECISION}") + throw QueryCompilationErrors.decimalOnlySupportPrecisionUptoError( + DecimalType.simpleString, DecimalType.MAX_PRECISION) } // default constructor for Java @@ -158,9 +157,7 @@ object DecimalType extends AbstractDataType { private[sql] def checkNegativeScale(scale: Int): Unit = { if (scale < 0 && !SQLConf.get.allowNegativeScaleOfDecimalEnabled) { - throw new AnalysisException(s"Negative scale is not allowed: $scale. " + - s"You can use spark.sql.legacy.allowNegativeScaleOfDecimal=true " + - s"to enable legacy mode to allow it.") + throw QueryCompilationErrors.negativeScaleNotAllowedError(scale) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala index 21d773d..73a8a65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.types +import org.apache.spark.sql.errors.QueryExecutionErrors + object ObjectType extends AbstractDataType { override private[sql] def defaultConcreteType: DataType = - throw new UnsupportedOperationException( - s"null literals can't be casted to ${ObjectType.simpleString}") + throw QueryExecutionErrors.nullLiteralsCannotBeCastedError(ObjectType.simpleString) override private[sql] def acceptsType(other: DataType): Boolean = other match { case ObjectType(_) => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala index f13651f..eb28b06 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UDTRegistration.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.types import scala.collection.mutable -import org.apache.spark.SparkException import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.util.Utils /** @@ -77,13 +77,10 @@ object UDTRegistration extends Serializable with Logging { if (classOf[UserDefinedType[_]].isAssignableFrom(udtClass)) { udtClass } else { - throw new SparkException( - s"${udtClass.getName} is not an UserDefinedType. 
Please make sure registering " + - s"an UserDefinedType for ${userClass}") + throw QueryExecutionErrors.notUserDefinedTypeError(udtClass.getName, userClass) } } else { - throw new SparkException( - s"Can not load in UserDefinedType ${udtClassName} for user class ${userClass}.") + throw QueryExecutionErrors.cannotLoadUserDefinedTypeError(udtClassName, userClass) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala index 48a5491..d09d83d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala @@ -24,6 +24,7 @@ import org.apache.arrow.vector.complex.MapVector import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, IntervalUnit, TimeUnit} import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -48,8 +49,7 @@ private[sql] object ArrowUtils { case DateType => new ArrowType.Date(DateUnit.DAY) case TimestampType => if (timeZoneId == null) { - throw new UnsupportedOperationException( - s"${TimestampType.catalogString} must supply timeZoneId parameter") + throw QueryExecutionErrors.timeZoneIdNotSpecifiedForTimestampTypeError() } else { new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) } @@ -57,7 +57,7 @@ private[sql] object ArrowUtils { case _: YearMonthIntervalType => new ArrowType.Interval(IntervalUnit.YEAR_MONTH) case _: DayTimeIntervalType => new ArrowType.Interval(IntervalUnit.DAY_TIME) case _ => - throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString) } def fromArrowType(dt: ArrowType): DataType = dt match { @@ -78,7 +78,7 @@ private[sql] object ArrowUtils { case ArrowType.Null.INSTANCE => NullType case yi: ArrowType.Interval if yi.getUnit == IntervalUnit.YEAR_MONTH => YearMonthIntervalType() case di: ArrowType.Interval if di.getUnit == IntervalUnit.DAY_TIME => DayTimeIntervalType() - case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt") + case _ => throw QueryExecutionErrors.unsupportedDataTypeError(dt.toString) } /** Maps field from Spark to Arrow. 
NOTE: timeZoneId required for TimestampType */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala index cf30c71..0ca055f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/PartitioningUtils.scala @@ -17,12 +17,12 @@ package org.apache.spark.sql.util -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.DEFAULT_PARTITION_NAME import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{CharType, StructType, VarcharType} import org.apache.spark.unsafe.types.UTF8String @@ -42,7 +42,7 @@ private[sql] object PartitioningUtils { val rawSchema = CharVarcharUtils.getRawSchema(partCols) val normalizedPartSpec = partitionSpec.toSeq.map { case (key, value) => val normalizedFiled = rawSchema.find(f => resolver(f.name, key)).getOrElse { - throw new AnalysisException(s"$key is not a valid partition column in table $tblName.") + throw QueryCompilationErrors.invalidPartitionColumnKeyInTableError(key, tblName) } val normalizedVal = @@ -92,10 +92,8 @@ private[sql] object PartitioningUtils { partitionColumnNames: Seq[String]): Unit = { val defined = partitionColumnNames.sorted if (spec.keys.toSeq.sorted != defined) { - throw new AnalysisException( - s"Partition spec is invalid. 
The spec (${spec.keys.mkString(", ")}) must match " + - s"the partition spec (${partitionColumnNames.mkString(", ")}) defined in " + - s"table '$tableName'") + throw QueryCompilationErrors.invalidPartitionSpecError(spec.keys.mkString(", "), + partitionColumnNames, tableName) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 05f9cc5..da105af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -22,7 +22,7 @@ import java.util.Locale import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, NamedTransform, Transform} -import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} @@ -119,8 +119,7 @@ private[spark] object SchemaUtils { val duplicateColumns = names.groupBy(identity).collect { case (x, ys) if ys.length > 1 => s"`$x`" } - throw new AnalysisException( - s"Found duplicate column(s) $colType: ${duplicateColumns.toSeq.sorted.mkString(", ")}") + throw QueryCompilationErrors.foundDuplicateColumnError(colType, duplicateColumns.toSeq) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala index 0afacf0..887b0f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala @@ -78,7 +78,7 @@ object ArrowWriter { case (_: YearMonthIntervalType, vector: IntervalYearVector) => new IntervalYearWriter(vector) case (_: DayTimeIntervalType, vector: IntervalDayVector) => new IntervalDayWriter(vector) case (dt, _) => - throw QueryExecutionErrors.unsupportedDataTypeError(dt) + throw QueryExecutionErrors.unsupportedDataTypeError(dt.catalogString) } } }
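Every hunk in this commit applies the same refactoring: a call site that used to build an exception message inline now delegates to a central error-factory object (QueryCompilationErrors or QueryExecutionErrors), so the wording lives in one place. The following is a minimal, self-contained sketch of that pattern, not the actual Spark classes; the names DemoExecutionErrors, DemoEncoderChecks, and Demo are illustrative stand-ins.

// Minimal sketch of the error-grouping pattern introduced by this commit.
// DemoExecutionErrors, DemoEncoderChecks, and Demo are illustrative names only.

import java.lang.reflect.Modifier
import scala.reflect.{classTag, ClassTag}

// One central object owns the wording of execution-time error messages,
// so the messages can be standardized and maintained in a single file.
object DemoExecutionErrors {
  def primitiveTypesNotSupportedError(): Throwable =
    new UnsupportedOperationException("Primitive types are not supported.")

  def notPublicClassError(name: String): Throwable =
    new UnsupportedOperationException(
      s"$name is not a public class. Only public classes are supported.")
}

// Call sites decide only when to fail and delegate the message text to the
// errors object, mirroring the before/after hunks in the diff above.
object DemoEncoderChecks {
  def validatePublicClass[T: ClassTag](): Unit = {
    val cls = classTag[T].runtimeClass
    if (cls.isPrimitive) {
      throw DemoExecutionErrors.primitiveTypesNotSupportedError()
    }
    if (!Modifier.isPublic(cls.getModifiers)) {
      throw DemoExecutionErrors.notPublicClassError(cls.getName)
    }
  }
}

object Demo extends App {
  DemoEncoderChecks.validatePublicClass[String]() // passes: java.lang.String is public
  DemoEncoderChecks.validatePublicClass[Int]()    // throws: Int erases to a primitive class
}

Centralizing the strings this way is what makes message standardization and later auditing tractable: changing or reviewing an error message touches one factory method instead of every call site.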