This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 90ec7eaddb6 [SPARK-43957][SQL][TESTS] Use `checkError()` to check `Exception` in `*Insert*Suite`
90ec7eaddb6 is described below

commit 90ec7eaddb66b6b2fe3afb8cdb68a9cf88f714de
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Sat Jun 3 22:22:20 2023 +0300

    [SPARK-43957][SQL][TESTS] Use `checkError()` to check `Exception` in `*Insert*Suite`

    ### What changes were proposed in this pull request?
    The PR aims to use `checkError()` to check `Exception` in `*Insert*Suite`, including:
    - sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite
    - sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite
    - sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite

    #### Note:
    This PR does not cover some of the cases that directly throw `AnalysisException`, such as:
    https://github.com/apache/spark/blob/898ad77900d887ac64800a616bd382def816eea6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala#L505-L515
    After this PR, I will refactor these, assign them error class names, and move them onto the error framework. Once those tasks are completed, all exception checks in `*Insert*Suite` will have been migrated to `checkError`.

    ### Why are the changes needed?
    Migrating to checkError() makes the tests independent of the text of error messages.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Manually tested.
    - Passed GA.

    Closes #41447 from panbingkun/check_error_for_insert_suites.

    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../org/apache/spark/sql/SQLInsertTestSuite.scala | 82 ++-
 ...ltaBasedUpdateAsDeleteAndInsertTableSuite.scala | 11 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 570 ++++++++++++++-------
 .../org/apache/spark/sql/hive/InsertSuite.scala | 50 +-
 4 files changed, 477 insertions(+), 236 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 904980d58d6..af85e44519b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import org.apache.spark.SparkConf +import org.apache.spark.SparkNumberFormatException import org.apache.spark.sql.catalyst.expressions.Hex import org.apache.spark.sql.connector.catalog.InMemoryPartitionTableCatalog import org.apache.spark.sql.internal.SQLConf @@ -181,16 +182,28 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { } test("insert with column list - mismatched column list size") { - val msgs = Seq("Cannot write to table due to mismatched user specified column size", - "expected 3 columns but found") def test: Unit = { withTable("t1") { val cols = Seq("c1", "c2", "c3") createTable("t1", cols, Seq("int", "long", "string")) - val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)")) - assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1))) - val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)")) - assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1))) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)") + }, + sqlState = None,
errorClass = "_LEGACY_ERROR_TEMP_1038", + parameters = Map("columnSize" -> "2", "outputSize" -> "3"), + context = ExpectedContext("values(1, 2, 3)", 24, 38) + ) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)") + }, + sqlState = None, + errorClass = "_LEGACY_ERROR_TEMP_1038", + parameters = Map("columnSize" -> "3", "outputSize" -> "2"), + context = ExpectedContext("values(1, 2)", 28, 39) + ) } } withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { @@ -259,10 +272,15 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { "checking duplicate static partition columns should respect case sensitive conf") { withTable("t") { sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)") - val e = intercept[AnalysisException] { - sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)") - } - assert(e.getMessage.contains("Found duplicate keys `c`")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)") + }, + sqlState = None, + errorClass = "DUPLICATE_KEY", + parameters = Map("keyColumn" -> "`c`"), + context = ExpectedContext("PARTITION (c='2', C='3')", 19, 42) + ) } // The following code is skipped for Hive because columns stored in Hive Metastore is always // case insensitive and we cannot create such table in Hive Metastore. @@ -297,11 +315,19 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { withTable("t") { sql("create table t(a int, b string) using parquet partitioned by (a)") if (shouldThrowException(policy)) { - val errorMsg = intercept[NumberFormatException] { - sql("insert into t partition(a='ansi') values('ansi')") - }.getMessage - assert(errorMsg.contains( - """The value 'ansi' of the type "STRING" cannot be cast to "INT"""")) + checkError( + exception = intercept[SparkNumberFormatException] { + sql("insert into t partition(a='ansi') values('ansi')") + }, + errorClass = "CAST_INVALID_INPUT", + parameters = Map( + "expression" -> "'ansi'", + "sourceType" -> "\"STRING\"", + "targetType" -> "\"INT\"", + "ansiConfig" -> "\"spark.sql.ansi.enabled\"" + ), + context = ExpectedContext("insert into t partition(a='ansi')", 0, 32) + ) } else { sql("insert into t partition(a='ansi') values('ansi')") checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil) @@ -340,8 +366,17 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { ).foreach { query => checkAnswer(sql(query), Seq(Row("a", 10, "08"))) } - val e = intercept[AnalysisException](sql("alter table t drop partition(dt='8')")) - assert(e.getMessage.contains("PARTITIONS_NOT_FOUND")) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t drop partition(dt='8')") + }, + errorClass = "PARTITIONS_NOT_FOUND", + sqlState = None, + parameters = Map( + "partitionList" -> "PARTITION \\(`dt` = 8\\)", + "tableName" -> ".*`t`"), + matchPVals = true + ) } } @@ -351,8 +386,17 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { sql("insert into t partition(dt=08) values('a', 10)") checkAnswer(sql("select * from t where dt='08'"), sql("select * from t where dt='07'")) checkAnswer(sql("select * from t where dt=08"), Seq(Row("a", 10, "8"))) - val e = intercept[AnalysisException](sql("alter table t drop partition(dt='08')")) - assert(e.getMessage.contains("PARTITIONS_NOT_FOUND")) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t drop partition(dt='08')") + }, + errorClass = "PARTITIONS_NOT_FOUND", + 
sqlState = None, + parameters = Map( + "partitionList" -> "PARTITION \\(`dt` = 08\\)", + "tableName" -> ".*.`t`"), + matchPVals = true + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala index c88d6d7e905..51f3d49372c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateAsDeleteAndInsertTableSuite.scala @@ -35,10 +35,13 @@ class DeltaBasedUpdateAsDeleteAndInsertTableSuite extends UpdateTableSuiteBase { |{ "pk": 3, "salary": 120, "dep": 'hr' } |""".stripMargin) - val exception = intercept[AnalysisException] { - sql(s"UPDATE $tableNameAsString SET salary = -1 WHERE pk = 1") - } - assert(exception.message.contains("Row ID attributes cannot be nullable")) + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(s"UPDATE $tableNameAsString SET salary = -1 WHERE pk = 1") + }, + errorClass = "NULLABLE_ROW_ID_ATTRIBUTES", + parameters = Map("nullableRowIdAttrs" -> "pk#\\d+") + ) } test("update with assignments to row ID") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 1df860ef9c4..10f46fb0e47 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -23,12 +23,11 @@ import java.time.{Duration, Period} import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataOutputStream, Path, RawLocalFileSystem} -import org.apache.spark.SparkException +import org.apache.spark.{SparkArithmeticException, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.DataSourceUtils import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode @@ -448,7 +447,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { val v1 = s""" - | INSERT OVERWRITE DIRECTORY '${path}' + | INSERT OVERWRITE DIRECTORY '$path' | USING json | OPTIONS (a 1, b 0.1, c TRUE) | SELECT 1 as a, 'c' as b @@ -470,7 +469,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { s""" | INSERT OVERWRITE DIRECTORY | USING json - | OPTIONS ('path' '${path}') + | OPTIONS ('path' '$path') | SELECT 1 as a, 'c' as b """.stripMargin @@ -504,16 +503,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { val v1 = s""" - | INSERT OVERWRITE DIRECTORY '${path}' + | INSERT OVERWRITE DIRECTORY '$path' | USING JDBC | OPTIONS (a 1, b 0.1, c TRUE) | SELECT 1 as a, 'c' as b """.stripMargin - val e = intercept[SparkException] { - spark.sql(v1) - }.getMessage - - assert(e.contains("Only Data Sources providing FileFormat are supported")) + checkError( + exception = intercept[SparkException] { + spark.sql(v1) + }, + errorClass = "_LEGACY_ERROR_TEMP_2233", + parameters = Map( + "providingClass" -> ("class org.apache.spark.sql.execution.datasources." 
+ + "jdbc.JdbcRelationProvider")) + ) } } @@ -615,21 +618,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) { withTable("t") { sql("create table t(i int, d double) using parquet") - var msg = intercept[AnalysisException] { - sql("insert into t select 1L, 2") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': bigint to int")) - - msg = intercept[AnalysisException] { - sql("insert into t select 1, 2.0") - }.getMessage - assert(msg.contains("Cannot safely cast 'd': decimal(2,1) to double")) - - msg = intercept[AnalysisException] { - sql("insert into t select 1, 2.0D, 3") - }.getMessage - assert(msg.contains( - "Cannot write to '`spark_catalog`.`default`.`t`', too many data columns")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t select 1L, 2") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot safely cast 'i': bigint to int")) + + checkError( + exception = intercept[AnalysisException] { + sql("insert into t select 1, 2.0") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot safely cast 'd': decimal(2,1) to double")) + + checkError( + exception = intercept[AnalysisException] { + sql("insert into t select 1, 2.0D, 3") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "reason" -> "too many data columns", + "tableColumns" -> "'i', 'd'", + "dataColumns" -> "'1', '2.0', '3'")) // Insert into table successfully. sql("insert into t select 1, 2.0D") @@ -644,21 +659,36 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) { withTable("t") { sql("create table t(i int, d double) using parquet") - var msg = intercept[AnalysisException] { - sql("insert into t values('a', 'b')") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': string to int") && - msg.contains("Cannot safely cast 'd': string to double")) - msg = intercept[AnalysisException] { - sql("insert into t values(now(), now())") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': timestamp to int") && - msg.contains("Cannot safely cast 'd': timestamp to double")) - msg = intercept[AnalysisException] { - sql("insert into t values(true, false)") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': boolean to int") && - msg.contains("Cannot safely cast 'd': boolean to double")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values('a', 'b')") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'i': string to int\n- " + + "Cannot safely cast 'd': string to double")) + ) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values(now(), now())") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " + + "Cannot safely cast 'd': timestamp to double")) + ) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values(true, false)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> 
"`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'i': boolean to int\n- " + + "Cannot safely cast 'd': boolean to double")) + ) } } } @@ -695,18 +725,25 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(b int) using parquet") val outOfRangeValue1 = (Int.MaxValue + 1L).toString - val expectedMsg = "Fail to insert a value of \"BIGINT\" type into the \"INT\" type column" + - " `b` due to an overflow." - var msg = intercept[SparkException] { - sql(s"insert into t values($outOfRangeValue1)") - }.getMessage - assert(msg.contains(expectedMsg)) - + checkError( + exception = intercept[SparkException] { + sql(s"insert into t values($outOfRangeValue1)") + }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException], + errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"BIGINT\"", + "targetType" -> "\"INT\"", + "columnName" -> "`b`")) val outOfRangeValue2 = (Int.MinValue - 1L).toString - msg = intercept[SparkException] { - sql(s"insert into t values($outOfRangeValue2)") - }.getMessage - assert(msg.contains(expectedMsg)) + checkError( + exception = intercept[SparkException] { + sql(s"insert into t values($outOfRangeValue2)") + }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException], + errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"BIGINT\"", + "targetType" -> "\"INT\"", + "columnName" -> "`b`")) } } } @@ -717,18 +754,25 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(b long) using parquet") val outOfRangeValue1 = Math.nextUp(Long.MaxValue) - val expectedMsg = "Fail to insert a value of \"DOUBLE\" type into the \"BIGINT\" type " + - "column `b` due to an overflow." - var msg = intercept[SparkException] { - sql(s"insert into t values(${outOfRangeValue1}D)") - }.getMessage - assert(msg.contains(expectedMsg)) - + checkError( + exception = intercept[SparkException] { + sql(s"insert into t values(${outOfRangeValue1}D)") + }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException], + errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"DOUBLE\"", + "targetType" -> "\"BIGINT\"", + "columnName" -> "`b`")) val outOfRangeValue2 = Math.nextDown(Long.MinValue) - msg = intercept[SparkException] { - sql(s"insert into t values(${outOfRangeValue2}D)") - }.getMessage - assert(msg.contains(expectedMsg)) + checkError( + exception = intercept[SparkException] { + sql(s"insert into t values(${outOfRangeValue2}D)") + }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException], + errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"DOUBLE\"", + "targetType" -> "\"BIGINT\"", + "columnName" -> "`b`")) } } } @@ -739,12 +783,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql("create table t(b decimal(3,2)) using parquet") val outOfRangeValue = "123.45" - val expectedMsg = "Fail to insert a value of \"DECIMAL(5,2)\" type into the " + - "\"DECIMAL(3,2)\" type column `b` due to an overflow." 
- val msg = intercept[SparkException] { - sql(s"insert into t values(${outOfRangeValue})") - }.getMessage - assert(msg.contains(expectedMsg)) + checkError( + exception = intercept[SparkException] { + sql(s"insert into t values($outOfRangeValue)") + }.getCause.asInstanceOf[SparkException].getCause.asInstanceOf[SparkArithmeticException], + errorClass = "CAST_OVERFLOW_IN_TABLE_INSERT", + parameters = Map( + "sourceType" -> "\"DECIMAL(5,2)\"", + "targetType" -> "\"DECIMAL(3,2)\"", + "columnName" -> "`b`")) } } } @@ -754,38 +801,58 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.ANSI.toString) { withTable("t") { sql("CREATE TABLE t(i int, t timestamp) USING parquet") - val msg = intercept[AnalysisException] { - sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': timestamp to int")) - assert(msg.contains("Cannot safely cast 't': int to timestamp")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " + + "Cannot safely cast 't': int to timestamp")) + ) } withTable("t") { sql("CREATE TABLE t(i int, d date) USING parquet") - val msg = intercept[AnalysisException] { - sql("INSERT INTO t VALUES (date('2010-09-02'), 1)") - }.getMessage - assert(msg.contains("Cannot safely cast 'i': date to int")) - assert(msg.contains("Cannot safely cast 'd': int to date")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (date('2010-09-02'), 1)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'i': date to int\n- " + + "Cannot safely cast 'd': int to date")) + ) } withTable("t") { sql("CREATE TABLE t(b boolean, t timestamp) USING parquet") - val msg = intercept[AnalysisException] { - sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)") - }.getMessage - assert(msg.contains("Cannot safely cast 'b': timestamp to boolean")) - assert(msg.contains("Cannot safely cast 't': boolean to timestamp")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), true)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'b': timestamp to boolean\n- " + + "Cannot safely cast 't': boolean to timestamp")) + ) } withTable("t") { sql("CREATE TABLE t(b boolean, d date) USING parquet") - val msg = intercept[AnalysisException] { - sql("INSERT INTO t VALUES (date('2010-09-02'), true)") - }.getMessage - assert(msg.contains("Cannot safely cast 'b': date to boolean")) - assert(msg.contains("Cannot safely cast 'd': boolean to date")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO t VALUES (date('2010-09-02'), true)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> ("Cannot safely cast 'b': date to boolean\n- " + + "Cannot safely cast 'd': boolean to date")) + ) } } } @@ -866,10 +933,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { ) 
withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { - val message = intercept[AnalysisException] { - sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") - }.getMessage - assert(message.contains("Cannot write to 'unknown', not enough data columns")) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "unknown", + "reason" -> "not enough data columns", + "tableColumns" -> "'a', 'b'", + "dataColumns" -> "'a'")) } } @@ -1026,15 +1099,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } test("SPARK-38336 INSERT INTO statements with tables with default columns: negative tests") { - object Errors { - val COMMON_SUBSTRING = " has a DEFAULT value" - val TARGET_TABLE_NOT_FOUND = "The table or view `t` cannot be found" - } // The default value fails to analyze. withTable("t") { - assert(intercept[AnalysisException] { - sql("create table t(i boolean, s bigint default badvalue) using parquet") - }.getMessage.contains(Errors.COMMON_SUBSTRING)) + checkError( + exception = intercept[AnalysisException] { + sql("create table t(i boolean, s bigint default badvalue) using parquet") + }, + errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION", + parameters = Map( + "statement" -> "CREATE TABLE", + "colName" -> "`s`", + "defaultValue" -> "badvalue")) } // The default value analyzes to a table not in the catalog. withTable("t") { @@ -1079,50 +1154,78 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // Explicit default values may not participate in complex expressions in the VALUES list. withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t values(false, default + 1)") - }.getMessage.contains( - QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList() - .getMessage)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values(false, default + 1)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1339", + parameters = Map.empty + ) } // Explicit default values may not participate in complex expressions in the SELECT query. withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t select false, default + 1") - }.getMessage.contains( - QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList() - .getMessage)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t select false, default + 1") + }, + errorClass = "_LEGACY_ERROR_TEMP_1339", + parameters = Map.empty + ) } // Explicit default values have a reasonable error path if the table is not found. withTable("t") { - assert(intercept[AnalysisException] { - sql("insert into t values(false, default)") - }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND)) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values(false, default)") + }, + errorClass = "TABLE_OR_VIEW_NOT_FOUND", + parameters = Map("relationName" -> "`t`"), + context = ExpectedContext("t", 12, 12) + ) } // The default value parses but the type is not coercible. 
withTable("t") { - assert(intercept[AnalysisException] { - sql("create table t(i boolean, s bigint default false) using parquet") - }.getMessage.contains(Errors.COMMON_SUBSTRING)) + checkError( + exception = intercept[AnalysisException] { + sql("create table t(i boolean, s bigint default false) using parquet") + }, + errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE", + parameters = Map( + "statement" -> "CREATE TABLE", + "colName" -> "`s`", + "expectedType" -> "\"BIGINT\"", + "defaultValue" -> "false", + "actualType" -> "\"BOOLEAN\"")) } // The number of columns in the INSERT INTO statement is greater than the number of columns in // the table. withTable("t") { sql("create table num_data(id int, val decimal(38,10)) using parquet") sql("create table t(id1 int, int2 int, result decimal(38,10)) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " + - "from num_data t1, num_data t2") - }.getMessage.contains( - "Cannot write to '`spark_catalog`.`default`.`t`', too many data columns")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " + + "from num_data t1, num_data t2") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "reason" -> "too many data columns", + "tableColumns" -> "'id1', 'int2', 'result'", + "dataColumns" -> "'id', 'id', 'val', 'val', '(val * val)'")) } // The default value is disabled per configuration. withTable("t") { withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { - assert(intercept[AnalysisException] { - sql("create table t(i boolean, s bigint default 42L) using parquet") - }.getMessage.contains("Support for DEFAULT column values is not allowed")) + checkError( + exception = intercept[AnalysisException] { + sql("create table t(i boolean, s bigint default 42L) using parquet") + }, + errorClass = "_LEGACY_ERROR_TEMP_0058", + parameters = Map.empty, + context = ExpectedContext("s bigint default 42L", 26, 45) + ) } } // The table has a partitioning column with a default value; this is not allowed. @@ -1145,10 +1248,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { withTable("t") { sql("create table t(i boolean, s bigint) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t values(true)") - }.getMessage.contains( - "Cannot write to '`spark_catalog`.`default`.`t`', not enough data columns")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t values(true)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "reason" -> "not enough data columns", + "tableColumns" -> "'i', 's'", + "dataColumns" -> "'col1'")) } } } @@ -1211,14 +1320,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } test("SPARK- 38795 INSERT INTO with user specified columns and defaults: negative tests") { - val missingColError = "Cannot find data for output column " // The missing columns in these INSERT INTO commands do not have explicit default values. 
withTable("t") { sql("create table t(i boolean, s bigint, q int default 43) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (i, q) select true from (select 1)") - }.getMessage.contains("Cannot write to table due to mismatched user specified column " + - "size(2) and data column size(1)")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t (i, q) select true from (select 1)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1038", + parameters = Map("columnSize" -> "2", "outputSize" -> "1"), + ExpectedContext( + fragment = "select true from (select 1)", start = 21, stop = 47)) } // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no // explicit DEFAULT value is available when the INSERT INTO statement provides fewer @@ -1226,39 +1338,69 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") { withTable("t") { sql("create table t(i boolean, s bigint) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (i) values (true)") - }.getMessage.contains(missingColError + "'s'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t (i) values (true)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 's'")) } withTable("t") { sql("create table t(i boolean default true, s bigint) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (i) values (default)") - }.getMessage.contains(missingColError + "'s'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t (i) values (default)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 's'")) } withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (s) values (default)") - }.getMessage.contains(missingColError + "'i'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t (s) values (default)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 'i'")) } withTable("t") { sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)") - assert(intercept[AnalysisException] { - sql("insert into t partition(i='true') (s) values(5)") - }.getMessage.contains(missingColError + "'q'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t partition(i='true') (s) values(5)") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 'q'")) } withTable("t") { sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)") - assert(intercept[AnalysisException] { - sql("insert into t partition(i='false') (q) select 43") - }.getMessage.contains(missingColError + "'s'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t partition(i='false') (q) select 43") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 's'")) } 
withTable("t") { sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)") - assert(intercept[AnalysisException] { - sql("insert into t partition(i='false') (q) select default") - }.getMessage.contains(missingColError + "'s'")) + checkError( + exception = intercept[AnalysisException] { + sql("insert into t partition(i='false') (q) select default") + }, + errorClass = "_LEGACY_ERROR_TEMP_1204", + parameters = Map( + "tableName" -> "`spark_catalog`.`default`.`t`", + "errors" -> "Cannot find data for output column 's'")) } } // When the CASE_SENSITIVE configuration is enabled, then using different cases for the required @@ -1394,15 +1536,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Negative tests") { - object Errors { - val COMMON_SUBSTRING = " has a DEFAULT value" - } // The default value fails to analyze. withTable("t") { sql("create table t(i boolean) using parquet") - assert(intercept[AnalysisException] { - sql("alter table t add column s bigint default badvalue") - }.getMessage.contains(Errors.COMMON_SUBSTRING)) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t add column s bigint default badvalue") + }, + errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION", + parameters = Map( + "statement" -> "ALTER TABLE ADD COLUMNS", + "colName" -> "`s`", + "defaultValue" -> "badvalue")) } // The default value analyzes to a table not in the catalog. withTable("t") { @@ -1450,9 +1595,17 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") { sql("create table t(i boolean) using parquet") - assert(intercept[AnalysisException] { - sql("alter table t add column s bigint default 42L") - }.getMessage.contains("Support for DEFAULT column values is not allowed")) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t add column s bigint default 42L") + }, + errorClass = "_LEGACY_ERROR_TEMP_0058", + parameters = Map.empty, + context = ExpectedContext( + fragment = "s bigint default 42L", + start = 25, + stop = 44) + ) } } } @@ -1482,16 +1635,19 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } test("SPARK-38838 INSERT INTO with defaults set by ALTER TABLE ALTER COLUMN: negative tests") { - object Errors { - val COMMON_SUBSTRING = " has a DEFAULT value" - } val createTable = "create table t(i boolean, s bigint) using parquet" withTable("t") { sql(createTable) // The default value fails to analyze. - assert(intercept[AnalysisException] { - sql("alter table t alter column s set default badvalue") - }.getMessage.contains(Errors.COMMON_SUBSTRING)) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t alter column s set default badvalue") + }, + errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION", + parameters = Map( + "statement" -> "ALTER TABLE ALTER COLUMN", + "colName" -> "`s`", + "defaultValue" -> "badvalue")) // The default value analyzes to a table not in the catalog. checkError( exception = intercept[AnalysisException] { @@ -1543,9 +1699,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // Attempting to set a default value for a partitioning column is not allowed. 
withTable("t") { sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)") - assert(intercept[AnalysisException] { - sql("alter table t alter column i set default false") - }.getMessage.contains("Can't find column `i` given table data columns [`s`, `q`]")) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t alter column i set default false") + }, + errorClass = "_LEGACY_ERROR_TEMP_1246", + parameters = Map("name" -> "i", "fieldNames" -> "[`s`, `q`]")) } } @@ -1703,14 +1862,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // "default" generates a "column not found" error before any following .insertInto. withTable("t") { sql(s"create table t(a string, i int default 42) using parquet") - assert(intercept[AnalysisException] { - Seq(("xyz")).toDF.select("value", "default").write.insertInto("t") - }.getMessage.contains("column or function parameter with name `default` cannot be resolved")) + checkError( + exception = intercept[AnalysisException] { + Seq("xyz").toDF.select("value", "default").write.insertInto("t") + }, + errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION", + parameters = Map("objectName" -> "`default`", "proposal" -> "`value`")) } } test("SPARK-40001 JSON DEFAULT columns = JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE off") { - val error = "DEFAULT values are not supported for JSON tables" // Check that the JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE config overrides the // JSON_GENERATOR_IGNORE_NULL_FIELDS config. withSQLConf(SQLConf.JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE.key -> "true", @@ -1733,15 +1894,22 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { test("SPARK-39359 Restrict DEFAULT columns to allowlist of supported data source types") { withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "csv,json,orc") { - val unsupported = "DEFAULT values are not supported for target data source" - assert(intercept[AnalysisException] { - sql(s"create table t(a string default 'abc') using parquet") - }.getMessage.contains(unsupported)) + checkError( + exception = intercept[AnalysisException] { + sql(s"create table t(a string default 'abc') using parquet") + }, + errorClass = "_LEGACY_ERROR_TEMP_1345", + parameters = Map("statementType" -> "CREATE TABLE", "dataSource" -> "parquet")) withTable("t") { sql(s"create table t(a string, b int) using parquet") - assert(intercept[AnalysisException] { - sql("alter table t add column s bigint default 42") - }.getMessage.contains(unsupported)) + checkError( + exception = intercept[AnalysisException] { + sql("alter table t add column s bigint default 42") + }, + errorClass = "_LEGACY_ERROR_TEMP_1345", + parameters = Map( + "statementType" -> "ALTER TABLE ADD COLUMNS", + "dataSource" -> "parquet")) } } } @@ -1765,7 +1933,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -2022,13 +2190,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // Now update the allowlist of table providers to prohibit ALTER TABLE ADD COLUMN commands // from assigning DEFAULT values. withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$provider*") { - assert(intercept[AnalysisException] { - // Try to add another column to the existing table again. 
This fails because the table - // provider is now in the denylist. - sql(s"alter table t1 add column (b string default 'abc')") - }.getMessage.contains( - QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed( - "ALTER TABLE ADD COLUMNS", provider).getMessage)) + checkError( + exception = intercept[AnalysisException] { + // Try to add another column to the existing table again. This fails because the table + // provider is now in the denylist. + sql(s"alter table t1 add column (b string default 'abc')") + }, + errorClass = "_LEGACY_ERROR_TEMP_1346", + parameters = Map( + "statementType" -> "ALTER TABLE ADD COLUMNS", + "dataSource" -> provider)) withTable("t2") { // It is still OK to create a new table with a column DEFAULT value assigned, even if // the table provider is in the above denylist. @@ -2146,13 +2317,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { |CREATE TABLE insertTable(i int, part1 string, part2 string) USING PARQUET |PARTITIONED BY (part1, part2) """.stripMargin) - val msg = "Partition spec is invalid" - assert(intercept[AnalysisException] { - sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") - }.getMessage.contains(msg)) - assert(intercept[AnalysisException] { - sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2") - }.getMessage.contains(msg)) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO TABLE insertTable PARTITION(part1=1, part2='') SELECT 1") + }, + errorClass = "_LEGACY_ERROR_TEMP_1076", + parameters = Map( + "details" -> ("The spec ([part1=Some(1), part2=Some()]) " + + "contains an empty partition column value")) + ) + checkError( + exception = intercept[AnalysisException] { + sql("INSERT INTO TABLE insertTable PARTITION(part1='', part2) SELECT 1 ,'' AS part2") + }, + errorClass = "_LEGACY_ERROR_TEMP_1076", + parameters = Map( + "details" -> ("The spec ([part1=Some(), part2=None]) " + + "contains an empty partition column value")) + ) sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2='2') SELECT 1") sql("INSERT INTO TABLE insertTable PARTITION(part1='1', part2) SELECT 1 ,'2' AS part2") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala index 9cc26d894ba..03bbcb1a5c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala @@ -346,20 +346,30 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter } testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName => - val cause = intercept[AnalysisException] { - Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName) - } - - assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy().")) + checkError( + exception = intercept[AnalysisException] { + Seq((1, 2, 3, 4)).toDF("a", "b", "c", "d").write.partitionBy("b", "c").insertInto(tableName) + }, + errorClass = "_LEGACY_ERROR_TEMP_1309", + parameters = Map.empty + ) } testPartitionedTable( "SPARK-16036: better error message when insert into a table with mismatch schema") { tableName => - val e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3") - } - assert(e.message.contains("Cannot write to") && e.message.contains("too many data columns")) + checkError( + exception = 
intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $tableName PARTITION(b=1, c=2) SELECT 1, 2, 3") + }, + errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH", + parameters = Map( + "staticPartCols" -> "'b', 'c'", + "tableColumns" -> "'a', 'd', 'b', 'c'", + "reason" -> "too many data columns", + "dataColumns" -> "'1', '2', '3'", + "tableName" -> s"`spark_catalog`.`default`.`$tableName`") + ) } testPartitionedTable("SPARK-16037: INSERT statement should match columns by position") { @@ -841,16 +851,18 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter |PARTITIONED BY (d string) """.stripMargin) - val e = intercept[AnalysisException] { - spark.sql( - """ - |INSERT OVERWRITE TABLE t1 PARTITION(d='') - |SELECT 1 - """.stripMargin) - }.getMessage - - assert(!e.contains("get partition: Value for key d is null or empty")) - assert(e.contains("Partition spec is invalid")) + checkError( + exception = intercept[AnalysisException] { + spark.sql( + """ + |INSERT OVERWRITE TABLE t1 PARTITION(d='') + |SELECT 1 + """.stripMargin) + }, + errorClass = "_LEGACY_ERROR_TEMP_1076", + parameters = Map( + "details" -> "The spec ([d=Some()]) contains an empty partition column value") + ) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
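For readers skimming this change, the migration follows one before/after shape throughout the diff. The sketch below is a minimal illustration modeled on the `DUPLICATE_KEY` case above; it assumes it runs inside a suite like the ones touched here (e.g. `QueryTest` with `SQLTestUtils`/`SharedSparkSession`), where the test helpers `checkError()` and `ExpectedContext` are in scope and the partitioned table `t` has already been created as in that test.

    // Before: assert on a fragment of the rendered message, which breaks
    // whenever the message text changes.
    val e = intercept[AnalysisException] {
      sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
    }
    assert(e.getMessage.contains("Found duplicate keys `c`"))

    // After: pin down the error class, the message parameters and, optionally,
    // the SQL fragment (with start/stop offsets) that the error points at,
    // without depending on the message text itself.
    checkError(
      exception = intercept[AnalysisException] {
        sql("INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)")
      },
      errorClass = "DUPLICATE_KEY",
      sqlState = None,
      parameters = Map("keyColumn" -> "`c`"),
      context = ExpectedContext("PARTITION (c='2', C='3')", 19, 42))

Where a parameter value is not stable across runs (for example the generated attribute IDs in the `NULLABLE_ROW_ID_ATTRIBUTES` case), the diff passes `matchPVals = true` or uses the `checkErrorMatchPVals` variant so that the expected values are matched as regular expressions.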