This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3e0808c33f1 [SPARK-46351][SQL] Require an error class in `AnalysisException`
3e0808c33f1 is described below

commit 3e0808c33f185c13808ce2d547ce9ba0057d31a6
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Dec 12 01:29:26 2023 +0300

[SPARK-46351][SQL] Require an error class in `AnalysisException`

### What changes were proposed in this pull request?
In the PR, I propose to create `AnalysisException` only with an error class by making the constructor that takes a `message` protected. This way, only sub-classes can create an `AnalysisException` by passing a `message`; all other callers must provide an error class. (A short sketch of the resulting calling pattern appears after the diff below.)

### Why are the changes needed?
To improve the user experience with Spark SQL by unifying error exceptions: the final goal is that every Spark exception carries an error class.

### Does this PR introduce _any_ user-facing change?
No, since user code shouldn't throw `AnalysisException` itself; it can be affected, though, if it depends on error message formats.

### How was this patch tested?
By existing test suites like:
```
$ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
```
and the modified test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *HiveDDLSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44277 from MaxGekk/protected-AnalysisException.

Authored-by: Max Gekk <max.g...@gmail.com>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../src/main/resources/error/error-classes.json | 253 +++++++++++++++++++++
 .../apache/spark/sql/avro/AvroDataToCatalyst.scala | 19 +-
 .../org/apache/spark/sql/test/SQLHelper.scala | 4 +-
 .../connect/client/GrpcExceptionConverter.scala | 22 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala | 9 +-
 .../apache/spark/sql/kafka010/KafkaWriter.scala | 5 +-
 .../org/apache/spark/sql/AnalysisException.scala | 14 +-
 .../apache/spark/sql/catalyst/SQLConfHelper.scala | 4 +-
 .../catalyst/analysis/ColumnResolutionHelper.scala | 8 +-
 .../ResolveRowLevelCommandAssignments.scala | 4 +-
 .../catalyst/analysis/RewriteMergeIntoTable.scala | 12 +-
 .../catalyst/expressions/V2ExpressionUtils.scala | 12 +-
 .../spark/sql/catalyst/planning/patterns.scala | 5 +-
 .../sql/catalyst/plans/logical/v2Commands.scala | 8 +-
 .../org/apache/spark/sql/util/SchemaUtils.scala | 25 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala | 4 +-
 .../spark/sql/RelationalGroupedDataset.scala | 4 +-
 .../spark/sql/execution/SparkStrategies.scala | 3 +-
 .../spark/sql/execution/aggregate/AggUtils.scala | 5 +-
 .../execution/datasources/FileSourceStrategy.scala | 13 +-
 .../parquet/ParquetSchemaConverter.scala | 4 +-
 .../spark/sql/execution/datasources/rules.scala | 5 +-
 .../execution/datasources/v2/MergeRowsExec.scala | 4 +-
 .../datasources/v2/state/utils/SchemaUtil.scala | 6 +-
 .../RowLevelOperationRuntimeGroupFiltering.scala | 5 +-
 .../execution/streaming/WatermarkPropagator.scala | 6 +-
 .../execution/streaming/statefulOperators.scala | 6 +-
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 5 +-
 .../sql/connector/DataSourceV2FunctionSuite.scala | 10 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 79 +++++--
 .../connector/FileDataSourceV2FallBackSuite.scala | 28 ++-
 .../spark/sql/execution/QueryExecutionSuite.scala | 2 +-
 .../execution/datasources/orc/OrcFilterSuite.scala | 3 +-
 .../sql/execution/datasources/orc/OrcTest.scala | 3 +-
 .../datasources/parquet/ParquetFilterSuite.scala | 3 +-
 .../spark/sql/hive/HiveExternalCatalog.scala | 32 ++-
