This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 00f66994c80 [SPARK-44577][SQL] Fix INSERT BY NAME returns nonsensical error message 00f66994c80 is described below commit 00f66994c802faf9ccc0d40ed4f6ff32992ba00f Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Fri Sep 1 20:27:17 2023 +0800 [SPARK-44577][SQL] Fix INSERT BY NAME returns nonsensical error message ### What changes were proposed in this pull request? Fix INSERT BY NAME returning a nonsensical error message on the v1 datasource. e.g.: ```scala CREATE TABLE bug(c1 INT); INSERT INTO bug BY NAME SELECT 1 AS c2; ==> Multi-part identifier cannot be empty. ``` After PR: ```scala [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA] Cannot write incompatible data for the table `spark_catalog`.`default`.`bug`: Cannot find data for the output column `c1`. ``` Also fixed the same issue when throwing other INCOMPATIBLE_DATA_FOR_TABLE type errors. ### Why are the changes needed? Fix the nonsensical error message. ### Does this PR introduce _any_ user-facing change? Yes, the error message for v1 INSERT BY NAME will be changed. ### How was this patch tested? Added a new test. Closes #42220 from Hisoka-X/SPARK-44577_insert_by_name_bug_fix. 
Authored-by: Jia Fan <fanjiaemi...@qq.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../src/main/resources/error/error-classes.json | 5 +++ ...ions-incompatible-data-for-table-error-class.md | 4 +++ .../catalyst/analysis/ResolveInsertionBase.scala | 4 +-- .../catalyst/analysis/TableOutputResolver.scala | 36 +++++++++++++++------- .../spark/sql/errors/QueryCompilationErrors.scala | 19 +++++++++--- .../org/apache/spark/sql/SQLInsertTestSuite.scala | 5 +-- 6 files changed, 54 insertions(+), 19 deletions(-) diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index af78dd2f9f8..87b9da7638b 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -1035,6 +1035,11 @@ "Cannot safely cast <colName> <srcType> to <targetType>." ] }, + "EXTRA_COLUMNS" : { + "message" : [ + "Cannot write extra columns <extraColumns>." + ] + }, "EXTRA_STRUCT_FIELDS" : { "message" : [ "Cannot write extra fields <extraFields> to the struct <colName>." diff --git a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md index f70b69ba6c5..0dd28e9d55c 100644 --- a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md +++ b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md @@ -37,6 +37,10 @@ Cannot find data for the output column `<colName>`. Cannot safely cast `<colName>` `<srcType>` to `<targetType>`. +## EXTRA_COLUMNS + +Cannot write extra columns `<extraColumns>`. + ## EXTRA_STRUCT_FIELDS Cannot write extra fields `<extraFields>` to the struct `<colName>`. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala index 8b120095bc6..ad89005a093 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala @@ -36,10 +36,10 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] { if (i.userSpecifiedCols.size != i.query.output.size) { if (i.userSpecifiedCols.size > i.query.output.size) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( - tblName, i.userSpecifiedCols, i.query) + tblName, i.userSpecifiedCols, i.query.output) } else { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( - tblName, i.userSpecifiedCols, i.query) + tblName, i.userSpecifiedCols, i.query.output) } } val projectByName = i.userSpecifiedCols.zip(i.query.output) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index 21575f7b96b..fc0e727bea5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -87,7 +87,7 @@ object TableOutputResolver { if (actualExpectedCols.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( - tableName, actualExpectedCols.map(_.name), query) + tableName, actualExpectedCols.map(_.name), query.output) } val errors = new mutable.ArrayBuffer[String]() @@ -105,7 +105,7 @@ object TableOutputResolver { } else { if (actualExpectedCols.size > query.output.size) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( - tableName, 
actualExpectedCols.map(_.name), query) + tableName, actualExpectedCols.map(_.name), query.output) } resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _) } @@ -267,9 +267,13 @@ object TableOutputResolver { if (matchedCols.size < inputCols.length) { val extraCols = inputCols.filterNot(col => matchedCols.contains(col.name)) .map(col => s"${toSQLId(col.name)}").mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError( - tableName, colPath.quoted, extraCols - ) + if (colPath.isEmpty) { + throw QueryCompilationErrors.incompatibleDataToTableExtraColumnsError(tableName, + extraCols) + } else { + throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError( + tableName, colPath.quoted, extraCols) + } } else { reordered } @@ -290,16 +294,26 @@ object TableOutputResolver { val extraColsStr = inputCols.takeRight(inputCols.size - expectedCols.size) .map(col => toSQLId(col.name)) .mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError( - tableName, colPath.quoted, extraColsStr - ) + if (colPath.isEmpty) { + throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError(tableName, + expectedCols.map(_.name), inputCols.map(_.toAttribute)) + } else { + throw QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError( + tableName, colPath.quoted, extraColsStr + ) + } } else if (inputCols.size < expectedCols.size) { val missingColsStr = expectedCols.takeRight(expectedCols.size - inputCols.size) .map(col => toSQLId(col.name)) .mkString(", ") - throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError( - tableName, colPath.quoted, missingColsStr - ) + if (colPath.isEmpty) { + throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(tableName, + expectedCols.map(_.name), inputCols.map(_.toAttribute)) + } else { + throw QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError( + tableName, colPath.quoted, 
missingColsStr + ) + } } inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index a97abf89434..ca101e79d92 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2159,25 +2159,25 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat def cannotWriteTooManyColumnsToTableError( tableName: String, expected: Seq[String], - query: LogicalPlan): Throwable = { + queryOutput: Seq[Attribute]): Throwable = { new AnalysisException( errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", messageParameters = Map( "tableName" -> toSQLId(tableName), "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "), - "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) + "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", "))) } def cannotWriteNotEnoughColumnsToTableError( tableName: String, expected: Seq[String], - query: LogicalPlan): Throwable = { + queryOutput: Seq[Attribute]): Throwable = { new AnalysisException( errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", messageParameters = Map( "tableName" -> toSQLId(tableName), "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "), - "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) + "dataColumns" -> queryOutput.map(c => toSQLId(c.name)).mkString(", "))) } def incompatibleDataToTableCannotFindDataError( @@ -2202,6 +2202,17 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat ) } + def incompatibleDataToTableExtraColumnsError( + tableName: String, extraColumns: String): Throwable = { + new AnalysisException( + errorClass = 
"INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS", + messageParameters = Map( + "tableName" -> toSQLId(tableName), + "extraColumns" -> extraColumns + ) + ) + } + def incompatibleDataToTableExtraStructFieldsError( tableName: String, colName: String, extraFields: String): Throwable = { new AnalysisException( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 0bbed51d0a9..34e4ded09b5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -213,9 +213,10 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { exception = intercept[AnalysisException] { processInsert("t1", df, overwrite = false, byName = true) }, - v1ErrorClass = "_LEGACY_ERROR_TEMP_1186", + v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_COLUMNS", v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA", - v1Parameters = Map.empty[String, String], + v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`", + "extraColumns" -> "`x1`"), v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> "`c1`") ) val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org