This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f3db20c17df [SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170
f3db20c17df is described below

commit f3db20c17dfdc1cb5daa42c154afa732e5e3800b
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Tue Jun 20 01:43:32 2023 +0300

    [SPARK-43969][SQL] Refactor & Assign names to the error class _LEGACY_ERROR_TEMP_1170

### What changes were proposed in this pull request?
This PR aims to:
- Refactor `PreWriteCheck` to use the error framework.
- Make `INSERT_COLUMN_ARITY_MISMATCH` more generic and avoid embedding error text in source code.
- Assign a name to `_LEGACY_ERROR_TEMP_1170`.
- In the `INSERT_PARTITION_COLUMN_ARITY_MISMATCH` error message, quote table column names with `toSQLId` instead of wrapping them in single quotes.

### Why are the changes needed?
The changes improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manually tested.
- Passed GA.

Closes #41458 from panbingkun/refactor_PreWriteCheck.

Lead-authored-by: panbingkun <pbk1...@gmail.com>
Co-authored-by: panbingkun <84731...@qq.com>
Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json | 62 ++++++++---
 python/pyspark/sql/tests/test_readwriter.py | 4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 .../catalyst/analysis/ResolveInsertionBase.scala | 13 ++-
 .../catalyst/analysis/TableOutputResolver.scala | 4 +-
 .../spark/sql/errors/QueryCompilationErrors.scala | 40 +++----
 .../catalyst/analysis/V2WriteAnalysisSuite.scala | 48 +++++---
 .../spark/sql/execution/datasources/rules.scala | 32 ++++--
 .../analyzer-results/postgreSQL/numeric.sql.out | 7 +-
 .../sql-tests/results/postgreSQL/numeric.sql.out | 7 +-
 .../org/apache/spark/sql/DataFrameSuite.scala | 33 ++++--
 .../org/apache/spark/sql/SQLInsertTestSuite.scala | 31 ++++--
 .../spark/sql/connector/InsertIntoTests.scala | 34 ++++--
 .../apache/spark/sql/execution/SQLViewSuite.scala | 11 +-
 .../spark/sql/execution/command/DDLSuite.scala | 54 +++++----
 .../org/apache/spark/sql/sources/InsertSuite.scala | 122 +++++++++++++--------
 .../spark/sql/hive/thriftserver/CliSuite.scala | 2 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala | 11 +-
 18 files changed, 324 insertions(+), 193 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 54b920cc36f..d9e729effeb 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -888,10 +888,24 @@ }, "INSERT_COLUMN_ARITY_MISMATCH" : { "message" : [ - "Cannot write to '<tableName>', <reason>:", - "Table columns: <tableColumns>.", - "Data columns: <dataColumns>." + "Cannot write to <tableName>, the reason is" ], + "subClass" : { + "NOT_ENOUGH_DATA_COLUMNS" : { + "message" : [ + "not enough data columns:", + "Table columns: <tableColumns>.", + "Data columns: <dataColumns>." + ] + }, + "TOO_MANY_DATA_COLUMNS" : { + "message" : [ + "too many data columns:", + "Table columns: <tableColumns>.", + "Data columns: <dataColumns>." + ] + } + }, "sqlState" : "21S01" }, "INSERT_PARTITION_COLUMN_ARITY_MISMATCH" : { @@ -1715,6 +1729,11 @@ ], "sqlState" : "46110" }, + "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" : { + "message" : [ + "<cmd> is not supported, if you want to enable it, please set \"spark.sql.catalogImplementation\" to \"hive\"."
+ ] + }, "NOT_SUPPORTED_IN_JDBC_CATALOG" : { "message" : [ "Not supported command in JDBC catalog:" @@ -2464,6 +2483,33 @@ "grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup." ] }, + "UNSUPPORTED_INSERT" : { + "message" : [ + "Can't insert into the target." + ], + "subClass" : { + "NOT_ALLOWED" : { + "message" : [ + "The target relation <relationId> does not allow insertion." + ] + }, + "NOT_PARTITIONED" : { + "message" : [ + "The target relation <relationId> is not partitioned." + ] + }, + "RDD_BASED" : { + "message" : [ + "An RDD-based table is not allowed." + ] + }, + "READ_FROM" : { + "message" : [ + "The target relation <relationId> is also being read from." + ] + } + } + }, "UNSUPPORTED_OVERWRITE" : { "message" : [ "Can't overwrite the target that is also being read from." @@ -3005,11 +3051,6 @@ "Window function <wf> requires window to be ordered, please add ORDER BY clause. For example SELECT <wf>(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table." ] }, - "_LEGACY_ERROR_TEMP_1038" : { - "message" : [ - "Cannot write to table due to mismatched user specified column size(<columnSize>) and data column size(<outputSize>)." - ] - }, "_LEGACY_ERROR_TEMP_1039" : { "message" : [ "Multiple time/session window expressions would result in a cartesian product of rows, therefore they are currently not supported." @@ -3506,11 +3547,6 @@ "Table partitions: <partColNames>." ] }, - "_LEGACY_ERROR_TEMP_1170" : { - "message" : [ - "Hive support is required to <detail>." - ] - }, "_LEGACY_ERROR_TEMP_1171" : { "message" : [ "createTableColumnTypes option column <col> not found in schema <schema>." diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py index 17c158a870a..6bcef51732f 100644 --- a/python/pyspark/sql/tests/test_readwriter.py +++ b/python/pyspark/sql/tests/test_readwriter.py @@ -226,7 +226,9 @@ class ReadwriterV2TestsMixin: def test_create_without_provider(self): df = self.df - with self.assertRaisesRegex(AnalysisException, "Hive support is required"): + with self.assertRaisesRegex( + AnalysisException, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" + ): df.writeTo("test_table").create() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1b023413aa9..488c39d5dd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1289,7 +1289,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // Create a project if this is an INSERT INTO BY NAME query. 
val projectByName = if (i.userSpecifiedCols.nonEmpty) { - Some(createProjectForByNameQuery(i)) + Some(createProjectForByNameQuery(r.table.name, i)) } else { None } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala index 71d36867951..8b120095bc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInsertionBase.scala @@ -28,12 +28,19 @@ abstract class ResolveInsertionBase extends Rule[LogicalPlan] { def resolver: Resolver = conf.resolver /** Add a project to use the table column names for INSERT INTO BY NAME */ - protected def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = { + protected def createProjectForByNameQuery( + tblName: String, + i: InsertIntoStatement): LogicalPlan = { SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver) if (i.userSpecifiedCols.size != i.query.output.size) { - throw QueryCompilationErrors.writeTableWithMismatchedColumnsError( - i.userSpecifiedCols.size, i.query.output.size, i.query) + if (i.userSpecifiedCols.size > i.query.output.size) { + throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( + tblName, i.userSpecifiedCols, i.query) + } else { + throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( + tblName, i.userSpecifiedCols, i.query) + } } val projectByName = i.userSpecifiedCols.zip(i.query.output) .map { case (userSpecifiedCol, queryOutputCol) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala index b9aca30c754..3b721cf5d0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala @@ -47,7 +47,7 @@ object TableOutputResolver { if (actualExpectedCols.size < query.output.size) { throw QueryCompilationErrors.cannotWriteTooManyColumnsToTableError( - tableName, actualExpectedCols, query) + tableName, actualExpectedCols.map(_.name), query) } val errors = new mutable.ArrayBuffer[String]() @@ -74,7 +74,7 @@ object TableOutputResolver { } if (actualExpectedCols.size > queryOutputCols.size) { throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError( - tableName, actualExpectedCols, query) + tableName, actualExpectedCols.map(_.name), query) } resolveColumnsByPosition(queryOutputCols, actualExpectedCols, conf, errors += _) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 01b90210047..1b5062e985b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -613,16 +613,6 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { messageParameters = Map("wf" -> wf.toString)) } - def writeTableWithMismatchedColumnsError( - columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = { - new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1038", - messageParameters = Map( - "columnSize" -> columnSize.toString, - "outputSize" -> 
outputSize.toString), - origin = t.origin) - } - def multiTimeWindowExpressionsNotSupportedError(t: TreeNode[_]): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1039", @@ -1743,10 +1733,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { "partColNames" -> partColNames.map(_.name).mkString(","))) } - def ddlWithoutHiveSupportEnabledError(detail: String): Throwable = { + def ddlWithoutHiveSupportEnabledError(cmd: String): Throwable = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1170", - messageParameters = Map("detail" -> detail)) + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + messageParameters = Map("cmd" -> cmd)) } def createTableColumnTypesOptionColumnNotFoundInSchemaError( @@ -2056,26 +2046,26 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def cannotWriteTooManyColumnsToTableError( tableName: String, - expected: Seq[Attribute], + expected: Seq[String], query: LogicalPlan): Throwable = { new AnalysisException( - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", messageParameters = Map( - "tableName" -> tableName, - "reason" -> "too many data columns", - "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "), - "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", "))) + "tableName" -> toSQLId(tableName), + "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "), + "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) } def cannotWriteNotEnoughColumnsToTableError( - tableName: String, expected: Seq[Attribute], query: LogicalPlan): Throwable = { + tableName: String, + expected: Seq[String], + query: LogicalPlan): Throwable = { new AnalysisException( - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", messageParameters = Map( - "tableName" -> tableName, - "reason" -> "not enough data columns", - "tableColumns" -> expected.map(c => s"'${c.name}'").mkString(", "), - "dataColumns" -> query.output.map(c => s"'${c.name}'").mkString(", "))) + "tableName" -> toSQLId(tableName), + "tableColumns" -> expected.map(c => toSQLId(c)).mkString(", "), + "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", "))) } def cannotWriteIncompatibleDataToTableError(tableName: String, errors: Seq[String]): Throwable = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala index 0b698ae07a2..5b51730e759 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala @@ -443,10 +443,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) - assertAnalysisError(parsedPlan, Seq( - "Cannot write", "'table-name'", "too many data columns", - "Table columns: 'x', 'y'", - "Data columns: 'x', 'y', 'z'")) + assertAnalysisErrorClass( + inputPlan = parsedPlan, + expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + expectedMessageParameters = Map( + "tableName" -> "`table-name`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`x`, `y`, `z`") + ) } test("byName: fail extra data fields in struct") { @@ -523,10 +527,14 @@ abstract class 
V2WriteAnalysisSuiteBase extends AnalysisTest { val parsedPlan = byPosition(requiredTable, query) assertNotResolved(parsedPlan) - assertAnalysisError(parsedPlan, Seq( - "Cannot write", "'table-name'", "not enough data columns", - "Table columns: 'x', 'y'", - "Data columns: 'y'")) + assertAnalysisErrorClass( + inputPlan = parsedPlan, + expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + expectedMessageParameters = Map( + "tableName" -> "`table-name`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`y`") + ) } test("byPosition: missing optional columns cause failure") { @@ -537,10 +545,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { val parsedPlan = byPosition(table, query) assertNotResolved(parsedPlan) - assertAnalysisError(parsedPlan, Seq( - "Cannot write", "'table-name'", "not enough data columns", - "Table columns: 'x', 'y'", - "Data columns: 'y'")) + assertAnalysisErrorClass( + inputPlan = parsedPlan, + expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + expectedMessageParameters = Map( + "tableName" -> "`table-name`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`y`") + ) } test("byPosition: insert safe cast") { @@ -572,10 +584,14 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest { val parsedPlan = byName(table, query) assertNotResolved(parsedPlan) - assertAnalysisError(parsedPlan, Seq( - "Cannot write", "'table-name'", "too many data columns", - "Table columns: 'x', 'y'", - "Data columns: 'a', 'b', 'c'")) + assertAnalysisErrorClass( + inputPlan = parsedPlan, + expectedErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + expectedMessageParameters = Map( + "tableName" -> "`table-name`", + "tableColumns" -> "`x`, `y`", + "dataColumns" -> "`a`, `b`, `c`") + ) } test("bypass output column resolution") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 0b07ae1d11c..750f39a252f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -391,7 +391,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { // Create a project if this INSERT has a user-specified column list. 
val hasColumnList = insert.userSpecifiedCols.nonEmpty val query = if (hasColumnList) { - createProjectForByNameQuery(insert) + createProjectForByNameQuery(tblName, insert) } else { insert.query } @@ -401,12 +401,13 @@ object PreprocessTableInsertion extends ResolveInsertionBase { supportColDefaultValue = true) } catch { case e: AnalysisException if staticPartCols.nonEmpty && - e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" => + (e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS" || + e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS") => val newException = e.copy( errorClass = Some("INSERT_PARTITION_COLUMN_ARITY_MISMATCH"), messageParameters = e.messageParameters ++ Map( - "tableColumns" -> insert.table.output.map(c => s"'${c.name}'").mkString(", "), - "staticPartCols" -> staticPartCols.toSeq.sorted.map(c => s"'$c'").mkString(", ") + "tableColumns" -> insert.table.output.map(c => toSQLId(c.name)).mkString(", "), + "staticPartCols" -> staticPartCols.toSeq.sorted.map(c => toSQLId(c)).mkString(", ") )) newException.setStackTrace(e.getStackTrace) throw newException @@ -503,8 +504,6 @@ object PreReadCheck extends (LogicalPlan => Unit) { */ object PreWriteCheck extends (LogicalPlan => Unit) { - def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } - def apply(plan: LogicalPlan): Unit = { plan.foreach { case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, @@ -514,7 +513,9 @@ object PreWriteCheck extends (LogicalPlan => Unit) { case LogicalRelation(src, _, _, _) => src } if (srcRelations.contains(relation)) { - failAnalysis("Cannot insert into table that is also being read from.") + throw new AnalysisException( + errorClass = "UNSUPPORTED_INSERT.READ_FROM", + messageParameters = Map("relationId" -> toSQLId(relation.toString))) } else { // OK } @@ -524,10 +525,15 @@ object PreWriteCheck extends (LogicalPlan => Unit) { // Right now, we do not support insert into a non-file-based data source table with // partition specs. 
- case _: InsertableRelation if partition.nonEmpty => - failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") - - case _ => failAnalysis(s"$relation does not allow insertion.") + case i: InsertableRelation if partition.nonEmpty => + throw new AnalysisException( + errorClass = "UNSUPPORTED_INSERT.NOT_PARTITIONED", + messageParameters = Map("relationId" -> toSQLId(i.toString))) + + case _ => + throw new AnalysisException( + errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED", + messageParameters = Map("relationId" -> toSQLId(relation.toString))) } case InsertIntoStatement(t, _, _, _, _, _, _) @@ -535,7 +541,9 @@ object PreWriteCheck extends (LogicalPlan => Unit) { t.isInstanceOf[Range] || t.isInstanceOf[OneRowRelation] || t.isInstanceOf[LocalRelation] => - failAnalysis(s"Inserting into an RDD-based table is not allowed.") + throw new AnalysisException( + errorClass = "UNSUPPORTED_INSERT.RDD_BASED", + messageParameters = Map.empty) case _ => // OK } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out index 95c81ff75ed..536631179d8 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/numeric.sql.out @@ -3841,12 +3841,11 @@ INSERT INTO num_result SELECT t1.id, t2.id, t1.val, t2.val, t1.val * t2.val -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH", + "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", "sqlState" : "21S01", "messageParameters" : { - "dataColumns" : "'id', 'id', 'val', 'val', '(val * val)'", - "reason" : "too many data columns", - "tableColumns" : "'id1', 'id2', 'result'", + "dataColumns" : "`id`, `id`, `val`, `val`, `(val * val)`", + "tableColumns" : "`id1`, `id2`, `result`", "tableName" : "`spark_catalog`.`default`.`num_result`" } } diff --git a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out index 5840e1164fa..61b7a07631c 100644 --- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/numeric.sql.out @@ -3832,12 +3832,11 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH", + "errorClass" : "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", "sqlState" : "21S01", "messageParameters" : { - "dataColumns" : "'id', 'id', 'val', 'val', '(val * val)'", - "reason" : "too many data columns", - "tableColumns" : "'id1', 'id2', 'result'", + "dataColumns" : "`id`, `id`, `val`, `val`, `(val * val)`", + "tableColumns" : "`id1`, `id2`, `result`", "tableName" : "`spark_catalog`.`default`.`num_result`" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c94c6d7e50e..55ea09b5945 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1831,25 +1831,34 @@ class DataFrameSuite extends QueryTest // error cases: insert into an RDD df.createOrReplaceTempView("rdd_base") - val e1 = intercept[AnalysisException] { - insertion.write.insertInto("rdd_base") - } - assert(e1.getMessage.contains("Inserting into an RDD-based 
table is not allowed.")) + checkError( + exception = intercept[AnalysisException] { + insertion.write.insertInto("rdd_base") + }, + errorClass = "UNSUPPORTED_INSERT.RDD_BASED", + parameters = Map.empty + ) // error case: insert into a logical plan that is not a LeafNode val indirectDS = pdf.select("_1").filter($"_1" > 5) indirectDS.createOrReplaceTempView("indirect_ds") - val e2 = intercept[AnalysisException] { - insertion.write.insertInto("indirect_ds") - } - assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + checkError( + exception = intercept[AnalysisException] { + insertion.write.insertInto("indirect_ds") + }, + errorClass = "UNSUPPORTED_INSERT.RDD_BASED", + parameters = Map.empty + ) // error case: insert into an OneRowRelation Dataset.ofRows(spark, OneRowRelation()).createOrReplaceTempView("one_row") - val e3 = intercept[AnalysisException] { - insertion.write.insertInto("one_row") - } - assert(e3.getMessage.contains("Inserting into an RDD-based table is not allowed.")) + checkError( + exception = intercept[AnalysisException] { + insertion.write.insertInto("one_row") + }, + errorClass = "UNSUPPORTED_INSERT.RDD_BASED", + parameters = Map.empty + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index bb3125de9c4..98e71362fda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -223,13 +223,14 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { exception = intercept[AnalysisException] { processInsert("t1", df2, overwrite = false, byName = true) }, - v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH", - v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH", + v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", v1Parameters = Map("tableName" -> "`spark_catalog`.`default`.`t1`", - "reason" -> "too many data columns", "tableColumns" -> "'c1', 'c2', 'c3'", - "dataColumns" -> "'c3', 'c2', 'c1', 'c0'"), - v2Parameters = Map("tableName" -> "testcat.t1", "reason" -> "too many data columns", - "tableColumns" -> "'c1', 'c2', 'c3'", "dataColumns" -> "'c3', 'c2', 'c1', 'c0'") + "tableColumns" -> "`c1`, `c2`, `c3`", + "dataColumns" -> "`c3`, `c2`, `c1`, `c0`"), + v2Parameters = Map("tableName" -> "`testcat`.`t1`", + "tableColumns" -> "`c1`, `c2`, `c3`", + "dataColumns" -> "`c3`, `c2`, `c1`, `c0`") ) } } @@ -302,18 +303,24 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)") }, sqlState = None, - errorClass = "_LEGACY_ERROR_TEMP_1038", - parameters = Map("columnSize" -> "2", "outputSize" -> "3"), - context = ExpectedContext("values(1, 2, 3)", 24, 38) + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + parameters = Map( + "tableName" -> ".*`t1`", + "tableColumns" -> "`c1`, `c2`", + "dataColumns" -> "`col1`, `col2`, `col3`"), + matchPVals = true ) checkError( exception = intercept[AnalysisException] { sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)") }, sqlState = None, - errorClass = "_LEGACY_ERROR_TEMP_1038", - parameters = Map("columnSize" -> "3", "outputSize" -> "2"), - context = ExpectedContext("values(1, 2)", 28, 39) + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> ".*`t1`", + "tableColumns" -> "`c1`, `c2`, `c3`", + 
"dataColumns" -> "`col1`, `col2`"), + matchPVals = true ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala index 2ff9981c8d9..63bb0148972 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/InsertIntoTests.scala @@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.util.TypeUtils._ import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode} import org.apache.spark.sql.test.SharedSparkSession @@ -123,13 +124,19 @@ abstract class InsertIntoTests( val t1 = s"${catalogAndNamespace}tbl" sql(s"CREATE TABLE $t1 (id bigint, data string, missing string) USING $v2Format") val df = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "data") - val exc = intercept[AnalysisException] { - doInsert(t1, df) - } verifyTable(t1, Seq.empty[(Long, String, String)].toDF("id", "data", "missing")) - val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1 - assert(exc.getMessage.contains(s"Cannot write to '$tableName', not enough data columns")) + val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, df) + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> tableName, + "tableColumns" -> "`id`, `data`, `missing`", + "dataColumns" -> "`id`, `data`") + ) } test("insertInto: fails when an extra column is present") { @@ -137,13 +144,18 @@ abstract class InsertIntoTests( withTable(t1) { sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format") val df = Seq((1L, "a", "mango")).toDF("id", "data", "fruit") - val exc = intercept[AnalysisException] { - doInsert(t1, df) - } - verifyTable(t1, Seq.empty[(Long, String)].toDF("id", "data")) - val tableName = if (catalogAndNamespace.isEmpty) s"default.$t1" else t1 - assert(exc.getMessage.contains(s"Cannot write to '$tableName', too many data columns")) + val tableName = if (catalogAndNamespace.isEmpty) toSQLId(s"default.$t1") else toSQLId(t1) + checkError( + exception = intercept[AnalysisException] { + doInsert(t1, df) + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", + parameters = Map( + "tableName" -> tableName, + "tableColumns" -> "`id`, `data`", + "dataColumns" -> "`id`, `data`, `fruit`") + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index af083dc8447..35c6ecb1dcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -203,10 +203,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { withTempView(viewName) { spark.range(10).createTempView(viewName) - val e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $viewName SELECT 1") - }.getMessage - assert(e.contains("Inserting into an RDD-based table is not allowed")) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }, + errorClass = 
"UNSUPPORTED_INSERT.RDD_BASED", + parameters = Map.empty + ) val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 05ba4e72fe1..21e6980db8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -82,10 +82,13 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") val tabName = "tbl" withTable(tabName) { - val e = intercept[AnalysisException] { - sql(s"CREATE TABLE $tabName (i INT, j STRING) STORED AS parquet") - }.getMessage - assert(e.contains("Hive support is required to CREATE Hive TABLE")) + checkError( + exception = intercept[AnalysisException] { + sql(s"CREATE TABLE $tabName (i INT, j STRING) STORED AS parquet") + }, + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)") + ) } } @@ -94,15 +97,18 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { withTempDir { tempDir => val tabName = "tbl" withTable(tabName) { - val e = intercept[AnalysisException] { - sql( - s""" - |CREATE EXTERNAL TABLE $tabName (i INT, j STRING) - |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' - |LOCATION '${tempDir.toURI}' - """.stripMargin) - }.getMessage - assert(e.contains("Hive support is required to CREATE Hive TABLE")) + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |CREATE EXTERNAL TABLE $tabName (i INT, j STRING) + |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' + |LOCATION '${tempDir.toURI}' + """.stripMargin) + }, + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)") + ) } } } @@ -110,16 +116,22 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { test("Create Hive Table As Select") { import testImplicits._ withTable("t", "t1") { - var e = intercept[AnalysisException] { - sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b") - }.getMessage - assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE TABLE t STORED AS parquet SELECT 1 as a, 1 as b") + }, + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)") + ) spark.range(1).select($"id" as Symbol("a"), $"id" as Symbol("b")).write.saveAsTable("t1") - e = intercept[AnalysisException] { - sql("CREATE TABLE t STORED AS parquet SELECT a, b from t1") - }.getMessage - assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE TABLE t STORED AS parquet SELECT a, b from t1") + }, + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + parameters = Map("cmd" -> "CREATE Hive TABLE (AS SELECT)") + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index f3b18863175..71c3301a0eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ 
-112,10 +112,47 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { """.stripMargin) sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2") - val message = intercept[AnalysisException] { - sql("INSERT INTO TABLE t1 SELECT a FROM t2") - }.getMessage - assert(message.contains("does not allow insertion")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO TABLE t1 SELECT a FROM t2") + }, + errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED", + parameters = Map("relationId" -> "`SimpleScan(1,10)`") + ) + } + } + + test("UNSUPPORTED_INSERT.RDD_BASED: Inserting into an RDD-based table is not allowed") { + import testImplicits._ + withTempView("t1") { + sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t1") + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO TABLE t1 SELECT a FROM t1") + }, + errorClass = "UNSUPPORTED_INSERT.RDD_BASED", + parameters = Map.empty + ) + } + } + + test("UNSUPPORTED_INSERT.READ_FROM: Cannot insert into table that is also being read from") { + withTempView("t1") { + sql( + """ + |CREATE TEMPORARY VIEW t1 + |USING org.apache.spark.sql.sources.SimpleScanSource + |OPTIONS ( + | From '1', + | To '10') + """.stripMargin) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO TABLE t1 SELECT * FROM t1") + }, + errorClass = "UNSUPPORTED_INSERT.READ_FROM", + parameters = Map("relationId" -> "`SimpleScan(1,10)`") + ) } } @@ -376,16 +413,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { (1 to 10).map(Row(_)).toSeq ) - val message = intercept[AnalysisException] { - sql( - s""" - |INSERT OVERWRITE TABLE oneToTen SELECT CAST(a AS INT) FROM jt - """.stripMargin) - }.getMessage - assert( - message.contains("does not allow insertion."), - "It is not allowed to insert into a table that is not an InsertableRelation." 
- ) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT OVERWRITE TABLE oneToTen SELECT CAST(a AS INT) FROM jt") + }, + errorClass = "UNSUPPORTED_INSERT.NOT_ALLOWED", + parameters = Map("relationId" -> "`SimpleScan(1,10)`")) spark.catalog.dropTempView("oneToTen") } @@ -484,16 +517,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { test("Insert overwrite directory using Hive serde without turning on Hive support") { withTempDir { dir => val path = dir.toURI.getPath - val e = intercept[AnalysisException] { - sql( - s""" - |INSERT OVERWRITE LOCAL DIRECTORY '$path' - |STORED AS orc - |SELECT 1, 2 - """.stripMargin) - }.getMessage - assert(e.contains( - "Hive support is required to INSERT OVERWRITE DIRECTORY with the Hive format")) + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |INSERT OVERWRITE LOCAL DIRECTORY '$path' + |STORED AS orc + |SELECT 1, 2 + """.stripMargin) + }, + errorClass = "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", + parameters = Map("cmd" -> "INSERT OVERWRITE DIRECTORY with the Hive format") + ) } } @@ -639,12 +674,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t select 1, 2.0D, 3") }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "reason" -> "too many data columns", - "tableColumns" -> "'i', 'd'", - "dataColumns" -> "'1', '2.0', '3'")) + "tableColumns" -> "`i`, `d`", + "dataColumns" -> "`1`, `2`.`0`, `3`")) // Insert into table successfully. sql("insert into t select 1, 2.0D") @@ -937,12 +971,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( - "tableName" -> "unknown", - "reason" -> "not enough data columns", - "tableColumns" -> "'a', 'b'", - "dataColumns" -> "'a'")) + "tableName" -> "`unknown`", + "tableColumns" -> "`a`, `b`", + "dataColumns" -> "`a`")) } } @@ -1208,12 +1241,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " + "from num_data t1, num_data t2") }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "reason" -> "too many data columns", - "tableColumns" -> "'id1', 'int2', 'result'", - "dataColumns" -> "'id', 'id', 'val', 'val', '(val * val)'")) + "tableColumns" -> "`id1`, `int2`, `result`", + "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`")) } // The default value is disabled per configuration. 
withTable("t") { @@ -1252,12 +1284,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t values(true)") }, - errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", parameters = Map( "tableName" -> "`spark_catalog`.`default`.`t`", - "reason" -> "not enough data columns", - "tableColumns" -> "'i', 's'", - "dataColumns" -> "'col1'")) + "tableColumns" -> "`i`, `s`", + "dataColumns" -> "`col1`")) } } } @@ -1327,10 +1358,11 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { exception = intercept[AnalysisException] { sql("insert into t (i, q) select true from (select 1)") }, - errorClass = "_LEGACY_ERROR_TEMP_1038", - parameters = Map("columnSize" -> "2", "outputSize" -> "1"), - ExpectedContext( - fragment = "select true from (select 1)", start = 21, stop = 47)) + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "tableColumns" -> "`i`, `q`", + "dataColumns" -> "`true`")) } // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no // explicit DEFAULT value is available when the INSERT INTO statement provides fewer diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 90f371c7ec7..d3a9a9f0841 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -692,7 +692,7 @@ class CliSuite extends SparkFunSuite { // names. runCliWithin(1.minute, extraArgs = extraConf)( "create table src(key int) using hive;" -> - "Hive support is required to CREATE Hive TABLE", + "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT", "create table src(key int) using parquet;" -> "") cd.countDown() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 620ce88df19..adf0db957e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -39,7 +39,7 @@ case class TestData(key: Int, value: String) case class ThreeColumnTable(key: Int, value: String, key1: String) class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter - with SQLTestUtils with PrivateMethodTester { + with SQLTestUtils with PrivateMethodTester { import spark.implicits._ override lazy val testData = spark.sparkContext.parallelize( @@ -364,11 +364,10 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter }, errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH", parameters = Map( - "staticPartCols" -> "'b', 'c'", - "tableColumns" -> "'a', 'd', 'b', 'c'", - "reason" -> "too many data columns", - "dataColumns" -> "'1', '2', '3'", - "tableName" -> s"`spark_catalog`.`default`.`$tableName`") + "staticPartCols" -> "`b`, `c`", + "tableColumns" -> "`a`, `d`, `b`, `c`", + "dataColumns" -> "`1`, `2`, `3`", + "tableName" -> s"`spark_catalog`.`default`.`${tableName}`") ) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org