This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new efed39516c0 [SPARK-42309][SQL] Introduce `INCOMPATIBLE_DATA_TO_TABLE` and sub classes
efed39516c0 is described below

commit efed39516c0c4e9654aec447ce91676026368384
Author: itholic <haejoon....@databricks.com>
AuthorDate: Thu Jul 13 17:21:29 2023 +0300

    [SPARK-42309][SQL] Introduce `INCOMPATIBLE_DATA_TO_TABLE` and sub classes

    ### What changes were proposed in this pull request?

    This PR proposes to assign the name `INCOMPATIBLE_DATA_TO_TABLE` to `_LEGACY_ERROR_TEMP_1204`, together with its sub classes:
    - CANNOT_FIND_DATA
    - AMBIGUOUS_COLUMN_NAME
    - EXTRA_STRUCT_FIELDS
    - NULLABLE_COLUMN
    - NULLABLE_ARRAY_ELEMENTS
    - NULLABLE_MAP_VALUES
    - CANNOT_SAFELY_CAST
    - STRUCT_MISSING_FIELDS
    - UNEXPECTED_COLUMN_NAME

    ### Why are the changes needed?

    We should assign a proper name to each `_LEGACY_ERROR_TEMP_*` error class.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`

    Closes #39937 from itholic/LEGACY_1204.

    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 common/utils/src/main/resources/error/README.md    |  14 +
 .../src/main/resources/error/error-classes.json    |  59 ++-
 docs/_data/menu-sql.yaml                           |   2 +-
 ...ions-incompatible-data-for-table-error-class.md |  64 +++
 ...tions-incompatible-data-to-table-error-class.md |  64 +++
 docs/sql-error-conditions.md                       |   8 +
 docs/sql-ref-ansi-compliance.md                    |   3 +-
 .../sql/catalyst/analysis/AssignmentUtils.scala    |   5 +-
 .../catalyst/analysis/TableOutputResolver.scala    |  97 +++--
 .../spark/sql/catalyst/types/DataTypeUtils.scala   |  59 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 110 +++++-
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   | 267 ++++++++++---
 .../types/DataTypeWriteCompatibilitySuite.scala    | 429 ++++++++++++---------
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  |  39 +-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |   5 +-
 .../command/AlignMergeAssignmentsSuite.scala       |  78 +++-
 .../command/AlignUpdateAssignmentsSuite.scala      |  54 ++-
 .../org/apache/spark/sql/sources/InsertSuite.scala |  98 +++--
 .../sql/test/DataFrameReaderWriterSuite.scala      |  47 ++-
 .../spark/sql/hive/client/HiveClientSuite.scala    |  22 +-
 20 files changed, 1100 insertions(+), 424 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md
index 838991c2b6a..dfcb42d49e7 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1294,6 +1294,20 @@ The following SQLSTATEs are collated from:
 |IM013 |IM |ODBC driver |013 |Trace file error |SQL Server |N |SQL Server |
 |IM014 |IM |ODBC driver |014 |Invalid name of File DSN |SQL Server |N |SQL Server |
 |IM015 |IM |ODBC driver |015 |Corrupt file data source |SQL Server |N |SQL Server |
+|KD000 |KD |datasource specific errors |000 |datasource specific errors |Databricks |N |Databricks |
+|KD001 |KD |datasource specific errors |001 |Cannot read file footer |Databricks |N |Databricks |
+|KD002 |KD |datasource specific errors |002 |Unexpected version |Databricks |N |Databricks |
+|KD003 |KD |datasource specific errors |003 |Incorrect access to data type |Databricks |N |Databricks |
+|KD004 |KD |datasource specific errors |004 |Delta protocol version error |Databricks |N |Databricks |
+|KD005 |KD |datasource specific errors |005 |Table must include at least one non partition column |Databricks |N |Databricks |
+|KD006 |KD |datasource specific errors |006 |No commits found at log path |Databricks |N |Databricks |
+|KD007 |KD |datasource specific errors |007 |Table signature changed |Databricks |N |Databricks |
+|KD008 |KD |datasource specific errors |008 |Table signature not set |Databricks |N |Databricks |
+|KD009 |KD |datasource specific errors |009 |Partitions do not match |Databricks |N |Databricks |
+|KD00A |KD |datasource specific errors |00A |Unexpected partial scan |Databricks |N |Databricks |
+|KD00B |KD |datasource specific errors |00B |Unrecognised file |Databricks |N |Databricks |
+|KD00C |KD |datasource specific errors |00C |Versioning not contiguous |Databricks |N |Databricks |
+|KD00D |KD |datasource specific errors |00D |Stats required |Databricks |N |Databricks |
 |P0000 |P0 |PL/pgSQL Error |000 |plpgsql_error |PostgreSQL |N |PostgreSQL Redshift |
 |P0001 |P0 |PL/pgSQL Error |001 |raise_exception |PostgreSQL |N |PostgreSQL Redshift |
 |P0002 |P0 |PL/pgSQL Error |002 |no_data_found |PostgreSQL |N |PostgreSQL Redshift |
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 2c4d2b533a6..6691e86b463 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -904,6 +904,59 @@
       "Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: <message>"
     ]
   },
+  "INCOMPATIBLE_DATA_FOR_TABLE" : {
+    "message" : [
+      "Cannot write incompatible data for the table <tableName>:"
+    ],
+    "subClass" : {
+      "AMBIGUOUS_COLUMN_NAME" : {
+        "message" : [
+          "Ambiguous column name in the input data <colName>."
+        ]
+      },
+      "CANNOT_FIND_DATA" : {
+        "message" : [
+          "Cannot find data for the output column <colName>."
+        ]
+      },
+      "CANNOT_SAFELY_CAST" : {
+        "message" : [
+          "Cannot safely cast <colName> <srcType> to <targetType>."
+        ]
+      },
+      "EXTRA_STRUCT_FIELDS" : {
+        "message" : [
+          "Cannot write extra fields <extraFields> to the struct <colName>."
+        ]
+      },
+      "NULLABLE_ARRAY_ELEMENTS" : {
+        "message" : [
+          "Cannot write nullable elements to array of non-nulls: <colName>."
+        ]
+      },
+      "NULLABLE_COLUMN" : {
+        "message" : [
+          "Cannot write nullable values to non-null column <colName>."
+        ]
+      },
+      "NULLABLE_MAP_VALUES" : {
+        "message" : [
+          "Cannot write nullable values to map of non-nulls: <colName>."
+        ]
+      },
+      "STRUCT_MISSING_FIELDS" : {
+        "message" : [
+          "Struct <colName> missing fields: <missingFields>."
+        ]
+      },
+      "UNEXPECTED_COLUMN_NAME" : {
+        "message" : [
+          "Struct <colName> <order>-th field name does not match (may be out of order): expected <expected>, found <found>."
+        ]
+      }
+    },
+    "sqlState" : "KD000"
+  },
   "INCOMPATIBLE_JOIN_TYPES" : {
     "message" : [
       "The join types <joinType1> and <joinType2> are incompatible."
@@ -4051,12 +4104,6 @@
       "Cannot resolve column name \"<colName>\" among (<fieldNames>)."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1204" : {
-    "message" : [
-      "Cannot write incompatible data to table '<tableName>':",
-      "- <errors>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1205" : {
     "message" : [
       "Expected only partition pruning predicates: <nonPartitionPruningPredicates>."
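To see the new error class end-to-end, here is a minimal sketch (not part of the commit) that triggers the `CANNOT_SAFELY_CAST` sub-class and reads back the class name and the new `KD000` SQLSTATE. It assumes a Spark build containing this change, a local SparkSession, and a hypothetical table `t`:

```scala
// Sketch: surfacing INCOMPATIBLE_DATA_FOR_TABLE from the new error-classes.json entry.
import org.apache.spark.sql.{AnalysisException, SparkSession}

object IncompatibleDataForTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    spark.sql("CREATE TABLE t (v INT) USING parquet") // hypothetical table
    try {
      // Under the default ANSI store assignment policy, STRING cannot be
      // safely stored into an INT column.
      spark.sql("INSERT INTO t VALUES ('1')")
    } catch {
      case e: AnalysisException =>
        println(e.getErrorClass) // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST
        println(e.getSqlState)   // KD000
    }
  }
}
```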
diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml
index 2345df89e51..62ad6a3a585 100644
--- a/docs/_data/menu-sql.yaml
+++ b/docs/_data/menu-sql.yaml
@@ -111,7 +111,7 @@
           url: sql-error-conditions-connect-error-class.html
         - text: DATATYPE_MISMATCH error class
           url: sql-error-conditions-datatype-mismatch-error-class.html
-        - text: INCOMPATIBLE_DATA_TO_TABLE error class
+        - text: INCOMPATIBLE_DATA_FOR_TABLE error class
           url: sql-error-conditions-incompatible-data-to-table-error-class.html
         - text: INCOMPLETE_TYPE_DEFINITION error class
          url: sql-error-conditions-incomplete-type-definition-error-class.html
diff --git a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
new file mode 100644
index 00000000000..f70b69ba6c5
--- /dev/null
+++ b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
@@ -0,0 +1,64 @@
+---
+layout: global
+title: INCOMPATIBLE_DATA_FOR_TABLE error class
+displayTitle: INCOMPATIBLE_DATA_FOR_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: KD000
+
+Cannot write incompatible data for the table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## AMBIGUOUS_COLUMN_NAME
+
+Ambiguous column name in the input data `<colName>`.
+
+## CANNOT_FIND_DATA
+
+Cannot find data for the output column `<colName>`.
+
+## CANNOT_SAFELY_CAST
+
+Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
+
+## EXTRA_STRUCT_FIELDS
+
+Cannot write extra fields `<extraFields>` to the struct `<colName>`.
+
+## NULLABLE_ARRAY_ELEMENTS
+
+Cannot write nullable elements to array of non-nulls: `<colName>`.
+
+## NULLABLE_COLUMN
+
+Cannot write nullable values to non-null column `<colName>`.
+
+## NULLABLE_MAP_VALUES
+
+Cannot write nullable values to map of non-nulls: `<colName>`.
+
+## STRUCT_MISSING_FIELDS
+
+Struct `<colName>` missing fields: `<missingFields>`.
+
+## UNEXPECTED_COLUMN_NAME
+
+Struct `<colName>` `<order>`-th field name does not match (may be out of order): expected `<expected>`, found `<found>`.
+
diff --git a/docs/sql-error-conditions-incompatible-data-to-table-error-class.md b/docs/sql-error-conditions-incompatible-data-to-table-error-class.md
new file mode 100644
index 00000000000..180a9cb9e49
--- /dev/null
+++ b/docs/sql-error-conditions-incompatible-data-to-table-error-class.md
@@ -0,0 +1,64 @@
+---
+layout: global
+title: INCOMPATIBLE_DATA_FOR_TABLE error class
+displayTitle: INCOMPATIBLE_DATA_FOR_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: none assigned
+
+Cannot write incompatible data for table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## AMBIGUOUS_COLUMN_NAME
+
+Ambiguous column name in the input data `<colName>`.
+
+## CANNOT_FIND_DATA
+
+Cannot find data for the output column `<colName>`.
+
+## CANNOT_SAFELY_CAST
+
+Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
+
+## EXTRA_STRUCT_FIELDS
+
+Cannot write extra fields `<extraFields>` to the struct `<colName>`.
+
+## NULLABLE_ARRAY_ELEMENTS
+
+Cannot write nullable elements to array of non-nulls: `<colName>`.
+
+## NULLABLE_COLUMN
+
+Cannot write nullable values to non-null column `<colName>`.
+
+## NULLABLE_MAP_VALUES
+
+Cannot write nullable values to map of non-nulls: `<colName>`.
+
+## STRUCT_MISSING_FIELDS
+
+Struct `<colName>` missing fields: `<missingFields>`.
+
+## UNEXPECTED_COLUMN_NAME
+
+Struct `<colName>` `<order>`-th field name does not match (may be out of order): expected `<expected>`, found `<found>`.
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 88bf68dd18d..21eda114d06 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -636,6 +636,14 @@ SQLSTATE: none assigned
 
 Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: `<message>`
 
+### [INCOMPATIBLE_DATA_FOR_TABLE](sql-error-conditions-incompatible-data-for-table-error-class.html)
+
+SQLSTATE: KD000
+
+Cannot write incompatible data for the table `<tableName>`:
+
+For more details see [INCOMPATIBLE_DATA_FOR_TABLE](sql-error-conditions-incompatible-data-for-table-error-class.html)
+
 ### INCOMPATIBLE_JOIN_TYPES
 
 [SQLSTATE: 42613](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index f9c6f5ea6aa..66258f7a8cd 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -152,8 +152,7 @@ CREATE TABLE t (v INT);
 -- `spark.sql.storeAssignmentPolicy=ANSI`
 INSERT INTO t VALUES ('1');
-org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`t`':
-- Cannot safely cast 'v': string to int;
+org.apache.spark.sql.AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST] Cannot write incompatible data for table `spark_catalog`.`default`.`t`: Cannot safely cast `v`: "STRING" to "INT".
 
 -- `spark.sql.storeAssignmentPolicy=LEGACY` (This is a legacy behaviour until Spark 2.x)
 INSERT INTO t VALUES ('1');
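For comparison, a short sketch (not from the diff) of how `spark.sql.storeAssignmentPolicy` drives the two behaviours documented above; it assumes an existing SparkSession `spark` and the same hypothetical table `t (v INT)`:

```scala
// ANSI (the default since Spark 3.0): the unsafe store fails with
// INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
// spark.sql("INSERT INTO t VALUES ('1')")  // throws AnalysisException

// LEGACY: the pre-3.0 behaviour, the string is cast permissively.
spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.sql("INSERT INTO t VALUES ('1')")     // succeeds, inserts 1
spark.sql("SELECT * FROM t").show()
```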
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index c9ee68a0dc7..ebc30ae4a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -119,7 +119,8 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
         val colPath = Seq(attr.name)
         val actualAttr = restoreActualType(attr)
         val value = matchingAssignments.head.value
-        TableOutputResolver.resolveUpdate(value, actualAttr, conf, err => errors += err, colPath)
+        TableOutputResolver.resolveUpdate(
+          "", value, actualAttr, conf, err => errors += err, colPath)
       }
       Assignment(attr, resolvedValue)
     }
@@ -163,7 +164,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport {
       TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
     } else if (exactAssignments.nonEmpty) {
       val value = exactAssignments.head.value
-      TableOutputResolver.resolveUpdate(value, col, conf, addError, colPath)
+      TableOutputResolver.resolveUpdate("", value, col, conf, addError, colPath)
     } else {
       applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 9c94437dbc7..894cd0b3991 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -57,6 +58,7 @@ object TableOutputResolver {
       // the column's default value. We need to pass `fillDefaultValue` as true here, if the
       // `supportColDefaultValue` parameter is also true.
       reorderColumnsByName(
+        tableName,
         query.output,
         actualExpectedCols,
         conf,
@@ -78,11 +80,11 @@
         tableName, actualExpectedCols.map(_.name), query)
       }
-      resolveColumnsByPosition(queryOutputCols, actualExpectedCols, conf, errors += _)
+      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
     }
 
     if (errors.nonEmpty) {
-      throw QueryCompilationErrors.cannotWriteIncompatibleDataToTableError(tableName, errors.toSeq)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
     }
 
     if (resolved == query.output) {
@@ -93,6 +95,7 @@
   }
 
   def resolveUpdate(
+      tableName: String,
       value: Expression,
       col: Attribute,
       conf: SQLConf,
@@ -102,29 +105,31 @@
     (value.dataType, col.dataType) match {
       // no need to reorder inner fields or cast if types are already compatible
       case (valueType, colType) if DataType.equalsIgnoreCompatibleNullability(valueType, colType) =>
-        val canWriteExpr = canWrite(valueType, colType, byName = true, conf, addError, colPath)
+        val canWriteExpr = canWrite(
+          tableName, valueType, colType, byName = true, conf, addError, colPath)
         if (canWriteExpr) checkNullability(value, col, conf, colPath) else value
       case (valueType: StructType, colType: StructType) =>
         val resolvedValue = resolveStructType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case (valueType: ArrayType, colType: ArrayType) =>
         val resolvedValue = resolveArrayType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case (valueType: MapType, colType: MapType) =>
         val resolvedValue = resolveMapType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case _ =>
-        checkUpdate(value, col, conf, addError, colPath)
+        checkUpdate(tableName, value, col, conf, addError, colPath)
     }
   }
 
   private def checkUpdate(
+      tableName: String,
       value: Expression,
       attr: Attribute,
       conf: SQLConf,
@@ -139,7 +144,7 @@
     }
 
     val canWriteValue = canWrite(
-      value.dataType, attrTypeWithoutCharVarchar,
+      tableName, value.dataType, attrTypeWithoutCharVarchar,
       byName = true, conf, addError, colPath)
 
     if (canWriteValue) {
@@ -157,6 +162,7 @@
   }
 
   private def canWrite(
+      tableName: String,
      valueType: DataType,
       expectedType: DataType,
       byName: Boolean,
@@ -166,7 +172,7 @@
     conf.storeAssignmentPolicy match {
       case StoreAssignmentPolicy.STRICT | StoreAssignmentPolicy.ANSI =>
         DataTypeUtils.canWrite(
-          valueType, expectedType, byName, conf.resolver, colPath.quoted,
+          tableName, valueType, expectedType, byName, conf.resolver, colPath.quoted,
           conf.storeAssignmentPolicy, addError)
       case _ =>
         true
@@ -174,6 +180,7 @@
   }
 
   private def reorderColumnsByName(
+      tableName: String,
       inputCols: Seq[NamedExpression],
       expectedCols: Seq[Attribute],
       conf: SQLConf,
@@ -191,12 +198,15 @@
           None
         }
         if (defaultExpr.isEmpty) {
-          addError(s"Cannot find data for output column '${newColPath.quoted}'")
+          throw QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+            tableName, newColPath.quoted
+          )
         }
         defaultExpr
       } else if (matched.length > 1) {
-        addError(s"Ambiguous column name in the input data: '${newColPath.quoted}'")
-        None
+        throw QueryCompilationErrors.incompatibleDataToTableAmbiguousColumnNameError(
+          tableName, newColPath.quoted
+        )
       } else {
         matchedCols += matched.head.name
         val expectedName = expectedCol.name
@@ -209,18 +219,19 @@
         (matchedCol.dataType, expectedCol.dataType) match {
           case (matchedType: StructType, expectedType: StructType) =>
             resolveStructType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case (matchedType: ArrayType, expectedType: ArrayType) =>
             resolveArrayType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case (matchedType: MapType, expectedType: MapType) =>
             resolveMapType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case _ =>
-            checkField(expectedCol, matchedCol, byName = true, conf, addError, newColPath)
+            checkField(
+              tableName, expectedCol, matchedCol, byName = true, conf, addError, newColPath)
         }
       }
     }
@@ -228,9 +239,10 @@
     if (reordered.length == expectedCols.length) {
       if (matchedCols.size < inputCols.length) {
         val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name))
-          .map(col => s"'${col.name}'").mkString(", ")
-        addError(s"Cannot write extra fields to struct '${colPath.quoted}': $extraCols")
-        Nil
+          .map(col => s"${toSQLId(col.name)}").mkString(", ")
+        throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraCols
+        )
       } else {
         reordered
       }
@@ -240,6 +252,7 @@
   }
 
   private def resolveColumnsByPosition(
+      tableName: String,
       inputCols: Seq[NamedExpression],
       expectedCols: Seq[Attribute],
       conf: SQLConf,
@@ -248,16 +261,18 @@
     if (inputCols.size > expectedCols.size) {
       val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => toSQLId(col.name))
         .mkString(", ")
-      addError(s"Cannot write extra fields to struct '${colPath.quoted}': $extraColsStr")
-      return Nil
+      throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+        tableName, colPath.quoted, extraColsStr
+      )
     } else if (inputCols.size < expectedCols.size) {
       val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => toSQLId(col.name))
         .mkString(", ")
-      addError(s"Struct '${colPath.quoted}' missing fields: $missingColsStr")
-      return Nil
+      throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+        tableName, colPath.quoted, missingColsStr
+      )
     }
 
     inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
@@ -265,18 +280,18 @@
       (inputCol.dataType, expectedCol.dataType) match {
         case (inputType: StructType, expectedType: StructType) =>
           resolveStructType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case (inputType: ArrayType, expectedType: ArrayType) =>
           resolveArrayType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case (inputType: MapType, expectedType: MapType) =>
           resolveMapType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case _ =>
-          checkField(expectedCol, inputCol, byName = false, conf, addError, newColPath)
+          checkField(tableName, expectedCol, inputCol, byName = false, conf, addError, newColPath)
       }
     }
   }
@@ -301,6 +316,7 @@
   }
 
   private def resolveStructType(
+      tableName: String,
       input: Expression,
       inputType: StructType,
       expected: Attribute,
@@ -314,9 +330,10 @@
       Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
     }
     val resolved = if (byName) {
-      reorderColumnsByName(fields, toAttributes(expectedType), conf, addError, colPath)
+      reorderColumnsByName(tableName, fields, toAttributes(expectedType), conf, addError, colPath)
     } else {
-      resolveColumnsByPosition(fields, toAttributes(expectedType), conf, addError, colPath)
+      resolveColumnsByPosition(
+        tableName, fields, toAttributes(expectedType), conf, addError, colPath)
     }
     if (resolved.length == expectedType.length) {
       val struct = CreateStruct(resolved)
@@ -332,6 +349,7 @@
   }
 
   private def resolveArrayType(
+      tableName: String,
       input: Expression,
       inputType: ArrayType,
       expected: Attribute,
@@ -345,9 +363,9 @@
     val fakeAttr =
       AttributeReference("element", expectedType.elementType, expectedType.containsNull)()
     val res = if (byName) {
-      reorderColumnsByName(Seq(param), Seq(fakeAttr), conf, addError, colPath)
+      reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath)
     } else {
-      resolveColumnsByPosition(Seq(param), Seq(fakeAttr), conf, addError, colPath)
+      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, addError, colPath)
     }
     if (res.length == 1) {
       val func = LambdaFunction(res.head, Seq(param))
@@ -358,6 +376,7 @@
   }
 
   private def resolveMapType(
+      tableName: String,
       input: Expression,
       inputType: MapType,
       expected: Attribute,
@@ -371,9 +390,9 @@
     val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = false)
     val fakeKeyAttr = AttributeReference("key", expectedType.keyType, nullable = false)()
     val resKey = if (byName) {
-      reorderColumnsByName(Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
+      reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
    } else {
-      resolveColumnsByPosition(Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
+      resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, addError, colPath)
     }
 
     val valueParam =
@@ -381,9 +400,10 @@
       NamedLambdaVariable("value", inputType.valueType, inputType.valueContainsNull)
     val fakeValueAttr =
       AttributeReference("value", expectedType.valueType, expectedType.valueContainsNull)()
     val resValue = if (byName) {
-      reorderColumnsByName(Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
+      reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
     } else {
-      resolveColumnsByPosition(Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, colPath)
     }
 
     if (resKey.length == 1 && resValue.length == 1) {
@@ -430,6 +450,7 @@
   }
 
   private def checkField(
+      tableName: String,
       tableAttr: Attribute,
       queryExpr: NamedExpression,
       byName: Boolean,
@@ -465,7 +486,7 @@
     }
 
     val canWriteExpr = canWrite(
-      queryExpr.dataType, attrTypeWithoutCharVarchar,
+      tableName, queryExpr.dataType, attrTypeWithoutCharVarchar,
       byName, conf, addError, colPath)
 
     if (canWriteExpr) outputField else None
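The structural change above is that the resolver helpers now raise a typed error (carrying the table name) at the point of failure instead of appending free-form strings to an `addError` buffer. A condensed sketch of the before/after pattern; the names are illustrative, and the `AnalysisException` constructor used here is `private[sql]`, so this only compiles inside Spark's own source tree:

```scala
import org.apache.spark.sql.AnalysisException

// Before: collect free-form strings, throw once at the end.
def checkOld(found: Boolean, col: String, addError: String => Unit): Unit =
  if (!found) addError(s"Cannot find data for output column '$col'")

// After: fail fast with an error class plus structured parameters.
def checkNew(found: Boolean, tableName: String, col: String): Unit =
  if (!found) {
    throw new AnalysisException(
      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
      messageParameters = Map("tableName" -> tableName, "colName" -> col))
  }
```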
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
index d0df8c3270e..feff781a403 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.types
 
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT}
 import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, DecimalType, MapType, NullType, StructField, StructType}
@@ -72,6 +74,7 @@ object DataTypeUtils {
    * @return true if data written with the write type can be read using the read type
    */
   def canWrite(
+      tableName: String,
       write: DataType,
       read: DataType,
       byName: Boolean,
@@ -83,12 +86,13 @@ object DataTypeUtils {
       case (wArr: ArrayType, rArr: ArrayType) =>
         // run compatibility check first to produce all error messages
         val typesCompatible = canWrite(
-          wArr.elementType, rArr.elementType, byName, resolver, context + ".element",
+          tableName, wArr.elementType, rArr.elementType, byName, resolver, context + ".element",
           storeAssignmentPolicy, addError)
 
         if (wArr.containsNull && !rArr.containsNull) {
-          addError(s"Cannot write nullable elements to array of non-nulls: '$context'")
-          false
+          throw QueryCompilationErrors.incompatibleDataToTableNullableArrayElementsError(
+            tableName, context
+          )
         } else {
           typesCompatible
         }
@@ -99,15 +103,16 @@ object DataTypeUtils {
         // run compatibility check first to produce all error messages
         val keyCompatible = canWrite(
-          wMap.keyType, rMap.keyType, byName, resolver, context + ".key",
+          tableName, wMap.keyType, rMap.keyType, byName, resolver, context + ".key",
           storeAssignmentPolicy, addError)
         val valueCompatible = canWrite(
-          wMap.valueType, rMap.valueType, byName, resolver, context + ".value",
+          tableName, wMap.valueType, rMap.valueType, byName, resolver, context + ".value",
           storeAssignmentPolicy, addError)
 
         if (wMap.valueContainsNull && !rMap.valueContainsNull) {
-          addError(s"Cannot write nullable values to map of non-nulls: '$context'")
-          false
+          throw QueryCompilationErrors.incompatibleDataToTableNullableMapValuesError(
+            tableName, context
+          )
         } else {
           keyCompatible && valueCompatible
         }
@@ -119,16 +124,15 @@ object DataTypeUtils {
           val nameMatch = resolver(wField.name, rField.name) || isSparkGeneratedName(wField.name)
           val fieldContext = s"$context.${rField.name}"
           val typesCompatible = canWrite(
-            wField.dataType, rField.dataType, byName, resolver, fieldContext,
+            tableName, wField.dataType, rField.dataType, byName, resolver, fieldContext,
             storeAssignmentPolicy, addError)
 
           if (byName && !nameMatch) {
-            addError(s"Struct '$context' $i-th field name does not match " +
-              s"(may be out of order): expected '${rField.name}', found '${wField.name}'")
-            fieldCompatible = false
+            throw QueryCompilationErrors.incompatibleDataToTableUnexpectedColumnNameError(
+              tableName, context, i, rField.name, wField.name)
           } else if (!rField.nullable && wField.nullable) {
-            addError(s"Cannot write nullable values to non-null field: '$fieldContext'")
-            fieldCompatible = false
+            throw QueryCompilationErrors.incompatibleDataToTableNullableColumnError(
+              tableName, fieldContext)
           } else if (!typesCompatible) {
             // errors are added in the recursive call to canWrite above
             fieldCompatible = false
@@ -137,25 +141,27 @@ object DataTypeUtils {
 
         if (readFields.size > writeFields.size) {
           val missingFieldsStr = readFields.takeRight(readFields.size - writeFields.size)
-            .map(f => s"'${f.name}'").mkString(", ")
+            .map(f => s"${toSQLId(f.name)}").mkString(", ")
           if (missingFieldsStr.nonEmpty) {
-            addError(s"Struct '$context' missing fields: $missingFieldsStr")
-            fieldCompatible = false
+            throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+              tableName, context, missingFieldsStr)
           }
 
         } else if (writeFields.size > readFields.size) {
           val extraFieldsStr = writeFields.takeRight(writeFields.size - readFields.size)
-            .map(f => s"'${f.name}'").mkString(", ")
-          addError(s"Cannot write extra fields to struct '$context': $extraFieldsStr")
-          fieldCompatible = false
+            .map(f => s"${toSQLId(f.name)}").mkString(", ")
+          throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+            tableName, context, extraFieldsStr
+          )
         }
 
         fieldCompatible
 
       case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT =>
         if (!Cast.canUpCast(w, r)) {
-          addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}")
-          false
+          throw QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+            tableName, context, w.catalogString, r.catalogString
+          )
         } else {
           true
         }
@@ -164,8 +170,9 @@ object DataTypeUtils {
 
       case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
         if (!Cast.canANSIStoreAssign(w, r)) {
-          addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}")
-          false
+          throw QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+            tableName, context, w.catalogString, r.catalogString
+          )
         } else {
           true
         }
@@ -174,9 +181,9 @@ object DataTypeUtils {
         true
 
       case (w, r) =>
-        addError(s"Cannot write '$context': " +
-          s"${w.catalogString} is incompatible with ${r.catalogString}")
-        false
+        throw QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+          tableName, context, w.catalogString, r.catalogString
+        )
     }
   }
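A sketch of the widened `DataTypeUtils.canWrite` signature in use, mirroring how the updated test suites below call it (Spark-internal API; the empty string stands in for an unknown table name, exactly as in those tests):

```scala
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{IntegerType, LongType}

// tableName is now the first parameter; under ANSI/STRICT an unsafe atomic
// cast throws AnalysisException (INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST)
// rather than reporting through the addError callback and returning false.
DataTypeUtils.canWrite(
  "my_table",                        // tableName (newly threaded through)
  LongType,                          // write type
  IntegerType,                       // read (table) type
  true,                              // byName
  analysis.caseSensitiveResolution,  // resolver
  "col",                             // context, i.e. the column path
  StoreAssignmentPolicy.ANSI,
  err => ())                         // addError, kept for the remaining cases
```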
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index aab5ada6e4d..955046e74e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2074,12 +2074,114 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
         "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", ")))
   }
 
-  def cannotWriteIncompatibleDataToTableError(tableName: String, errors: Seq[String]): Throwable = {
+  def incompatibleDataToTableCannotFindDataError(
+      tableName: String, colName: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1204",
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
       messageParameters = Map(
-        "tableName" -> tableName,
-        "errors" -> errors.mkString("\n- ")))
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableAmbiguousColumnNameError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.AMBIGUOUS_COLUMN_NAME",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableExtraStructFieldsError(
+      tableName: String, colName: String, extraFields: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "extraFields" -> extraFields
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableColumnError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_COLUMN",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableArrayElementsError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_ARRAY_ELEMENTS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableMapValuesError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_MAP_VALUES",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableCannotSafelyCastError(
+      tableName: String, colName: String, srcType: String, targetType: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "srcType" -> toSQLType(srcType),
+        "targetType" -> toSQLType(targetType)
+      )
+    )
+  }
+
+  def incompatibleDataToTableStructMissingFieldsError(
+      tableName: String, colName: String, missingFields: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "missingFields" -> missingFields
+      )
+    )
+  }
+
+  def incompatibleDataToTableUnexpectedColumnNameError(
+      tableName: String,
+      colName: String,
+      order: Int,
+      expected: String,
+      found: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "order" -> order.toString,
+        "expected" -> toSQLId(expected),
+        "found" -> toSQLId(found)
+      )
+    )
   }
 
   def invalidRowLevelOperationAssignments(
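With structured parameters, a test can pin the error down field by field. A sketch in the style of the suites that follow, using Spark's `checkError` helper; it assumes a `QueryTest` subclass with `SharedSparkSession`, and the hypothetical table from the ANSI-compliance example (the expected values use the `toSQLId`/`toSQLType` quoting shown above):

```scala
// Sketch of a test in the style used throughout the updated suites.
import org.apache.spark.sql.AnalysisException

test("unsafe INSERT reports INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST") {
  withTable("t") {
    sql("CREATE TABLE t (v INT) USING parquet")
    checkError(
      exception = intercept[AnalysisException] {
        sql("INSERT INTO t VALUES ('1')")
      },
      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
      parameters = Map(
        "tableName" -> "`spark_catalog`.`default`.`t`",
        "colName" -> "`v`",
        "srcType" -> "\"STRING\"",
        "targetType" -> "\"INT\""))
  }
}
```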
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 0a97b130c80..d91a080d8fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
+import org.apache.spark.QueryContext
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
@@ -153,6 +154,23 @@ abstract class V2ANSIWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
       super.assertAnalysisError(inputPlan, expectedErrors, caseSensitive)
     }
   }
+
+  override def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedMessageParameters: Map[String, String],
+      queryContext: Array[QueryContext] = Array.empty,
+      caseSensitive: Boolean = true): Unit = {
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.ANSI.toString) {
+      super.assertAnalysisErrorClass(
+        inputPlan,
+        expectedErrorClass,
+        expectedMessageParameters,
+        queryContext,
+        caseSensitive
+      )
+    }
+  }
 }
 
 abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
@@ -174,13 +192,36 @@ abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
     }
   }
 
+  override def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedMessageParameters: Map[String, String],
+      queryContext: Array[QueryContext] = Array.empty,
+      caseSensitive: Boolean = true): Unit = {
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.STRICT.toString) {
+      super.assertAnalysisErrorClass(
+        inputPlan,
+        expectedErrorClass,
+        expectedMessageParameters,
+        queryContext,
+        caseSensitive
+      )
+    }
+  }
+
   test("byName: fail canWrite check") {
     val parsedPlan = byName(table, widerTable)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'",
-      "Cannot safely cast", "'x'", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byName: multiple field errors are reported") {
@@ -195,10 +236,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
 
     val parsedPlan = byName(xRequiredTable, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot safely cast", "'x'", "double to float",
-      "Cannot find data for output column", "'y'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byPosition: fail canWrite check") {
@@ -209,9 +255,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
     val parsedPlan = byPosition(table, widerTable)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'",
-      "Cannot safely cast", "'x'", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byPosition: multiple field errors are reported") {
@@ -226,10 +278,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase {
 
     val parsedPlan = byPosition(xRequiredTable, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot safely cast", "'x'", "double to float",
-      "Cannot safely cast", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 }
 
@@ -335,9 +392,11 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
     val parsedPlan = byName(table, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'", "'y'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`")
+    )
   }
 
   test("byName: case sensitive column resolution") {
@@ -348,10 +407,11 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
     val parsedPlan = byName(table, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"),
-      caseSensitive = true)
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`")
+    )
   }
 
   test("byName: case insensitive column resolution") {
@@ -410,9 +470,11 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
     val parsedPlan = byName(requiredTable, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`")
+    )
   }
 
   test("byName: missing optional columns cause failure and are identified by name") {
@@ -423,9 +485,11 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
 
     val parsedPlan = byName(table, query)
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`x`")
+    )
   }
 
   test("byName: insert safe cast") {
@@ -468,9 +532,15 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val query = TestRelation(Seq($"b".struct($"y".int, $"x".int, $"z".int), $"a".int))
 
     val writePlan = byName(table, query)
-    assertAnalysisError(writePlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'b': 'z'"))
+    assertAnalysisErrorClass(
+      writePlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`b`",
+        "extraFields" -> "`z`"
+      )
+    )
   }
 
   test("byPosition: basic behavior") {
@@ -641,10 +711,11 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     withClue("byName") {
       val parsedPlan = byName(tableWithStructCol, query)
       assertNotResolved(parsedPlan)
-      assertAnalysisError(parsedPlan, Seq(
-        "Cannot write incompatible data to table", "'table-name'",
-        "Cannot find data for output column 'col.a'",
-        "Cannot find data for output column 'col.b'"))
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" -> "`col`.`a`")
+      )
     }
 
     withClue("byPosition") {
@@ -692,9 +763,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'b.n2': 'dn3'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`b`.`n2`",
+        "extraFields" -> "`dn3`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside array (byName)") {
@@ -716,9 +792,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'arr.element': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`arr`.`element`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside map key (byName)") {
@@ -744,9 +825,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'm.key': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`m`.`key`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside map value (byName)") {
@@ -772,9 +858,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'm.value': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`m`.`value`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: missing fields in nested struct (byName)") {
@@ -800,9 +891,24 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`b`.`n2`.`dn3`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`b`.`n2`",
+          "missingFields" -> "`dn3`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside array (byName)") {
@@ -828,9 +934,24 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`arr`.`element`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`arr`.`element`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside map key (byName)") {
@@ -860,9 +981,24 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`key`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`key`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside map value (byName)") {
@@ -892,9 +1028,24 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`value`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`value`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42855: NOT NULL checks for nested structs, arrays, maps (byName)") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
index 39aff1df791..7aaa69a0a5d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.types
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 
@@ -33,56 +35,94 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa
   override def canCast: (DataType, DataType) => Boolean = Cast.canUpCast
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(widerPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", widerPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`t`.`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("Check array types: unsafe casts are not allowed") {
     val arrayOfLong = ArrayType(LongType)
     val arrayOfInt = ArrayType(IntegerType)
 
-    assertSingleError(arrayOfLong, arrayOfInt, "arr",
-      "Should not allow array of longs to array of ints") { err =>
-      assert(err.contains("'arr.element'"),
-        "Should identify problem with named array's element type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", arrayOfLong, arrayOfInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`element`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map value types: casting Long to Integer is not allowed") {
     val mapOfLong = MapType(StringType, LongType)
     val mapOfInt = MapType(StringType, IntegerType)
 
-    assertSingleError(mapOfLong, mapOfInt, "m",
-      "Should not allow map of longs to map of ints") { err =>
-      assert(err.contains("'m.value'"), "Should identify problem with named map's value type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapOfLong, mapOfInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`value`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map key types: unsafe casts are not allowed") {
     val mapKeyLong = MapType(LongType, StringType)
     val mapKeyInt = MapType(IntegerType, StringType)
 
-    assertSingleError(mapKeyLong, mapKeyInt, "m",
-      "Should not allow map of long keys to map of int keys") { err =>
-      assert(err.contains("'m.key'"), "Should identify problem with named map's key type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapKeyLong, mapKeyInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`key`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check NullType is incompatible with all other types") {
     allNonNullTypes.foreach { t =>
-      assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err =>
-        assert(err.contains(s"incompatible with ${t.catalogString}"))
-      }
+      val errs = new mutable.ArrayBuffer[String]()
+      checkError(
+        exception = intercept[AnalysisException] (
+          DataTypeUtils.canWrite("", NullType, t, true,
+            analysis.caseSensitiveResolution, "nulls", storeAssignmentPolicy,
+            errMsg => errs += errMsg)
+        ),
+        errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+        parameters = Map(
+          "tableName" -> "``",
+          "colName" -> "`nulls`",
+          "srcType" -> "\"VOID\"",
+          "targetType" -> toSQLType(t.catalogString))
+      )
     }
   }
 }
@@ -97,11 +137,19 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase
     val mapOfString = MapType(StringType, StringType)
     val mapOfInt = MapType(StringType, IntegerType)
 
-    assertSingleError(mapOfString, mapOfInt, "m",
-      "Should not allow map of strings to map of ints") { err =>
-      assert(err.contains("'m.value'"), "Should identify problem with named map's value type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapOfString, mapOfInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`value`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   private val stringPoint2 = StructType(Seq(
     StructField("x", StringType, nullable = false),
@@ -109,50 +157,87 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase
     StructField("y", StringType, nullable = false)))
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(stringPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", stringPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`t`.`x`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("Check array types: unsafe casts are not allowed") {
     val arrayOfString = ArrayType(StringType)
     val arrayOfInt = ArrayType(IntegerType)
 
-    assertSingleError(arrayOfString, arrayOfInt, "arr",
-      "Should not allow array of strings to array of ints") { err =>
-      assert(err.contains("'arr.element'"),
-        "Should identify problem with named array's element type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", arrayOfString, arrayOfInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`element`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map key types: unsafe casts are not allowed") {
     val mapKeyString = MapType(StringType, StringType)
     val mapKeyInt = MapType(IntegerType, StringType)
 
-    assertSingleError(mapKeyString, mapKeyInt, "m",
-      "Should not allow map of string keys to map of int keys") { err =>
-      assert(err.contains("'m.key'"), "Should identify problem with named map's key type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapKeyString, mapKeyInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`key`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Conversions between timestamp and long are not allowed") {
-    assertSingleError(LongType, TimestampType, "longToTimestamp",
-      "Should not allow long to timestamp") { err =>
-      assert(err.contains("Cannot safely cast 'longToTimestamp': bigint to timestamp"))
-    }
-
-    assertSingleError(TimestampType, LongType, "timestampToLong",
-      "Should not allow timestamp to long") { err =>
-      assert(err.contains("Cannot safely cast 'timestampToLong': timestamp to bigint"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", LongType, TimestampType, true,
+          analysis.caseSensitiveResolution, "longToTimestamp", storeAssignmentPolicy,
+          errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`longToTimestamp`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"TIMESTAMP\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", TimestampType, LongType, true,
+          analysis.caseSensitiveResolution, "timestampToLong", storeAssignmentPolicy,
+          errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`timestampToLong`",
+        "srcType" -> "\"TIMESTAMP\"",
+        "targetType" -> "\"BIGINT\"")
+    )
  }
 
   test("SPARK-37707: Check datetime types compatible with each other") {
@@ -215,13 +300,21 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite {
       assertAllowed(w, r, "t",
         s"Should allow writing $w to $r because cast is safe")
     } else {
-      assertSingleError(w, r, "t",
-        s"Should not allow writing $w to $r because cast is not safe") { err =>
-        assert(err.contains("'t'"), "Should include the field name context")
-        assert(err.contains("Cannot safely cast"), "Should identify unsafe cast")
-        assert(err.contains(s"${w.catalogString}"), "Should include write type")
-        assert(err.contains(s"${r.catalogString}"), "Should include read type")
-      }
+      val errs = new mutable.ArrayBuffer[String]()
+      checkError(
+        exception = intercept[AnalysisException] (
+          DataTypeUtils.canWrite("", w, r, true, analysis.caseSensitiveResolution, "t",
storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "``", + "colName" -> "`t`", + "srcType" -> toSQLType(w), + "targetType" -> toSQLType(r) + ), + matchPVals = true + ) } } } @@ -229,29 +322,33 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { test("Check struct types: missing required field") { val missingRequiredField = StructType(Seq(StructField("x", FloatType, nullable = false))) - assertSingleError(missingRequiredField, point2, "t", - "Should fail because required field 'y' is missing") { err => - assert(err.contains("'t'"), "Should include the struct name for context") - assert(err.contains("'y'"), "Should include the nested field name") - assert(err.contains("missing field"), "Should call out field missing") - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", missingRequiredField, point2, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS", + parameters = Map("tableName" -> "``", "colName" -> "`t`", "missingFields" -> "`y`") + ) } test("Check struct types: missing starting field, matched by position") { val missingRequiredField = StructType(Seq(StructField("y", FloatType, nullable = false))) - - // should have 2 errors: names x and y don't match, and field y is missing - assertNumErrors(missingRequiredField, point2, "t", - "Should fail because field 'x' is matched to field 'y' and required field 'y' is missing", 2) - { errs => - assert(errs(0).contains("'t'"), "Should include the struct name for context") - assert(errs(0).contains("expected 'x', found 'y'"), "Should detect name mismatch") - assert(errs(0).contains("field name does not match"), "Should identify name problem") - - assert(errs(1).contains("'t'"), "Should include the struct name for context") - assert(errs(1).contains("'y'"), "Should include the _last_ nested fields of the read schema") - assert(errs(1).contains("missing field"), "Should call out field missing") - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", missingRequiredField, point2, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME", + parameters = Map( + "expected" -> "`x`", + "found" -> "`y`", + "tableName" -> "``", + "colName" -> "`t`", + "order" -> "0") + ) } test("Check struct types: missing middle field, matched by position") { @@ -266,17 +363,20 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { // types are compatible: (req int, req int) => (req int, req int, opt int) // but this should still fail because the names do not match. 
- - assertNumErrors(missingMiddleField, expectedStruct, "t", - "Should fail because field 'y' is matched to field 'z'", 2) { errs => - assert(errs(0).contains("'t'"), "Should include the struct name for context") - assert(errs(0).contains("expected 'y', found 'z'"), "Should detect name mismatch") - assert(errs(0).contains("field name does not match"), "Should identify name problem") - - assert(errs(1).contains("'t'"), "Should include the struct name for context") - assert(errs(1).contains("'z'"), "Should include the nested field name") - assert(errs(1).contains("missing field"), "Should call out field missing") - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", missingMiddleField, expectedStruct, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME", + parameters = Map( + "expected" -> "`y`", + "found" -> "`z`", + "tableName" -> "``", + "colName" -> "`t`", + "order" -> "1") + ) } test("Check struct types: generic colN names are ignored") { @@ -300,20 +400,27 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { StructField("x", FloatType), StructField("y", FloatType, nullable = false))) - assertSingleError(requiredFieldIsOptional, point2, "t", - "Should fail because required field 'x' is optional") { err => - assert(err.contains("'t.x'"), "Should include the nested field name context") - assert(err.contains("Cannot write nullable values to non-null field")) - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", requiredFieldIsOptional, point2, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_COLUMN", + parameters = Map("tableName" -> "``", "colName" -> "`t`.`x`") + ) } test("Check struct types: data field would be dropped") { - assertSingleError(point3, point2, "t", - "Should fail because field 'z' would be dropped") { err => - assert(err.contains("'t'"), "Should include the struct name for context") - assert(err.contains("'z'"), "Should include the extra field name") - assert(err.contains("Cannot write extra fields")) - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", point3, point2, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS", + parameters = Map("tableName" -> "``", "colName" -> "`t`", "extraFields" -> "`z`") + ) } test("Check struct types: type promotion is allowed") { @@ -346,11 +453,15 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { val arrayOfRequired = ArrayType(LongType, containsNull = false) val arrayOfOptional = ArrayType(LongType) - assertSingleError(arrayOfOptional, arrayOfRequired, "arr", - "Should not allow array of optional elements to array of required elements") { err => - assert(err.contains("'arr'"), "Should include type name context") - assert(err.contains("Cannot write nullable elements to array of non-nulls")) - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", arrayOfOptional, arrayOfRequired, true, + analysis.caseSensitiveResolution, 
"arr", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_ARRAY_ELEMENTS", + parameters = Map("tableName" -> "``", "colName" -> "`arr`") + ) } test("Check array types: writing required to optional elements is allowed") { @@ -372,11 +483,15 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { val mapOfRequired = MapType(StringType, LongType, valueContainsNull = false) val mapOfOptional = MapType(StringType, LongType) - assertSingleError(mapOfOptional, mapOfRequired, "m", - "Should not allow map of optional values to map of required values") { err => - assert(err.contains("'m'"), "Should include type name context") - assert(err.contains("Cannot write nullable values to map of non-nulls")) - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", mapOfOptional, mapOfRequired, true, + analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_MAP_VALUES", + parameters = Map("tableName" -> "``", "colName" -> "`m`") + ) } test("Check map value types: writing required to optional values is allowed") { @@ -420,58 +535,20 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { StructField("y", StringType) )) - assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) { errs => - assert(errs(0).contains("'top.a.element'"), "Should identify bad type") - assert(errs(0).contains("Cannot safely cast")) - assert(errs(0).contains("string to double")) - - assert(errs(1).contains("'top.a'"), "Should identify bad type") - assert(errs(1).contains("Cannot write nullable elements to array of non-nulls")) - - assert(errs(2).contains("'top.arr_of_structs.element'"), "Should identify bad type") - assert(errs(2).contains("'z'"), "Should identify bad field") - assert(errs(2).contains("Cannot write extra fields to struct")) - - assert(errs(3).contains("'top.arr_of_structs'"), "Should identify bad type") - assert(errs(3).contains("Cannot write nullable elements to array of non-nulls")) - - assert(errs(4).contains("'top.bad_nested_type'"), "Should identify bad type") - assert(errs(4).contains("is incompatible with")) - - assert(errs(5).contains("'top.m.key'"), "Should identify bad type") - assert(errs(5).contains("Cannot safely cast")) - assert(errs(5).contains("string to bigint")) - - assert(errs(6).contains("'top.m.value'"), "Should identify bad type") - assert(errs(6).contains("Cannot safely cast")) - assert(errs(6).contains("boolean to float")) - - assert(errs(7).contains("'top.m'"), "Should identify bad type") - assert(errs(7).contains("Cannot write nullable values to map of non-nulls")) - - assert(errs(8).contains("'top.map_of_structs.value'"), "Should identify bad type") - assert(errs(8).contains("expected 'y', found 'z'"), "Should detect name mismatch") - assert(errs(8).contains("field name does not match"), "Should identify name problem") - - assert(errs(9).contains("'top.map_of_structs.value'"), "Should identify bad type") - assert(errs(9).contains("'z'"), "Should identify missing field") - assert(errs(9).contains("missing fields"), "Should detect missing field") - - assert(errs(10).contains("'top.map_of_structs'"), "Should identify bad type") - assert(errs(10).contains("Cannot write nullable values to map of non-nulls")) - - assert(errs(11).contains("'top.x'"), "Should identify bad type") - assert(errs(11).contains("Cannot 
safely cast")) - assert(errs(11).contains("string to int")) - - assert(errs(12).contains("'top'"), "Should identify bad type") - assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name mismatch") - assert(errs(12).contains("field name does not match"), "Should identify name problem") - - assert(errs(13).contains("'top'"), "Should identify bad type") - assert(errs(13).contains("'missing1'"), "Should identify missing field") - assert(errs(13).contains("missing fields"), "Should detect missing field") - } + val errs = new mutable.ArrayBuffer[String]() + checkError( + exception = intercept[AnalysisException] ( + DataTypeUtils.canWrite("", writeType, readType, true, + analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg => errs += errMsg) + ), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "``", + "colName" -> "`t`.`a`.`element`", + "srcType" -> "\"STRING\"", + "targetType" -> "\"DOUBLE\"" + ) + ) } // Helper functions @@ -483,8 +560,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { desc: String, byName: Boolean = true): Unit = { assert( - DataTypeUtils.canWrite(writeType, readType, byName, analysis.caseSensitiveResolution, name, - storeAssignmentPolicy, + DataTypeUtils.canWrite("", writeType, readType, byName, analysis.caseSensitiveResolution, + name, storeAssignmentPolicy, errMsg => fail(s"Should not produce errors but was called with: $errMsg")), desc) } @@ -509,8 +586,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { (checkErrors: Seq[String] => Unit): Unit = { val errs = new mutable.ArrayBuffer[String]() assert( - DataTypeUtils.canWrite(writeType, readType, byName, analysis.caseSensitiveResolution, name, - storeAssignmentPolicy, errMsg => errs += errMsg) === false, desc) + DataTypeUtils.canWrite("", writeType, readType, byName, analysis.caseSensitiveResolution, + name, storeAssignmentPolicy, errMsg => errs += errMsg) === false, desc) assert(errs.size === numErrs, s"Should produce $numErrs error messages") checkErrors(errs.toSeq) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala index f8128c8c23e..f58f798b8de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala @@ -141,12 +141,13 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo checkAnswer(spark.table("testcat.table_name"), Seq.empty) - val exc = intercept[AnalysisException] { - spark.table("source").withColumnRenamed("data", "d").writeTo("testcat.table_name").append() - } - - assert(exc.getMessage.contains("Cannot find data for output column")) - assert(exc.getMessage.contains("'data'")) + checkError( + exception = intercept[AnalysisException] { + spark.table("source").withColumnRenamed("data", "d").writeTo("testcat.table_name").append() + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + ) checkAnswer( spark.table("testcat.table_name"), @@ -244,13 +245,14 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo checkAnswer(spark.table("testcat.table_name"), Seq.empty) - val exc = intercept[AnalysisException] { - spark.table("source").withColumnRenamed("data", "d") + checkError( + exception = 
intercept[AnalysisException] { + spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwrite(lit(true)) - } - - assert(exc.getMessage.contains("Cannot find data for output column")) - assert(exc.getMessage.contains("'data'")) + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + ) checkAnswer( spark.table("testcat.table_name"), @@ -348,13 +350,14 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo checkAnswer(spark.table("testcat.table_name"), Seq.empty) - val exc = intercept[AnalysisException] { - spark.table("source").withColumnRenamed("data", "d") + checkError( + exception = intercept[AnalysisException] { + spark.table("source").withColumnRenamed("data", "d") .writeTo("testcat.table_name").overwritePartitions() - } - - assert(exc.getMessage.contains("Cannot find data for output column")) - assert(exc.getMessage.contains("'data'")) + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> "`data`") + ) checkAnswer( spark.table("testcat.table_name"), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 09ea35737f8..0bbed51d0a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -214,10 +214,9 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { processInsert("t1", df, overwrite = false, byName = true) }, v1ErrorClass = "_LEGACY_ERROR_TEMP_1186", - v2ErrorClass = "_LEGACY_ERROR_TEMP_1204", + v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", v1Parameters = Map.empty[String, String], - v2Parameters = Map("tableName" -> "testcat.t1", - "errors" -> "Cannot find data for output column 'c1'") + v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`") ) val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*) checkV1AndV2Error( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala index d10dd8c478a..488b4d31bd9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, AttributeReference, BooleanLiteral, Cast, CheckOverflowInTableInsert, CreateNamedStruct, EvalMode, GetStructField, IntegerLiteral, LambdaFunction, LongLiteral, MapFromArrays, StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, StaticInvoke} import org.apache.spark.sql.catalyst.plans.logical.{Assignment, InsertAction, MergeAction, MergeIntoTable, UpdateAction} @@ -578,13 +579,34 @@ class AlignMergeAssignmentsSuite extends AlignAssignmentsSuiteBase { "Multiple assignments for 'txt': 'a', 'b'") // two updates to a nested column - assertAnalysisException( - s"""MERGE INTO nested_struct_table t USING nested_struct_table src - |ON t.i = src.i - |$clause THEN - | UPDATE SET s.n_i = 1, s.n_s = null, s.n_i = -1 - 
|""".stripMargin, - "Multiple assignments for 's.n_i': 1, -1") + val e = intercept[AnalysisException] { + parseAndResolve( + s"""MERGE INTO nested_struct_table t USING nested_struct_table src + |ON t.i = src.i + |$clause THEN + | UPDATE SET s.n_i = 1, s.n_s = null, s.n_i = -1 + |""".stripMargin + ) + } + if (policy == StoreAssignmentPolicy.ANSI) { + checkError( + exception = e, + errorClass = "DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS", + parameters = Map( + "sqlExpr" -> "\"s.n_i = 1\", \"s.n_s = NULL\", \"s.n_i = -1\"", + "errors" -> "\n- Multiple assignments for 's.n_i': 1, -1") + ) + } else { + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "``", + "colName" -> "`s`.`n_s`", + "srcType" -> "\"VOID\"", + "targetType" -> "\"STRUCT<DN_I:INT,DN_L:BIGINT>\"") + ) + } // conflicting updates to a nested struct and its fields assertAnalysisException( @@ -668,13 +690,20 @@ class AlignMergeAssignmentsSuite extends AlignAssignmentsSuiteBase { |""".stripMargin) assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i")) - assertAnalysisException( - s"""MERGE INTO nested_struct_table t USING nested_struct_table src - |ON t.i = src.i - |$clause THEN - | UPDATE SET s.n_s = named_struct('dn_i', 1) - |""".stripMargin, - "Cannot find data for output column 's.n_s.dn_l'") + val e = intercept[AnalysisException] { + parseAndResolve( + s"""MERGE INTO nested_struct_table t USING nested_struct_table src + |ON t.i = src.i + |$clause THEN + | UPDATE SET s.n_s = named_struct('dn_i', 1) + |""".stripMargin + ) + } + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`") + ) // ANSI mode does NOT allow string to int casts assertAnalysisException( @@ -807,13 +836,20 @@ class AlignMergeAssignmentsSuite extends AlignAssignmentsSuiteBase { |""".stripMargin) assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i")) - assertAnalysisException( - s"""MERGE INTO nested_struct_table t USING nested_struct_table src - |ON t.i = src.i - |$clause THEN - | UPDATE SET s.n_s = named_struct('dn_i', 1) - |""".stripMargin, - "Cannot find data for output column 's.n_s.dn_l'") + val e = intercept[AnalysisException] { + parseAndResolve( + s"""MERGE INTO nested_struct_table t USING nested_struct_table src + |ON t.i = src.i + |$clause THEN + | UPDATE SET s.n_s = named_struct('dn_i', 1) + |""".stripMargin + ) + } + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`") + ) // strict mode does NOT allow string to int casts assertAnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala index d1369e7e571..599f3e994ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, AttributeReference, BooleanLiteral, Cast, CheckOverflowInTableInsert, CreateNamedStruct, EvalMode, GetStructField, IntegerLiteral, LambdaFunction, LongLiteral, MapFromArrays, 
StringLiteral} import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable} @@ -469,9 +470,30 @@ class AlignUpdateAssignmentsSuite extends AlignAssignmentsSuiteBase { "Multiple assignments for 'i': 1, -1") // two updates to a nested column - assertAnalysisException( - "UPDATE nested_struct_table SET s.n_i = 1, s.n_s = null, s.n_i = -1", - "Multiple assignments for 's.n_i': 1, -1") + val e = intercept[AnalysisException] { + parseAndResolve( + "UPDATE nested_struct_table SET s.n_i = 1, s.n_s = null, s.n_i = -1" + ) + } + if (policy == StoreAssignmentPolicy.ANSI) { + checkError( + exception = e, + errorClass = "DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS", + parameters = Map( + "sqlExpr" -> "\"s.n_i = 1\", \"s.n_s = NULL\", \"s.n_i = -1\"", + "errors" -> "\n- Multiple assignments for 's.n_i': 1, -1") + ) + } else { + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "``", + "colName" -> "`s`.`n_s`", + "srcType" -> "\"VOID\"", + "targetType" -> "\"STRUCT<DN_I:INT,DN_L:BIGINT>\"") + ) + } // conflicting updates to a nested struct and its fields assertAnalysisException( @@ -509,9 +531,16 @@ class AlignUpdateAssignmentsSuite extends AlignAssignmentsSuiteBase { "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', NULL, 'dn_l', 1L)") assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i")) - assertAnalysisException( - "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)", - "Cannot find data for output column 's.n_s.dn_l'") + val e = intercept[AnalysisException] { + parseAndResolve( + "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)" + ) + } + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`") + ) // ANSI mode does NOT allow string to int casts assertAnalysisException( @@ -555,9 +584,16 @@ class AlignUpdateAssignmentsSuite extends AlignAssignmentsSuiteBase { |SET s.n_s = named_struct('dn_i', CAST (NULL AS INT), 'dn_l', 1L)""".stripMargin) assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i")) - assertAnalysisException( - "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)", - "Cannot find data for output column 's.n_s.dn_l'") + val e = intercept[AnalysisException] { + parseAndResolve( + "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)" + ) + } + checkError( + exception = e, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", + parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`") + ) // strict mode does NOT allow string to int casts assertAnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 48bdd799017..c6bfd8c14dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -657,18 +657,25 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t select 1L, 2") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", - parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot safely cast 'i': bigint to int")) + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" 
-> "`spark_catalog`.`default`.`t`", + "colName" -> "`i`", + "srcType" -> "\"BIGINT\"", + "targetType" -> "\"INT\"") + ) checkError( exception = intercept[AnalysisException] { sql("insert into t select 1, 2.0") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot safely cast 'd': decimal(2,1) to double")) + "colName" -> "`d`", + "srcType" -> "\"DECIMAL(2,1)\"", + "targetType" -> "\"DOUBLE\"") + ) checkError( exception = intercept[AnalysisException] { @@ -697,31 +704,34 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t values('a', 'b')") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'i': string to int\n- " + - "Cannot safely cast 'd': string to double")) + "colName" -> "`i`", + "srcType" -> "\"STRING\"", + "targetType" -> "\"INT\"") ) checkError( exception = intercept[AnalysisException] { sql("insert into t values(now(), now())") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " + - "Cannot safely cast 'd': timestamp to double")) + "colName" -> "`i`", + "srcType" -> "\"TIMESTAMP\"", + "targetType" -> "\"INT\"") ) checkError( exception = intercept[AnalysisException] { sql("insert into t values(true, false)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'i': boolean to int\n- " + - "Cannot safely cast 'd': boolean to double")) + "colName" -> "`i`", + "srcType" -> "\"BOOLEAN\"", + "targetType" -> "\"INT\"") ) } } @@ -839,11 +849,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " + - "Cannot safely cast 't': int to timestamp")) + "colName" -> "`i`", + "srcType" -> "\"TIMESTAMP\"", + "targetType" -> "\"INT\"") ) } @@ -853,11 +864,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("INSERT INTO t VALUES (date('2010-09-02'), 1)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'i': date to int\n- " + - "Cannot safely cast 'd': int to date")) + "colName" -> "`i`", + "srcType" -> "\"DATE\"", + "targetType" -> "\"INT\"") ) } @@ -867,11 +879,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", - parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'b': 
timestamp to boolean\n- " + - "Cannot safely cast 't': boolean to timestamp")) + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "colName" -> "`b`", + "srcType" -> "\"TIMESTAMP\"", + "targetType" -> "\"BOOLEAN\"") ) } @@ -881,11 +894,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("INSERT INTO t VALUES (date('2010-09-02'), true)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", - parameters = Map( - "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> ("Cannot safely cast 'b': date to boolean\n- " + - "Cannot safely cast 'd': boolean to date")) + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "colName" -> "`b`", + "srcType" -> "\"DATE\"", + "targetType" -> "\"BOOLEAN\"") ) } } @@ -1374,10 +1388,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t (i) values (true)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 's'")) + "colName" -> "`s`")) } withTable("t") { sql("create table t(i boolean default true, s bigint) using parquet") @@ -1385,10 +1399,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t (i) values (default)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 's'")) + "colName" -> "`s`")) } withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet") @@ -1396,10 +1410,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t (s) values (default)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 'i'")) + "colName" -> "`i`")) } withTable("t") { sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)") @@ -1407,10 +1421,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t partition(i='true') (s) values(5)") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 'q'")) + "colName" -> "`q`")) } withTable("t") { sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)") @@ -1418,10 +1432,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t partition(i='false') (q) select 43") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 's'")) + "colName" -> "`s`")) } withTable("t") { sql("create table t(i boolean, s bigint, q 
int) using parquet partitioned by (i)") @@ -1429,10 +1443,10 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t partition(i='false') (q) select default") }, - errorClass = "_LEGACY_ERROR_TEMP_1204", + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "errors" -> "Cannot find data for output column 's'")) + "colName" -> "`s`")) } } // When the CASE_SENSITIVE configuration is enabled, then using different cases for the required diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 44c9fbadfac..17348fe2dcb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -382,10 +382,17 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with withTable("t") { sql("create table t(i int, d double) using parquet") // Calling `saveAsTable` to an existing table with append mode results in table insertion. - val msg = intercept[AnalysisException] { - Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': bigint to int")) + checkError( + exception = intercept[AnalysisException] { + Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "colName" -> "`i`", + "srcType" -> "\"BIGINT\"", + "targetType" -> "\"INT\"") + ) // Insert into table successfully. Seq((1, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") @@ -403,17 +410,29 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with withTable("t") { sql("create table t(i int, d double) using parquet") // Calling `saveAsTable` to an existing table with append mode results in table insertion. 
- var msg = intercept[AnalysisException] { - Seq(("a", "b")).toDF("i", "d").write.mode("append").saveAsTable("t") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': string to int") && - msg.contains("Cannot safely cast 'd': string to double")) + checkError( + exception = intercept[AnalysisException] { + Seq(("a", "b")).toDF("i", "d").write.mode("append").saveAsTable("t") + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "colName" -> "`i`", + "srcType" -> "\"STRING\"", + "targetType" -> "\"INT\"") + ) - msg = intercept[AnalysisException] { - Seq((true, false)).toDF("i", "d").write.mode("append").saveAsTable("t") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': boolean to int") && - msg.contains("Cannot safely cast 'd': boolean to double")) + checkError( + exception = intercept[AnalysisException] { + Seq((true, false)).toDF("i", "d").write.mode("append").saveAsTable("t") + }, + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "colName" -> "`i`", + "srcType" -> "\"BOOLEAN\"", + "targetType" -> "\"INT\"") + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index d6bf7773293..8476c87dc28 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -949,8 +949,15 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3" if (version == "0.12" || version == "0.13") { - val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage - assert(e.contains(errorMsg)) + checkError( + exception = intercept[AnalysisException](versionSpark.sql(insertStmt)), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`tab1`", + "colName" -> "`f0`", + "srcType" -> "\"DECIMAL(2,1)\"", + "targetType" -> "\"BINARY\"") + ) } else { versionSpark.sql(insertStmt) assert(versionSpark.table(tableName).collect() === @@ -959,8 +966,15 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) } else { val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3" if (version == "0.12" || version == "0.13") { - val e = intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage - assert(e.contains(errorMsg)) + checkError( + exception = intercept[AnalysisException](versionSpark.sql(insertStmt)), + errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`tab1`", + "colName" -> "`f0`", + "srcType" -> "\"DECIMAL(2,1)\"", + "targetType" -> "\"BINARY\"") + ) } else { versionSpark.sql(insertStmt) assert(versionSpark.table(tableName).collect() ===
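
For reference, the renamed error condition can be observed end to end from a user session. The following is a minimal sketch, not part of this commit: the local master, demo object name, and table name are illustrative; it mirrors the `insert into t select 1L, 2` case migrated in InsertSuite above.

import org.apache.spark.sql.{AnalysisException, SparkSession}

object IncompatibleDataForTableDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // ANSI store assignment (the default since Spark 3.0) rejects
      // unsafe casts on table insertion.
      .config("spark.sql.storeAssignmentPolicy", "ANSI")
      .getOrCreate()

    spark.sql("CREATE TABLE t(i INT, d DOUBLE) USING parquet")
    try {
      // BIGINT -> INT is not an up-cast, so analysis rejects the insert.
      spark.sql("INSERT INTO t SELECT 1L, 2.0D")
    } catch {
      case e: AnalysisException =>
        // Prints INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST
        // (previously the unnamed _LEGACY_ERROR_TEMP_1204).
        println(e.getErrorClass)
    } finally {
      spark.stop()
    }
  }
}

The dotted sub-class name (CANNOT_SAFELY_CAST, CANNOT_FIND_DATA, and so on) together with the message parameters (colName, srcType, targetType) is exactly what the migrated tests above assert via checkError.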