.../org/apache/spark/sql/hive/HiveInspectors.scala | 15 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 17 +- .../spark/sql/hive/HiveSessionStateBuilder.scala | 7 +- .../org/apache/spark/sql/hive/HiveStrategies.scala | 8 +- .../sql/hive/execution/V1WritesHiveUtils.scala | 4 +- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 11 +- .../spark/sql/hive/execution/HiveDDLSuite.scala | 72 +++--- .../spark/sql/hive/execution/HiveQuerySuite.scala | 8 +- .../sql/hive/execution/Hive_2_1_DDLSuite.scala | 8 +- 45 files changed, 611 insertions(+), 173 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 62d10c0d34c..d52ffc011b7 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -6705,6 +6705,259 @@ "Failed to get block <blockId>, which is not a shuffle block" ] }, + "_LEGACY_ERROR_TEMP_3050" : { + "message" : [ + "Cannot modify the value of a static config: <k>" + ] + }, + "_LEGACY_ERROR_TEMP_3051" : { + "message" : [ + "When resolving <u>, fail to find subplan with plan_id=<planId> in <q>" + ] + }, + "_LEGACY_ERROR_TEMP_3052" : { + "message" : [ + "Unexpected resolved action: <other>" + ] + }, + "_LEGACY_ERROR_TEMP_3053" : { + "message" : [ + "Unexpected WHEN NOT MATCHED action: <other>" + ] + }, + "_LEGACY_ERROR_TEMP_3054" : { + "message" : [ + "<expr> is not currently supported" + ] + }, + "_LEGACY_ERROR_TEMP_3055" : { + "message" : [ + "ScalarFunction '<scalarFunc.name>' neither implement magic method nor override 'produceResult'" + ] + }, + "_LEGACY_ERROR_TEMP_3056" : { + "message" : [ + "Unexpected row-level read relations (allow multiple = <allowMultipleReads>): <other>" + ] + }, + "_LEGACY_ERROR_TEMP_3057" : { + "message" : [ + "Cannot retrieve row-level operation from <table>" + ] + }, + "_LEGACY_ERROR_TEMP_3058" : { + "message" : [ + "Found duplicate column(s) <checkType>: <duplicateColumns>" + ] + }, + "_LEGACY_ERROR_TEMP_3059" : { + "message" : [ + "The positions provided (<pos>) cannot be resolved in", + "<schema>" + ] + }, + "_LEGACY_ERROR_TEMP_3060" : { + "message" : [ + "Couldn't find column <i> in:", + "<schema>" + ] + }, + "_LEGACY_ERROR_TEMP_3061" : { + "message" : [ + "<e>", + "<schema>" + ] + }, + "_LEGACY_ERROR_TEMP_3062" : { + "message" : [ + "Expected <columnPath> to be a nested data type, but found <o>. Was looking for the index of <attr> in a nested field" + ] + }, + "_LEGACY_ERROR_TEMP_3063" : { + "message" : [ + "pivot is not supported on a streaming DataFrames/Datasets" + ] + }, + "_LEGACY_ERROR_TEMP_3064" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3065" : { + "message" : [ + "<clazz>: <msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3066" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3067" : { + "message" : [ + "Streaming aggregation doesn't support group aggregate pandas UDF" + ] + }, + "_LEGACY_ERROR_TEMP_3068" : { + "message" : [ + "Global aggregation with session window in streaming query is not supported." + ] + }, + "_LEGACY_ERROR_TEMP_3069" : { + "message" : [ + "<internalName> is a reserved column name that cannot be read in combination with <colName> column." + ] + }, + "_LEGACY_ERROR_TEMP_3070" : { + "message" : [ + "<internalName> is a reserved column name that cannot be read in combination with <colName> column." 
+ ] + }, + "_LEGACY_ERROR_TEMP_3071" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3072" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3073" : { + "message" : [ + "Unexpected instruction: <other>" + ] + }, + "_LEGACY_ERROR_TEMP_3074" : { + "message" : [ + "field <fieldName> not found from given schema <schema>" + ] + }, + "_LEGACY_ERROR_TEMP_3075" : { + "message" : [ + "Couldn't find scan attribute for <tableAttr> in <scanAttrs>" + ] + }, + "_LEGACY_ERROR_TEMP_3076" : { + "message" : [ + "Redefining watermark is disallowed. You can set the config '<config>' to 'false' to restore the previous behavior. Note that multiple stateful operators will be disallowed." + ] + }, + "_LEGACY_ERROR_TEMP_3077" : { + "message" : [ + "More than one event time columns are available. Please ensure there is at most one event time column per stream. event time columns: <eventTimeCols>" + ] + }, + "_LEGACY_ERROR_TEMP_3078" : { + "message" : [ + "Can not match ParquetTable in the query." + ] + }, + "_LEGACY_ERROR_TEMP_3079" : { + "message" : [ + "Dynamic partition cannot be the parent of a static partition." + ] + }, + "_LEGACY_ERROR_TEMP_3080" : { + "message" : [ + "<msg>" + ] + }, + "_LEGACY_ERROR_TEMP_3081" : { + "message" : [ + "Save mode <mode> not allowed for Kafka. Allowed save modes are <append> and <errorIfExists> (default)." + ] + }, + "_LEGACY_ERROR_TEMP_3082" : { + "message" : [ + "Creating bucketed Hive serde table is not supported yet." + ] + }, + "_LEGACY_ERROR_TEMP_3083" : { + "message" : [ + "Unable to infer the schema. The schema specification is required to create the table <tableName>." + ] + }, + "_LEGACY_ERROR_TEMP_3084" : { + "message" : [ + "No handler for UDF/UDAF/UDTF '<clazz>': <e>" + ] + }, + "_LEGACY_ERROR_TEMP_3085" : { + "message" : [ + "from_avro() doesn't support the <name> mode. Acceptable modes are <permissiveMode> and <failFastMode>." + ] + }, + "_LEGACY_ERROR_TEMP_3086" : { + "message" : [ + "Cannot persist <tableName> into Hive metastore as table property keys may not start with 'spark.sql.': <invalidKeys>" + ] + }, + "_LEGACY_ERROR_TEMP_3087" : { + "message" : [ + "Cannot set or change the preserved property key: 'EXTERNAL'" + ] + }, + "_LEGACY_ERROR_TEMP_3088" : { + "message" : [ + "The metadata is corrupted. Unable to find the partition column names from the schema. schema: <schema>. Partition columns: <partColumnNames>" + ] + }, + "_LEGACY_ERROR_TEMP_3089" : { + "message" : [ + "Corrupted <typeName> in catalog: <numCols> parts expected, but part <index> is missing." + ] + }, + "_LEGACY_ERROR_TEMP_3090" : { + "message" : [ + "Raw list type in java is unsupported because Spark cannot infer the element type." + ] + }, + "_LEGACY_ERROR_TEMP_3091" : { + "message" : [ + "Raw map type in java is unsupported because Spark cannot infer key and value types." + ] + }, + "_LEGACY_ERROR_TEMP_3092" : { + "message" : [ + "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because Spark cannot infer the data type for these type parameters." + ] + }, + "_LEGACY_ERROR_TEMP_3093" : { + "message" : [ + "Unsupported java type <c>" + ] + }, + "_LEGACY_ERROR_TEMP_3094" : { + "message" : [ + "<dt> is not supported." + ] + }, + "_LEGACY_ERROR_TEMP_3095" : { + "message" : [ + "<dt> cannot be converted to Hive TypeInfo" + ] + }, + "_LEGACY_ERROR_TEMP_3096" : { + "message" : [ + "Converted table has <resLen> columns,", + "but source Hive table has <relLen> columns.", + "Set <key> to false,", + "or recreate table <ident> to workaround." 
+ ] + }, + "_LEGACY_ERROR_TEMP_3097" : { + "message" : [ + "Column in converted table has different data type with source Hive table's.", + "Set <key> to false,", + "or recreate table <ident> to workaround." + ] + }, + "_LEGACY_ERROR_TEMP_3100" : { + "message" : [ + "<message>" + ] + }, "_LEGACY_ERROR_USER_RAISED_EXCEPTION" : { "message" : [ "<errorMessage>" diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala index 06388409284..9f31a2db55a 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala @@ -72,16 +72,16 @@ private[sql] case class AvroDataToCatalyst( @transient private lazy val parseMode: ParseMode = { val mode = avroOptions.parseMode if (mode != PermissiveMode && mode != FailFastMode) { - throw new AnalysisException(unacceptableModeMessage(mode.name)) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3085", + messageParameters = Map( + "name" -> mode.name, + "permissiveMode" -> PermissiveMode.name, + "failFastMode" -> FailFastMode.name)) } mode } - private def unacceptableModeMessage(name: String): String = { - s"from_avro() doesn't support the $name mode. " + - s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}." - } - @transient private lazy val nullResultRow: Any = dataType match { case st: StructType => val resultRow = new SpecificInternalRow(st.map(_.dataType)) @@ -115,7 +115,12 @@ private[sql] case class AvroDataToCatalyst( s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " + "result, try setting the option 'mode' as 'PERMISSIVE'.", e) case _ => - throw new AnalysisException(unacceptableModeMessage(parseMode.name)) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3085", + messageParameters = Map( + "name" -> parseMode.name, + "permissiveMode" -> PermissiveMode.name, + "failFastMode" -> FailFastMode.name)) } } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala index 727e2a4f420..4a574a15f7a 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/test/SQLHelper.scala @@ -45,7 +45,9 @@ trait SQLHelper { if (spark.conf.isModifiable(k)) { spark.conf.set(k, v) } else { - throw new AnalysisException(s"Cannot modify the value of a static config: $k") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3050", + messageParameters = Map("k" -> k)) } } diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index c06498684fa..075526e7521 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -196,13 +196,21 @@ private[client] object GrpcExceptionConverter { errorClass = params.errorClass.orNull, messageParameters = params.messageParameters, queryContext = params.queryContext)), - errorConstructor(params => - new AnalysisException( - params.message, - cause = 
params.cause, - errorClass = params.errorClass, - messageParameters = params.messageParameters, - context = params.queryContext)), + errorConstructor(params => { + if (params.errorClass.isEmpty) { + new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3100", + messageParameters = Map("message" -> params.message), + cause = params.cause, + context = params.queryContext) + } else { + new AnalysisException( + errorClass = params.errorClass.get, + messageParameters = params.messageParameters, + cause = params.cause, + context = params.queryContext) + } + }), errorConstructor(params => new NamespaceAlreadyExistsException(params.errorClass.orNull, params.messageParameters)), errorConstructor(params => diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index f63f5e541e0..73446eddd25 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -172,9 +172,12 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister data: DataFrame): BaseRelation = { mode match { case SaveMode.Overwrite | SaveMode.Ignore => - throw new AnalysisException(s"Save mode $mode not allowed for Kafka. " + - s"Allowed save modes are ${SaveMode.Append} and " + - s"${SaveMode.ErrorIfExists} (default).") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3081", + messageParameters = Map( + "mode" -> mode.toString, + "append" -> SaveMode.Append.toString, + "errorIfExists" -> SaveMode.ErrorIfExists.toString)) case _ => // good } val caseInsensitiveParameters = CaseInsensitiveMap(parameters) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index d1c4386e486..d986394eb1c 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -56,7 +56,10 @@ private[kafka010] object KafkaWriter extends Logging { headersExpression(schema) partitionExpression(schema) } catch { - case e: IllegalStateException => throw new AnalysisException(e.getMessage) + case e: IllegalStateException => + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3080", + messageParameters = Map("msg" -> e.getMessage)) } } diff --git a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala index a043df9b42c..7a428f6cc32 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/AnalysisException.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin} * @since 1.3.0 */ @Stable -class AnalysisException protected[sql] ( +class AnalysisException protected( val message: String, val line: Option[Int] = None, val startPosition: Option[Int] = None, @@ -49,6 +49,18 @@ class AnalysisException protected[sql] ( messageParameters = messageParameters, cause = cause) + def this( + errorClass: String, + messageParameters: Map[String, String], + context: Array[QueryContext], + cause: Option[Throwable]) = + this( + SparkThrowableHelper.getMessage(errorClass, messageParameters), 
+ errorClass = Some(errorClass), + messageParameters = messageParameters, + context = context, + cause = cause) + def this( errorClass: String, messageParameters: Map[String, String], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala index f4605b9218f..bd0455d76a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala @@ -47,7 +47,9 @@ trait SQLConfHelper { } keys.lazyZip(values).foreach { (k, v) => if (SQLConf.isStaticConfigKey(k)) { - throw new AnalysisException(s"Cannot modify the value of a static config: $k") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3050", + messageParameters = Map("k" -> k)) } conf.setConfString(k, v) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala index 70b44fbfa79..a90c6156503 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ColumnResolutionHelper.scala @@ -513,8 +513,12 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase { // df1 = spark.createDataFrame([Row(a = 1, b = 2, c = 3)]]) // df2 = spark.createDataFrame([Row(a = 1, b = 2)]]) // df1.select(df2.a) <- illegal reference df2.a - throw new AnalysisException(s"When resolving $u, " + - s"fail to find subplan with plan_id=$planId in\n$q") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3051", + messageParameters = Map( + "u" -> u.toString, + "planId" -> planId.toString, + "q" -> q.toString)) } }) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala index ee16ce262e3..3f3e707b054 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala @@ -116,7 +116,9 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] { case i @ InsertAction(_, assignments) => i.copy(assignments = AssignmentUtils.alignInsertAssignments(attrs, assignments)) case other => - throw new AnalysisException(s"Unexpected resolved action: $other") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3052", + messageParameters = Map("other" -> other.toString)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 4ba33f4743e..9e020cb55ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -95,7 +95,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper case InsertAction(cond, assignments) => Keep(cond.getOrElse(TrueLiteral), assignments.map(_.value)) case other => - throw new AnalysisException(s"Unexpected WHEN NOT MATCHED action: $other") + throw 
new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3053", + messageParameters = Map("other" -> other.toString)) } val outputs = notMatchedInstructions.flatMap(_.outputs) @@ -440,7 +442,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper Keep(cond.getOrElse(TrueLiteral), output) case other => - throw new AnalysisException(s"Unexpected action: $other") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3052", + messageParameters = Map("other" -> other.toString)) } } @@ -472,7 +476,9 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper Keep(cond.getOrElse(TrueLiteral), output) case other => - throw new AnalysisException(s"Unexpected action: $other") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3052", + messageParameters = Map("other" -> other.toString)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala index 60e457a776b..621e01eedea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala @@ -70,7 +70,8 @@ object V2ExpressionUtils extends SQLConfHelper with Logging { query: LogicalPlan, funCatalogOpt: Option[FunctionCatalog] = None): Expression = toCatalystOpt(expr, query, funCatalogOpt) - .getOrElse(throw new AnalysisException(s"$expr is not currently supported")) + .getOrElse(throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3054", messageParameters = Map("expr" -> expr.toString))) def toCatalystOpt( expr: V2Expression, @@ -88,7 +89,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging { case ref: FieldReference => Some(resolveRef[NamedExpression](ref, query)) case _ => - throw new AnalysisException(s"$expr is not currently supported") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3054", + messageParameters = Map("expr" -> expr.toString)) } } @@ -176,8 +179,9 @@ object V2ExpressionUtils extends SQLConfHelper with Logging { case Some(_) => ApplyFunctionExpression(scalarFunc, arguments) case _ => - throw new AnalysisException(s"ScalarFunction '${scalarFunc.name()}'" + - s" neither implement magic method nor override 'produceResult'") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3055", + messageParameters = Map("scalarFunc" -> scalarFunc.name())) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 53dd601ac39..e48b44a603a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -478,7 +478,10 @@ object GroupBasedRowLevelOperation { case other => throw new AnalysisException( - s"Unexpected row-level read relations (allow multiple = $allowMultipleReads): $other") + errorClass = "_LEGACY_ERROR_TEMP_3056", + messageParameters = Map( + "allowMultipleReads" -> allowMultipleReads.toString, + "other" -> other.toString)) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 0a18532b134..03ea8c8423c 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -235,7 +235,9 @@ case class ReplaceData( case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) => operation case _ => - throw new AnalysisException(s"Cannot retrieve row-level operation from $table") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3057", + messageParameters = Map("table" -> table.toString)) } } @@ -313,7 +315,9 @@ case class WriteDelta( case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, _) => operation.asInstanceOf[SupportsDelta] case _ => - throw new AnalysisException(s"Cannot retrieve row-level operation from $table") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3057", + messageParameters = Map("table" -> table.toString)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala index 76d0a516a13..d061fde27c8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala @@ -192,7 +192,10 @@ private[spark] object SchemaUtils { case (x, ys) if ys.length > 1 => s"${x._2.mkString(".")}" } throw new AnalysisException( - s"Found duplicate column(s) $checkType: ${duplicateColumns.mkString(", ")}") + errorClass = "_LEGACY_ERROR_TEMP_3058", + messageParameters = Map( + "checkType" -> checkType, + "duplicateColumns" -> duplicateColumns.mkString(", "))) } } @@ -225,9 +228,11 @@ private[spark] object SchemaUtils { case o => if (column.length > 1) { throw new AnalysisException( - s"""Expected $columnPath to be a nested data type, but found $o. 
Was looking for the - |index of ${UnresolvedAttribute(column).name} in a nested field - """.stripMargin) + errorClass = "_LEGACY_ERROR_TEMP_3062", + messageParameters = Map( + "columnPath" -> columnPath, + "o" -> o.toString, + "attr" -> UnresolvedAttribute(column).name)) } Nil } @@ -239,9 +244,12 @@ private[spark] object SchemaUtils { } catch { case i: IndexOutOfBoundsException => throw new AnalysisException( - s"Couldn't find column ${i.getMessage} in:\n${schema.treeString}") + errorClass = "_LEGACY_ERROR_TEMP_3060", + messageParameters = Map("i" -> i.getMessage, "schema" -> schema.treeString)) case e: AnalysisException => - throw new AnalysisException(e.getMessage + s":\n${schema.treeString}") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3061", + messageParameters = Map("e" -> e.getMessage, "schema" -> schema.treeString)) } } @@ -261,7 +269,10 @@ private[spark] object SchemaUtils { (nameAndField._1 :+ nowField.name) -> nowField case _ => throw new AnalysisException( - s"The positions provided ($pos) cannot be resolved in\n${schema.treeString}.") + errorClass = "_LEGACY_ERROR_TEMP_3059", + messageParameters = Map( + "pos" -> pos.toString, + "schema" -> schema.treeString)) } } field._1 diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index a8f73cebf31..1ee20a98cfd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -568,7 +568,9 @@ abstract class ExternalCatalogSuite extends SparkFunSuite { // HiveExternalCatalog may be the first one to notice and throw an exception, which will // then be caught and converted to a RuntimeException with a descriptive message. 
case ex: RuntimeException if ex.getMessage.contains("MetaException") => - throw new AnalysisException(ex.getMessage) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3066", + messageParameters = Map("msg" -> ex.getMessage)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 5ad96cdba21..eef1c9436df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -422,7 +422,9 @@ class RelationalGroupedDataset protected[sql]( */ def pivot(pivotColumn: Column): RelationalGroupedDataset = { if (df.isStreaming) { - throw new AnalysisException("pivot is not supported on a streaming DataFrames/Datasets") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3063", + messageParameters = Map.empty) } // This is to prevent unintended OOM errors when the number of distinct values is large val maxValues = df.sparkSession.sessionState.conf.dataFramePivotMaxValues diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 304ce0cd751..2d24f997d10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -421,7 +421,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { if (aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[PythonUDAF])) { throw new AnalysisException( - "Streaming aggregation doesn't support group aggregate pandas UDF") + errorClass = "_LEGACY_ERROR_TEMP_3067", + messageParameters = Map.empty) } val sessionWindowOption = namedGroupingExpressions.find { p => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala index 557f0e897ee..1972aeb3826 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala @@ -427,8 +427,9 @@ object AggUtils { } if (groupWithoutSessionExpression.isEmpty) { - throw new AnalysisException("Global aggregation with session window in streaming query" + - " is not supported.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3068", + messageParameters = Map.empty) } val groupingWithoutSessionAttributes = groupWithoutSessionExpression.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 1b1eddecdb9..6e5c463ed72 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -259,9 +259,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { case FileSourceGeneratedMetadataStructField(field, internalName) => if (schemaColumns.contains(internalName)) { - throw new AnalysisException(internalName + - s"${internalName} is a reserved column name that cannot be read in combination " + - s"with 
${FileFormat.METADATA_NAME}.${field.name} column.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3069", + messageParameters = Map( + "internalName" -> internalName, + "colName" -> s"${FileFormat.METADATA_NAME}.${field.name}" + )) } // NOTE: Readers require the internal column to be nullable because it's not part of the @@ -276,7 +279,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { metadataColumnsByName.put(field.name, attr) constantMetadataColumns += attr - case field => throw new AnalysisException(s"Unrecognized file metadata field: $field") + case field => throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3070", + messageParameters = Map("field" -> field.toString)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index f60f7c11eef..59c99cb998c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -752,7 +752,9 @@ private[sql] object ParquetSchemaConverter { def checkConversionRequirement(f: => Boolean, message: String): Unit = { if (!f) { - throw new AnalysisException(message) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3071", + messageParameters = Map("msg" -> message)) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 12c183de19d..c58815b6978 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -364,7 +364,10 @@ case class PreprocessTableCreation(catalog: SessionCatalog) extends Rule[Logical } } - private def failAnalysis(msg: String) = throw new AnalysisException(msg) + private def failAnalysis(msg: String) = { + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3072", messageParameters = Map("msg" -> msg)) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 749acbaa7ad..8a25170fcee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -117,7 +117,9 @@ case class MergeRowsExec( SplitExec(createPredicate(cond), createProjection(output), createProjection(otherOutput)) case other => - throw new AnalysisException(s"Unexpected instruction: $other") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3073", + messageParameters = Map("other" -> other.toString)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala index 07f4a4b5bac..54c6b34db97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/utils/SchemaUtil.scala @@ -23,7 +23,11 @@ object SchemaUtil { def 
getSchemaAsDataType(schema: StructType, fieldName: String): DataType = { schema.getFieldIndex(fieldName) match { case Some(idx) => schema(idx).dataType - case _ => throw new AnalysisException(s"field $fieldName not found from given schema $schema") + case _ => throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3074", + messageParameters = Map( + "fieldName" -> fieldName, + "schema" -> schema.toString())) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala index 7c28f91ee1c..f0ae37fc8e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala @@ -134,7 +134,10 @@ class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPla .map(scanAttr => tableAttr -> scanAttr) .getOrElse { throw new AnalysisException( - s"Couldn't find scan attribute for $tableAttr in ${scanAttrs.mkString(",")}") + errorClass = "_LEGACY_ERROR_TEMP_3075", + messageParameters = Map( + "tableAttr" -> tableAttr.toString, + "scanAttrs" -> scanAttrs.mkString(","))) } } AttributeMap(attrMapping) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala index a4bee7e95b4..f0950063b16 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkPropagator.scala @@ -189,9 +189,9 @@ class PropagateWatermarkSimulator extends WatermarkPropagator with Logging { case node: EventTimeWatermarkExec => val inputWatermarks = getInputWatermarks(node, nodeToOutputWatermark) if (inputWatermarks.nonEmpty) { - throw new AnalysisException("Redefining watermark is disallowed. You can set the " + - s"config '${SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key}' to 'false' to restore " + - "the previous behavior. Note that multiple stateful operators will be disallowed.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3076", + messageParameters = Map("config" -> SQLConf.STATEFUL_OPERATOR_ALLOW_MULTIPLE.key)) } nodeToOutputWatermark.put(node.id, Some(originWatermark)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 57db193a4c8..80f5b3532c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -392,9 +392,9 @@ object WatermarkSupport { // with allowing them. val eventTimeColsSet = eventTimeCols.map(_.exprId).toSet if (eventTimeColsSet.size > 1) { - throw new AnalysisException("More than one event time columns are available. Please " + - "ensure there is at most one event time column per stream. 
event time columns: " + - eventTimeCols.mkString("(", ",", ")")) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3077", + messageParameters = Map("eventTimeCols" -> eventTimeCols.mkString("(", ",", ")"))) } // With above check, even there are multiple columns in eventTimeCols, all columns must be diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 966092b58d9..5ba4e39e8ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -635,7 +635,10 @@ abstract class JdbcDialect extends Serializable with Logging { * @return `AnalysisException` or its sub-class. */ def classifyException(message: String, e: Throwable): AnalysisException = { - new AnalysisException(message, cause = Some(e)) + new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3064", + messageParameters = Map("msg" -> message), + cause = Some(e)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index b74d7318a92..f896997b57c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -411,11 +411,10 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), new JavaStrLen(new JavaStrLenNoImpl)) - // TODO assign a error-classes name checkError( exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()), - errorClass = null, - parameters = Map.empty, + errorClass = "_LEGACY_ERROR_TEMP_3055", + parameters = Map("scalarFunc" -> "strlen"), context = ExpectedContext( fragment = "testcat.ns.strlen('abc')", start = 7, @@ -446,11 +445,10 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { test("SPARK-35390: scalar function w/ mismatch type parameters from magic method") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddMismatchMagic)) - // TODO assign a error-classes name checkError( exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()), - errorClass = null, - parameters = Map.empty, + errorClass = "_LEGACY_ERROR_TEMP_3055", + parameters = Map("scalarFunc" -> "long_add_mismatch_magic"), context = ExpectedContext( fragment = "testcat.ns.add(1L, 2L)", start = 7, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 302a8e5d41d..47e79e45b73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1788,23 +1788,31 @@ class DataSourceV2SQLSuiteV1Filter } test("tableCreation: partition column case sensitive resolution") { - def checkFailure(statement: String): Unit = { + def checkFailure(statement: String, i: String): Unit = { withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { checkError( exception = intercept[AnalysisException] { 
sql(statement) }, - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3060", + parameters = Map( + "i" -> i, + "schema" -> + """root + | |-- a: integer (nullable = true) + | |-- b: string (nullable = true) + |""".stripMargin)) } } - checkFailure(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)") - checkFailure(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)") + checkFailure(s"CREATE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (A)", "A") + checkFailure(s"CREATE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (A)", + "A") checkFailure( - s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)") + s"CREATE OR REPLACE TABLE tbl (a int, b string) USING $v2Source PARTITIONED BY (B)", "B") checkFailure( - s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)") + s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) USING $v2Source PARTITIONED BY (B)", + "B") } test("tableCreation: duplicate column names in the table definition") { @@ -1866,23 +1874,47 @@ class DataSourceV2SQLSuiteV1Filter checkError( exception = analysisException( s"CREATE TABLE tbl (a int, b string) USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3060", + parameters = Map( + "i" -> "c", + "schema" -> + """root + | |-- a: integer (nullable = true) + | |-- b: string (nullable = true) + |""".stripMargin)) checkError( exception = analysisException(s"CREATE TABLE testcat.tbl (a int, b string) " + s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3060", + parameters = Map( + "i" -> "c", + "schema" -> + """root + | |-- a: integer (nullable = true) + | |-- b: string (nullable = true) + |""".stripMargin)) checkError( exception = analysisException(s"CREATE OR REPLACE TABLE tbl (a int, b string) " + s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3060", + parameters = Map( + "i" -> "c", + "schema" -> + """root + | |-- a: integer (nullable = true) + | |-- b: string (nullable = true) + |""".stripMargin)) checkError( exception = analysisException(s"CREATE OR REPLACE TABLE testcat.tbl (a int, b string) " + s"USING $v2Source CLUSTERED BY (c) INTO 4 BUCKETS"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3060", + parameters = Map( + "i" -> "c", + "schema" -> + """root + | |-- a: integer (nullable = true) + | |-- b: string (nullable = true) + |""".stripMargin)) } test("tableCreation: bucket column name containing dot") { @@ -1906,26 +1938,27 @@ class DataSourceV2SQLSuiteV1Filter test("tableCreation: column repeated in partition columns") { Seq((true, ("a", "a")), (false, ("aA", "Aa"))).foreach { case (caseSensitive, (c0, c1)) => withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + val dupCol = c1.toLowerCase(Locale.ROOT) checkError( exception = analysisException( s"CREATE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3058", + parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol)) checkError( exception = analysisException( s"CREATE TABLE testcat.t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"), - errorClass = null, - 
parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3058", + parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol)) checkError( exception = analysisException( s"CREATE OR REPLACE TABLE t ($c0 INT) USING $v2Source PARTITIONED BY ($c0, $c1)"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3058", + parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol)) checkError( exception = analysisException(s"CREATE OR REPLACE TABLE testcat.t ($c0 INT) " + s"USING $v2Source PARTITIONED BY ($c0, $c1)"), - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3058", + parameters = Map("checkType" -> "in the partitioning", "duplicateColumns" -> dupCol)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala index cfc8b2cc845..c6060dcdd51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/FileDataSourceV2FallBackSuite.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.connector import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkConf -import org.apache.spark.sql.{AnalysisException, QueryTest} +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability} import org.apache.spark.sql.connector.read.ScanBuilder @@ -51,7 +51,7 @@ class DummyReadOnlyFileTable extends Table with SupportsRead { override def schema(): StructType = StructType(Nil) override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { - throw new AnalysisException("Dummy file reader") + throw SparkException.internalError("Dummy file reader") } override def capabilities(): java.util.Set[TableCapability] = @@ -75,7 +75,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite { override def schema(): StructType = StructType(Nil) override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = - throw new AnalysisException("Dummy file writer") + throw SparkException.internalError("Dummy file writer") override def capabilities(): java.util.Set[TableCapability] = java.util.EnumSet.of(TableCapability.BATCH_WRITE, TableCapability.ACCEPT_ANY_SCHEMA) @@ -99,10 +99,12 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { checkAnswer(spark.read.parquet(path), df) // Dummy File reader should fail as expected. - val exception = intercept[AnalysisException] { - spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect() - } - assert(exception.message.equals("Dummy file reader")) + checkError( + exception = intercept[SparkException] { + spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect() + }, + errorClass = "INTERNAL_ERROR", + parameters = Map("message" -> "Dummy file reader")) } } @@ -125,10 +127,12 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession { withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "foo,bar") { // Dummy File reader should fail as DISABLED_V2_FILE_DATA_SOURCE_READERS doesn't include it. 
- val exception = intercept[AnalysisException] { - spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect() - } - assert(exception.message.equals("Dummy file reader")) + checkError( + exception = intercept[SparkException] { + spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect() + }, + errorClass = "INTERNAL_ERROR", + parameters = Map("message" -> "Dummy file reader")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index e987adf6225..583d7fd7ee3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -160,7 +160,7 @@ class QueryExecutionSuite extends SharedSparkSession { // Throw an AnalysisException - this should be captured. spark.experimental.extraStrategies = Seq[SparkStrategy]( - (_: LogicalPlan) => throw new AnalysisException("exception")) + (_: LogicalPlan) => throw new AnalysisException("_LEGACY_ERROR_TEMP_3078", Map.empty)) assert(qe.toString.contains("org.apache.spark.sql.AnalysisException")) // Throw an Error - this should not be captured. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index ab4389eceec..03b1f937e7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -70,8 +70,7 @@ class OrcFilterSuite extends OrcTest with SharedSparkSession { s"${o.pushedFilters.mkString("pushedFilters(", ", ", ")")}") checker(maybeFilter.get) - case _ => - throw new AnalysisException("Can not match OrcTable in the query.") + case _ => assert(false, "Can not match OrcTable in the query.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala index 8a38075932b..0c696acdeda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcTest.scala @@ -133,8 +133,7 @@ trait OrcTest extends QueryTest with FileBasedDataSourceTest with BeforeAndAfter s"${o.pushedFilters.mkString("pushedFilters(", ", ", ")")}") } - case _ => - throw new AnalysisException("Can not match OrcTable in the query.") + case _ => assert(false, "Can not match OrcTable in the query.") } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index d7ed5c4d354..da2705f7c72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -2345,7 +2345,8 @@ class ParquetV2FilterSuite extends ParquetFilterSuite { checker(stripSparkFilter(query), expected) case _ => - throw new AnalysisException("Can not match ParquetTable in the query.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3078", messageParameters = Map.empty) } } } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index c5adb8e27c4..84f75e3ef50 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -106,7 +106,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case o => o } throw new AnalysisException( - e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e)) + errorClass = "_LEGACY_ERROR_TEMP_3065", + messageParameters = Map( + "clazz" -> e.getClass.getCanonicalName, + "msg" -> Option(e.getMessage).getOrElse("")), + cause = Some(e)) } } @@ -132,14 +136,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat private def verifyTableProperties(table: CatalogTable): Unit = { val invalidKeys = table.properties.keys.filter(_.startsWith(SPARK_SQL_PREFIX)) if (invalidKeys.nonEmpty) { - throw new AnalysisException(s"Cannot persist ${table.qualifiedName} into Hive metastore " + - s"as table property keys may not start with '$SPARK_SQL_PREFIX': " + - invalidKeys.mkString("[", ", ", "]")) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3086", + messageParameters = Map( + "tableName" -> table.qualifiedName, + "invalidKeys" -> invalidKeys.mkString("[", ", ", "]"))) } // External users are not allowed to set/switch the table type. In Hive metastore, the table // type can be switched by changing the value of a case-sensitive table property `EXTERNAL`. if (table.properties.contains("EXTERNAL")) { - throw new AnalysisException("Cannot set or change the preserved property key: 'EXTERNAL'") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3087", messageParameters = Map.empty) } } @@ -807,9 +814,11 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): StructType = { val partitionFields = partColumnNames.map { partCol => schema.find(_.name == partCol).getOrElse { - throw new AnalysisException("The metadata is corrupted. Unable to find the " + - s"partition column names from the schema. schema: ${schema.catalogString}. " + - s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3088", + messageParameters = Map( + "schema" -> schema.catalogString, + "partColumnNames" -> partColumnNames.mkString("[", ", ", "]"))) } } StructType(schema.filterNot(partitionFields.contains) ++ partitionFields) @@ -1424,9 +1433,12 @@ object HiveExternalCatalog { } yield props.getOrElse( s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index", throw new AnalysisException( - s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing." 
+ errorClass = "_LEGACY_ERROR_TEMP_3089", + messageParameters = Map( + "typeName" -> typeName, + "numCols" -> numCols, + "index" -> index.toString)) ) - ) } private def getPartitionColumnsFromTableProperties(metadata: CatalogTable): Seq[String] = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index ba87ad37130..279af3f240d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -239,19 +239,19 @@ private[hive] trait HiveInspectors { // raw java list type unsupported case c: Class[_] if isSubClassOf(c, classOf[java.util.List[_]]) => throw new AnalysisException( - "Raw list type in java is unsupported because Spark cannot infer the element type.") + errorClass = "_LEGACY_ERROR_TEMP_3090", messageParameters = Map.empty) // raw java map type unsupported case c: Class[_] if isSubClassOf(c, classOf[java.util.Map[_, _]]) => throw new AnalysisException( - "Raw map type in java is unsupported because Spark cannot infer key and value types.") + errorClass = "_LEGACY_ERROR_TEMP_3091", messageParameters = Map.empty) case _: WildcardType => throw new AnalysisException( - "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " + - "Spark cannot infer the data type for these type parameters.") + errorClass = "_LEGACY_ERROR_TEMP_3092", messageParameters = Map.empty) - case c => throw new AnalysisException(s"Unsupported java type $c") + case c => throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3093", messageParameters = Map("c" -> c.toString)) } private def isSubClassOf(t: Type, parent: Class[_]): Boolean = t match { @@ -1125,7 +1125,8 @@ private[hive] trait HiveInspectors { private def decimalTypeInfo(decimalType: DecimalType): TypeInfo = decimalType match { case DecimalType.Fixed(precision, scale) => new DecimalTypeInfo(precision, scale) - case dt => throw new AnalysisException(s"${dt.catalogString} is not supported.") + case dt => throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3094", messageParameters = Map("dt" -> dt.catalogString)) } def toTypeInfo: TypeInfo = dt match { @@ -1154,7 +1155,7 @@ private[hive] trait HiveInspectors { case _: YearMonthIntervalType => intervalYearMonthTypeInfo case dt => throw new AnalysisException( - s"${dt.catalogString} cannot be converted to Hive TypeInfo") + errorClass = "_LEGACY_ERROR_TEMP_3095", messageParameters = Map("dt" -> dt.catalogString)) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c9446b37829..f1d99d359cd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -301,17 +301,20 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // it, but also respect the exprId in table relation output. if (result.output.length != relation.output.length) { throw new AnalysisException( - s"Converted table has ${result.output.length} columns, " + - s"but source Hive table has ${relation.output.length} columns. 
" + - s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " + - s"or recreate table ${relation.tableMeta.identifier} to workaround.") + errorClass = "_LEGACY_ERROR_TEMP_3096", + messageParameters = Map( + "resLen" -> result.output.length.toString, + "relLen" -> relation.output.length.toString, + "key" -> HiveUtils.CONVERT_METASTORE_PARQUET.key, + "ident" -> relation.tableMeta.identifier.toString)) } if (!result.output.zip(relation.output).forall { case (a1, a2) => a1.dataType == a2.dataType }) { throw new AnalysisException( - s"Column in converted table has different data type with source Hive table's. " + - s"Set ${HiveUtils.CONVERT_METASTORE_PARQUET.key} to false, " + - s"or recreate table ${relation.tableMeta.identifier} to workaround.") + errorClass = "_LEGACY_ERROR_TEMP_3097", + messageParameters = Map( + "key" -> HiveUtils.CONVERT_METASTORE_PARQUET.key, + "ident" -> relation.tableMeta.identifier.toString)) } val newOutput = result.output.zip(relation.output).map { case (a1, a2) => a1.withExprId(a2.exprId) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index 8a33645853c..32100d060b0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -202,8 +202,11 @@ object HiveUDFExpressionBuilder extends SparkUDFExpressionBuilder { case i: InvocationTargetException => i.getCause case o => o } - val errorMsg = s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e" - val analysisException = new AnalysisException(errorMsg) + val analysisException = new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3084", + messageParameters = Map( + "clazz" -> clazz.getCanonicalName, + "e" -> e.toString)) analysisException.setStackTrace(e.getStackTrace) throw analysisException } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 3da3d4a0eb5..e5de5941d4a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -47,7 +47,8 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { table } else { if (table.bucketSpec.isDefined) { - throw new AnalysisException("Creating bucketed Hive serde table is not supported yet.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3082", messageParameters = Map.empty) } val defaultStorage = HiveSerDe.getDefaultStorage(conf) @@ -101,8 +102,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { val withSchema = if (query.isEmpty) { val inferred = HiveUtils.inferSchema(withStorage) if (inferred.schema.length <= 0) { - throw new AnalysisException("Unable to infer the schema. 
" + - s"The schema specification is required to create the table ${inferred.identifier}.") + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3083", + messageParameters = Map("tableName" -> inferred.identifier.toString)) } inferred } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala index e6b1019e717..bbfc8364071 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala @@ -86,7 +86,9 @@ trait V1WritesHiveUtils { // Report error if any static partition appears after a dynamic partition val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) { - throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) + throw new AnalysisException( + errorClass = "_LEGACY_ERROR_TEMP_3079", + messageParameters = Map.empty) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index af3d4555bc5..c580fd0dfa5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -356,9 +356,14 @@ class DataSourceWithHiveMetastoreCatalogSuite |CREATE TABLE non_partition_table (id bigint) |STORED AS PARQUET LOCATION '$baseDir' |""".stripMargin) - val e = intercept[AnalysisException]( - spark.table("non_partition_table")).getMessage - assert(e.contains("Converted table has 2 columns, but source Hive table has 1 columns.")) + checkError( + exception = intercept[AnalysisException](spark.table("non_partition_table")), + errorClass = "_LEGACY_ERROR_TEMP_3096", + parameters = Map( + "resLen" -> "2", + "relLen" -> "1", + "key" -> "spark.sql.hive.convertMetastoreParquet", + "ident" -> "`spark_catalog`.`default`.`non_partition_table`")) } } }) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index cf2098641ad..de79e96c412 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -418,15 +418,15 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("CREATE TABLE tab1 USING hive") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3083", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`tab1`") ) checkError( exception = intercept[AnalysisException] { sql(s"CREATE TABLE tab2 USING hive location '${tempDir.getCanonicalPath}'") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3083", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`tab2`") ) } } @@ -812,7 +812,7 @@ class HiveDDLSuite sql(s"CREATE TABLE $tabName (height INT, length INT) " + s"TBLPROPERTIES('EXTERNAL'='TRUE')") }, - errorClass = null, + errorClass = "_LEGACY_ERROR_TEMP_3087", parameters = Map.empty ) } @@ -829,7 +829,7 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql(s"ALTER TABLE $tabName SET TBLPROPERTIES ('EXTERNAL' = 'TRUE')") }, - errorClass = null, + errorClass = 
"_LEGACY_ERROR_TEMP_3087", parameters = Map.empty ) // The table type is not changed to external @@ -1395,11 +1395,13 @@ class HiveDDLSuite }, errorClass = caseSensitive match { case "false" => "UNSUPPORTED_FEATURE.DROP_DATABASE" - case _ => null + case _ => "_LEGACY_ERROR_TEMP_3065" }, parameters = caseSensitive match { case "false" => Map("database" -> "`default`") - case _ => Map.empty + case _ => Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "MetaException(message:Can not drop default database)") } ) } @@ -1892,8 +1894,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql(s"ALTER TABLE tbl SET TBLPROPERTIES ('${forbiddenPrefix}foo' = 'loser')") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3086", + parameters = Map( + "tableName" -> "spark_catalog.default.tbl", + "invalidKeys" -> s"[${forbiddenPrefix}foo]") ) checkError( exception = intercept[AnalysisException] { @@ -1909,8 +1913,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql(s"CREATE TABLE tbl2 (a INT) TBLPROPERTIES ('${forbiddenPrefix}foo'='anything')") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3086", + parameters = Map( + "tableName" -> "spark_catalog.default.tbl2", + "invalidKeys" -> s"[${forbiddenPrefix}foo]") ) } } @@ -2409,8 +2415,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("INSERT INTO TABLE t SELECT 1") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "java.lang.IllegalArgumentException", + "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b") ) } } @@ -2454,16 +2462,20 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "java.lang.IllegalArgumentException", + "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b") ) checkError( exception = intercept[AnalysisException] { sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "java.lang.IllegalArgumentException", + "msg" -> "java.net.URISyntaxException: Relative path in absolute URI: a:b") ) } } @@ -2566,8 +2578,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("ALTER TABLE tab ADD COLUMNS (C2 string)") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "Partition column name c2 conflicts with table columns.") ) // hive catalog will still complains that c1 is duplicate column name because hive @@ -2576,8 +2590,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("ALTER TABLE tab ADD COLUMNS (C1 string)") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "Duplicate column name c1 in the table definition.") ) } } @@ -2603,8 +2619,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("CREATE TABLE t2 STORED AS PARQUET AS SELECT null as null_col") }, - errorClass = null, - parameters = Map.empty + errorClass = 
"_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "java.lang.UnsupportedOperationException: Unknown field type: void") ) sql("CREATE TABLE t3 AS SELECT NULL AS null_col") @@ -2627,8 +2645,10 @@ class HiveDDLSuite exception = intercept[AnalysisException] { sql("CREATE TABLE t2 (v VOID) STORED AS PARQUET") }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "java.lang.UnsupportedOperationException: Unknown field type: void") ) sql("CREATE TABLE t3 (v VOID) USING hive") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 1412b4c8610..6e94333df4e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -833,8 +833,10 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' |WITH serdeproperties('s1'='9')""".stripMargin) }, - errorClass = null, - parameters = Map.empty) + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> "at least one column must be specified for the table")) sql("DROP TABLE alter1") } @@ -1270,7 +1272,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) |SELECT key, value, key % 5 FROM src""".stripMargin) }, - errorClass = null, + errorClass = "_LEGACY_ERROR_TEMP_3079", parameters = Map.empty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala index f9dbae9b1aa..98801e0b027 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala @@ -107,8 +107,12 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton { "CREATE TABLE t1 (c1 string) USING parquet", StructType(Array(StructField("c2", IntegerType)))) }, - errorClass = null, - parameters = Map.empty + errorClass = "_LEGACY_ERROR_TEMP_3065", + parameters = Map( + "clazz" -> "org.apache.hadoop.hive.ql.metadata.HiveException", + "msg" -> ("Unable to alter table. " + + "The following columns have types incompatible with the existing columns " + + "in their respective positions :\ncol")) ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org