This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 4dedb4ad2c9  [SPARK-44384][SQL][TESTS] Use checkError() to check Exception in *View*Suite, *Namespace*Suite, *DataSource*Suite
4dedb4ad2c9 is described below

commit 4dedb4ad2c9b2ecd75dd9ccec5f565805752ad8e
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu Jul 13 16:26:34 2023 +0300

[SPARK-44384][SQL][TESTS] Use checkError() to check Exception in *View*Suite, *Namespace*Suite, *DataSource*Suite

### What changes were proposed in this pull request?
This PR aims to use `checkError()` to check `Exception` in `*View*Suite`, `*Namespace*Suite`, and `*DataSource*Suite`, including:
- sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite
- sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite
- sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite
- sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite
- sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite
- sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite
- sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite
- sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite
- sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite
- sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite
- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite

### Why are the changes needed?
Migrating to `checkError()` makes the tests independent of the text of error messages.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Manually tested.
- Passed GA.

Closes #41952 from panbingkun/view_and_namespace_checkerror.
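For readers skimming the diff below, the recurring migration pattern looks roughly like this (a minimal sketch drawn from the ShowNamespacesSuite hunk; the surrounding test fixture is assumed):

    // Before: assert on a substring of the error message (brittle when messages are reworded).
    val errMsg = intercept[AnalysisException] {
      sql("SHOW NAMESPACES in testcat_no_namespace")
    }.getMessage
    assert(errMsg.contains("does not support namespaces"))

    // After: assert on the error class and its message parameters via checkError().
    checkError(
      exception = intercept[AnalysisException] {
        sql("SHOW NAMESPACES in testcat_no_namespace")
      },
      errorClass = "_LEGACY_ERROR_TEMP_1184",
      parameters = Map("plugin" -> "testcat_no_namespace", "ability" -> "namespaces")
    )

Where a parameter value varies at runtime (for example a temporary file path), the diff switches to checkErrorMatchPVals with regex-valued parameters instead.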
Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- .../spark/sql/FileBasedDataSourceSuite.scala | 67 +++-- .../apache/spark/sql/NestedDataSourceSuite.scala | 24 +- .../sql/connector/DataSourceV2DataFrameSuite.scala | 21 +- .../sql/connector/DataSourceV2FunctionSuite.scala | 189 +++++++++++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 54 +++- .../spark/sql/connector/DataSourceV2Suite.scala | 38 ++- .../apache/spark/sql/execution/SQLViewSuite.scala | 313 ++++++++++++++------- .../execution/command/v2/ShowNamespacesSuite.scala | 28 +- .../sql/sources/ResolvedDataSourceSuite.scala | 24 +- .../sources/StreamingDataSourceV2Suite.scala | 68 +++-- .../spark/sql/hive/MetastoreDataSourcesSuite.scala | 88 +++--- .../sql/hive/execution/HiveSQLViewSuite.scala | 31 +- .../command/AlterNamespaceSetLocationSuite.scala | 11 +- 13 files changed, 671 insertions(+), 285 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index e7e53285d62..d69a68f5726 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -26,7 +26,7 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{LocalFileSystem, Path} -import org.apache.spark.SparkException +import org.apache.spark.{SparkException, SparkFileNotFoundException, SparkRuntimeException} import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} @@ -129,11 +129,13 @@ class FileBasedDataSourceSuite extends QueryTest allFileBasedDataSources.foreach { format => test(s"SPARK-23372 error while writing empty schema files using $format") { withTempPath { outputPath => - val errMsg = intercept[AnalysisException] { - spark.emptyDataFrame.write.format(format).save(outputPath.toString) - } - assert(errMsg.getMessage.contains( - "Datasource does not support writing empty or nested empty schemas")) + checkError( + exception = intercept[AnalysisException] { + spark.emptyDataFrame.write.format(format).save(outputPath.toString) + }, + errorClass = "_LEGACY_ERROR_TEMP_1142", + parameters = Map.empty + ) } // Nested empty schema @@ -144,11 +146,13 @@ class FileBasedDataSourceSuite extends QueryTest StructField("c", IntegerType) )) val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema) - val errMsg = intercept[AnalysisException] { - df.write.format(format).save(outputPath.toString) - } - assert(errMsg.getMessage.contains( - "Datasource does not support writing empty or nested empty schemas")) + checkError( + exception = intercept[AnalysisException] { + df.write.format(format).save(outputPath.toString) + }, + errorClass = "_LEGACY_ERROR_TEMP_1142", + parameters = Map.empty + ) } } } @@ -242,10 +246,17 @@ class FileBasedDataSourceSuite extends QueryTest if (ignore.toBoolean) { testIgnoreMissingFiles(options) } else { - val exception = intercept[SparkException] { - testIgnoreMissingFiles(options) + val errorClass = sources match { + case "" => "_LEGACY_ERROR_TEMP_2062" + case _ => "_LEGACY_ERROR_TEMP_2055" } - assert(exception.getMessage().contains("does not exist")) + checkErrorMatchPVals( + exception = intercept[SparkException] { + 
testIgnoreMissingFiles(options) + }.getCause.asInstanceOf[SparkFileNotFoundException], + errorClass = errorClass, + parameters = Map("message" -> ".*does not exist") + ) } } } @@ -646,20 +657,20 @@ class FileBasedDataSourceSuite extends QueryTest // RuntimeException is triggered at executor side, which is then wrapped as // SparkException at driver side - val e1 = intercept[SparkException] { - sql(s"select b from $tableName").collect() - } - assert( - e1.getCause.isInstanceOf[RuntimeException] && - e1.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) - val e2 = intercept[SparkException] { - sql(s"select B from $tableName").collect() - } - assert( - e2.getCause.isInstanceOf[RuntimeException] && - e2.getCause.getMessage.contains( - """Found duplicate field(s) "b": [b, B] in case-insensitive mode""")) + checkError( + exception = intercept[SparkException] { + sql(s"select b from $tableName").collect() + }.getCause.asInstanceOf[SparkRuntimeException], + errorClass = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) + checkError( + exception = intercept[SparkException] { + sql(s"select B from $tableName").collect() + }.getCause.asInstanceOf[SparkRuntimeException], + errorClass = "_LEGACY_ERROR_TEMP_2093", + parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]") + ) } withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala index ded2a80c6fa..f83e7b6727b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala @@ -55,17 +55,19 @@ trait NestedDataSourceSuiteBase extends QueryTest with SharedSparkSession { withTempPath { dir => val path = dir.getCanonicalPath save(selectExpr, format, path) - val e = intercept[AnalysisException] { - spark - .read - .options(readOptions(caseInsensitiveSchema)) - .schema(caseInsensitiveSchema) - .format(format) - .load(path) - .collect() - } - assert(e.getErrorClass == "COLUMN_ALREADY_EXISTS") - assert(e.getMessageParameters().get("columnName") == "`camelcase`") + checkError( + exception = intercept[AnalysisException] { + spark + .read + .options(readOptions(caseInsensitiveSchema)) + .schema(caseInsensitiveSchema) + .format(format) + .load(path) + .collect() + }, + errorClass = "COLUMN_ALREADY_EXISTS", + parameters = Map("columnName" -> "`camelcase`") + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 3678f29ab49..b8e0d50dc9c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -182,12 +182,21 @@ class DataSourceV2DataFrameSuite Array.empty[Transform], Collections.emptyMap[String, String]) val df = sql(s"select interval 1 millisecond as i") val v2Writer = df.writeTo("testcat.table_name") - val e1 = intercept[AnalysisException](v2Writer.append()) - assert(e1.getMessage.contains(s"Cannot use interval type in the table schema.")) - val e2 = intercept[AnalysisException](v2Writer.overwrite(df("i"))) - assert(e2.getMessage.contains(s"Cannot use interval type in the table schema.")) - val 
e3 = intercept[AnalysisException](v2Writer.overwritePartitions()) - assert(e3.getMessage.contains(s"Cannot use interval type in the table schema.")) + checkError( + exception = intercept[AnalysisException](v2Writer.append()), + errorClass = "_LEGACY_ERROR_TEMP_1183", + parameters = Map.empty + ) + checkError( + exception = intercept[AnalysisException](v2Writer.overwrite(df("i"))), + errorClass = "_LEGACY_ERROR_TEMP_1183", + parameters = Map.empty + ) + checkError( + exception = intercept[AnalysisException](v2Writer.overwritePartitions()), + errorClass = "_LEGACY_ERROR_TEMP_1183", + parameters = Map.empty + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index eea2eebf849..32391eac9a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -157,24 +157,39 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { test("non-function catalog") { withSQLConf("spark.sql.catalog.testcat" -> classOf[BasicInMemoryTableCatalog].getName) { - assert(intercept[AnalysisException]( - sql("SELECT testcat.strlen('abc')").collect() - ).getMessage.contains("Catalog testcat does not support functions")) + checkError( + exception = intercept[AnalysisException]( + sql("SELECT testcat.strlen('abc')").collect() + ), + errorClass = "_LEGACY_ERROR_TEMP_1184", + parameters = Map("plugin" -> "testcat", "ability" -> "functions") + ) } } test("DESCRIBE FUNCTION: only support session catalog") { addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new JavaStrLenNoImpl)) - val e = intercept[AnalysisException] { - sql("DESCRIBE FUNCTION testcat.abc") - } - assert(e.message.contains("Catalog testcat does not support functions")) + checkError( + exception = intercept[AnalysisException] { + sql("DESCRIBE FUNCTION testcat.abc") + }, + errorClass = "_LEGACY_ERROR_TEMP_1184", + parameters = Map( + "plugin" -> "testcat", + "ability" -> "functions" + ) + ) - val e1 = intercept[AnalysisException] { - sql("DESCRIBE FUNCTION default.ns1.ns2.fun") - } - assert(e1.message.contains("requires a single-part namespace")) + checkError( + exception = intercept[AnalysisException] { + sql("DESCRIBE FUNCTION default.ns1.ns2.fun") + }, + errorClass = "REQUIRES_SINGLE_PART_NAMESPACE", + parameters = Map( + "sessionCatalog" -> "spark_catalog", + "namespace" -> "`default`.`ns1`.`ns2`") + ) } test("DROP FUNCTION: only support session catalog") { @@ -307,8 +322,14 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { test("scalar function: bad magic method") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic)) - assert(intercept[SparkException](sql("SELECT testcat.ns.strlen('abc')").collect()) - .getMessage.contains("Cannot find a compatible")) + // TODO assign a error-classes name + checkError( + exception = intercept[SparkException] { + sql("SELECT testcat.ns.strlen('abc')").collect() + }, + errorClass = null, + parameters = Map.empty + ) } test("scalar function: bad magic method with default impl") { @@ -327,10 +348,35 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), 
StrLen(StrLenDefault)) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen(42)")) - .getMessage.contains("Expect StringType")) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('a', 'b')")) - .getMessage.contains("Expect exactly one argument")) + checkError( + exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen(42)")), + errorClass = "_LEGACY_ERROR_TEMP_1198", + parameters = Map( + "unbound" -> "strlen", + "arguments" -> "int", + "unsupported" -> "Expect StringType" + ), + context = ExpectedContext( + fragment = "testcat.ns.strlen(42)", + start = 7, + stop = 27 + ) + ) + + checkError( + exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('a', 'b')")), + errorClass = "_LEGACY_ERROR_TEMP_1198", + parameters = Map( + "unbound" -> "strlen", + "arguments" -> "string, string", + "unsupported" -> "Expect exactly one argument" + ), + context = ExpectedContext( + fragment = "testcat.ns.strlen('a', 'b')", + start = 7, + stop = 33 + ) + ) } test("scalar function: default produceResult in Java") { @@ -373,22 +419,52 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), new JavaStrLen(new JavaStrLenNoImpl)) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()) - .getMessage.contains("neither implement magic method nor override 'produceResult'")) + // TODO assign a error-classes name + checkError( + exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()), + errorClass = null, + parameters = Map.empty, + context = ExpectedContext( + fragment = "testcat.ns.strlen('abc')", + start = 7, + stop = 30 + ) + ) } test("SPARK-35390: scalar function w/ bad input types") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadInputTypes)) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()) - .getMessage.contains("parameters returned from 'inputTypes()'")) + checkError( + exception = intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')").collect()), + errorClass = "_LEGACY_ERROR_TEMP_1199", + parameters = Map( + "bound" -> "strlen_bad_input_types", + "argsLen" -> "1", + "inputTypesLen" -> "2" + ), + context = ExpectedContext( + fragment = "testcat.ns.strlen('abc')", + start = 7, + stop = 30 + ) + ) } test("SPARK-35390: scalar function w/ mismatch type parameters from magic method") { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new JavaLongAddMismatchMagic)) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()) - .getMessage.contains("neither implement magic method nor override 'produceResult'")) + // TODO assign a error-classes name + checkError( + exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 2L)").collect()), + errorClass = null, + parameters = Map.empty, + context = ExpectedContext( + fragment = "testcat.ns.add(1L, 2L)", + start = 7, + stop = 28 + ) + ) } test("SPARK-35390: scalar function w/ type coercion") { @@ -402,10 +478,33 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { checkAnswer(sql(s"SELECT testcat.ns.$name(42L, 58)"), Row(100) :: Nil) checkAnswer(sql(s"SELECT 
testcat.ns.$name(42, 58L)"), Row(100) :: Nil) + val paramIndex = name match { + case "add" => "1" + case "add2" => "2" + case "add3" => "1" + } + // can't cast date time interval to long - assert(intercept[AnalysisException]( - sql(s"SELECT testcat.ns.$name(date '2021-06-01' - date '2011-06-01', 93)").collect()) - .getMessage.contains("due to data type mismatch")) + val sqlText = s"SELECT testcat.ns.$name(date '2021-06-01' - date '2011-06-01', 93)" + checkErrorMatchPVals( + exception = intercept[AnalysisException] { + sql(sqlText).collect() + }, + errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + sqlState = None, + parameters = Map( + "sqlExpr" -> ".*", + "paramIndex" -> paramIndex, + "inputSql" -> "\"\\(DATE '2021-06-01' - DATE '2011-06-01'\\)\"", + "inputType" -> "\"INTERVAL DAY\"", + "requiredType" -> "\"BIGINT\"" + ), + context = ExpectedContext( + fragment = s"testcat.ns.$name(date '2021-06-01' - date '2011-06-01', 93)", + start = 7, + stop = sqlText.length - 1 + ) + ) } } @@ -510,8 +609,20 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { addFunction(Identifier.of(Array("ns"), "avg"), IntegralAverage) Seq(1.toShort, 2.toShort).toDF("i").write.saveAsTable(t) - assert(intercept[AnalysisException](sql(s"SELECT testcat.ns.avg(i) from $t")) - .getMessage.contains("Unsupported non-integral type: ShortType")) + checkError( + exception = intercept[AnalysisException](sql(s"SELECT testcat.ns.avg(i) from $t")), + errorClass = "_LEGACY_ERROR_TEMP_1198", + parameters = Map( + "unbound" -> "iavg", + "arguments" -> "smallint", + "unsupported" -> "Unsupported non-integral type: ShortType" + ), + context = ExpectedContext( + fragment = "testcat.ns.avg(i)", + start = 7, + stop = 23 + ) + ) } } @@ -530,9 +641,25 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { Row(BigDecimal(50.5)) :: Nil) // can't cast interval to decimal - assert(intercept[AnalysisException](sql("SELECT testcat.ns.avg(*) from values" + - " (date '2021-06-01' - date '2011-06-01'), (date '2000-01-01' - date '1900-01-01')")) - .getMessage.contains("due to data type mismatch")) + checkError( + exception = intercept[AnalysisException] { + sql("SELECT testcat.ns.avg(*) from values " + + "(date '2021-06-01' - date '2011-06-01'), (date '2000-01-01' - date '1900-01-01')") + }, + errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + parameters = Map( + "sqlExpr" -> "\"v2aggregator(col1)\"", + "paramIndex" -> "1", + "inputSql" -> "\"col1\"", + "inputType" -> "\"INTERVAL DAY\"", + "requiredType" -> "\"DECIMAL(38,18)\"" + ), + context = ExpectedContext( + fragment = "testcat.ns.avg(*)", + start = 7, + stop = 23 + ) + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 13277cfdd8d..06f5600e0d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -815,10 +815,14 @@ class DataSourceV2SQLSuiteV1Filter if (nullable) { insertNullValueAndCheck() } else { - val e = intercept[Exception] { - insertNullValueAndCheck() - } - assert(e.getMessage.contains("Null value appeared in non-nullable field")) + // TODO assign a error-classes name + checkError( + exception = intercept[SparkException] { + insertNullValueAndCheck() + }, + errorClass = null, + parameters = Map.empty + ) } } } @@ -1109,9 +1113,17 @@ class DataSourceV2SQLSuiteV1Filter sql(s"INSERT 
INTO $t1(data, id) VALUES('c', 3)") verifyTable(t1, df) // Missing columns - assert(intercept[AnalysisException] { - sql(s"INSERT INTO $t1 VALUES(4)") - }.getMessage.contains("not enough data columns")) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO $t1 VALUES(4)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`default`.`tbl`", + "tableColumns" -> "`id`, `data`", + "dataColumns" -> "`col1`" + ) + ) // Duplicate columns checkError( exception = intercept[AnalysisException] { @@ -1136,9 +1148,17 @@ class DataSourceV2SQLSuiteV1Filter sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)") verifyTable(t1, Seq((3L, "c")).toDF("id", "data")) // Missing columns - assert(intercept[AnalysisException] { - sql(s"INSERT OVERWRITE $t1 VALUES(4)") - }.getMessage.contains("not enough data columns")) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1 VALUES(4)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`default`.`tbl`", + "tableColumns" -> "`id`, `data`", + "dataColumns" -> "`col1`" + ) + ) // Duplicate columns checkError( exception = intercept[AnalysisException] { @@ -1164,9 +1184,17 @@ class DataSourceV2SQLSuiteV1Filter sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)") verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2")) // Missing columns - assert(intercept[AnalysisException] { - sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)") - }.getMessage.contains("not enough data columns")) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)") + }, + errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS", + parameters = Map( + "tableName" -> "`default`.`tbl`", + "tableColumns" -> "`id`, `data`, `data2`", + "dataColumns" -> "`col1`, `col2`" + ) + ) // Duplicate columns checkError( exception = intercept[AnalysisException] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala index 02990a7a40d..236b4c702d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala @@ -421,19 +421,31 @@ class DataSourceV2Suite extends QueryTest with SharedSparkSession with AdaptiveS spark.read.format(cls.getName).option("path", path).load(), spark.range(5).select($"id", -$"id")) - val e = intercept[AnalysisException] { - spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j")) - .write.format(cls.getName) - .option("path", path).mode("ignore").save() - } - assert(e.message.contains("please use Append or Overwrite modes instead")) - - val e2 = intercept[AnalysisException] { - spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j")) - .write.format(cls.getName) - .option("path", path).mode("error").save() - } - assert(e2.getMessage.contains("please use Append or Overwrite modes instead")) + checkError( + exception = intercept[AnalysisException] { + spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j")) + .write.format(cls.getName) + .option("path", path).mode("ignore").save() + }, + errorClass = "_LEGACY_ERROR_TEMP_1308", + parameters = Map( + "source" -> cls.getName, + "createMode" -> "Ignore" + ) + ) + + checkError( + exception = intercept[AnalysisException] { + 
spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j")) + .write.format(cls.getName) + .option("path", path).mode("error").save() + }, + errorClass = "_LEGACY_ERROR_TEMP_1308", + parameters = Map( + "source" -> cls.getName, + "createMode" -> "ErrorIfExists" + ) + ) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index fe4723eaf28..e258d600a2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.SparkException +import org.apache.spark.{SparkArithmeticException, SparkException, SparkFileNotFoundException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide} @@ -111,18 +111,35 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { test("error handling: existing a table with the duplicate name when creating/altering a view") { withTable("tab1") { sql("CREATE TABLE tab1 (id int) USING parquet") - var e = intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("`tab1` is not a view")) - e = intercept[AnalysisException] { - sql("CREATE VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("`tab1` is not a view")) - e = intercept[AnalysisException] { - sql("ALTER VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("tab1 is a table. 'ALTER VIEW ... AS' expects a view.")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") + }, + errorClass = "_LEGACY_ERROR_TEMP_1278", + parameters = Map("name" -> s"`$SESSION_CATALOG_NAME`.`default`.`tab1`") + ) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE VIEW tab1 AS SELECT * FROM jt") + }, + errorClass = "_LEGACY_ERROR_TEMP_1278", + parameters = Map("name" -> s"`$SESSION_CATALOG_NAME`.`default`.`tab1`") + ) + checkError( + exception = intercept[AnalysisException] { + sql("ALTER VIEW tab1 AS SELECT * FROM jt") + }, + errorClass = "_LEGACY_ERROR_TEMP_1015", + parameters = Map( + "identifier" -> "default.tab1", + "cmd" -> "ALTER VIEW ... AS", + "hintStr" -> "" + ), + context = ExpectedContext( + fragment = "tab1", + start = 11, + stop = 14) + ) } } @@ -213,26 +230,60 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") - val e2 = intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") - }.getMessage - assert(e2.contains(s"$viewName is a temp view. 'LOAD DATA' expects a table")) - val e3 = intercept[AnalysisException] { - sql(s"SHOW CREATE TABLE $viewName") - }.getMessage - assert(e3.contains( - s"$viewName is a temp view. 'SHOW CREATE TABLE' expects a table or permanent view.")) - val e4 = intercept[AnalysisException] { - sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") - }.getMessage - assert(e4.contains( - s"$viewName is a temp view. 
'ANALYZE TABLE' expects a table or permanent view.")) + val sqlText = s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1013", + parameters = Map( + "nameParts" -> viewName, + "viewStr" -> "temp view", + "cmd" -> "LOAD DATA", + "hintStr" -> "" + ), + context = ExpectedContext( + fragment = viewName, + start = sqlText.length - 8, + stop = sqlText.length - 1 + ) + ) + checkError( + exception = intercept[AnalysisException] { + sql(s"SHOW CREATE TABLE $viewName") + }, + errorClass = "_LEGACY_ERROR_TEMP_1016", + parameters = Map( + "nameParts" -> "testView", + "cmd" -> "SHOW CREATE TABLE" + ), + context = ExpectedContext( + fragment = viewName, + start = 18, + stop = 25 + ) + ) + checkError( + exception = intercept[AnalysisException] { + sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + }, + errorClass = "_LEGACY_ERROR_TEMP_1016", + parameters = Map( + "nameParts" -> "testView", + "cmd" -> "ANALYZE TABLE" + ), + context = ExpectedContext( + fragment = viewName, + start = 14, + stop = 21 + ) + ) checkError( exception = intercept[AnalysisException] { sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") }, errorClass = "UNSUPPORTED_FEATURE.ANALYZE_UNCACHED_TEMP_VIEW", - parameters = Map("viewName" -> "`testView`") + parameters = Map("viewName" -> s"`$viewName`") ) } } @@ -243,16 +294,9 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } private def assertAnalysisErrorClass(query: String, - errorClass: String, - parameters: Map[String, String]): Unit = { - val e = intercept[AnalysisException](sql(query)) - checkError(e, errorClass = errorClass, parameters = parameters) - } - - private def assertAnalysisErrorClass(query: String, - errorClass: String, - parameters: Map[String, String], - context: ExpectedContext): Unit = { + errorClass: String, + parameters: Map[String, String], + context: ExpectedContext): Unit = { val e = intercept[AnalysisException](sql(query)) checkError(e, errorClass = errorClass, parameters = parameters, context = context) } @@ -268,18 +312,34 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { val viewName = "testView" withView(viewName) { sql(s"CREATE VIEW $viewName AS SELECT id FROM jt") - var e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $viewName SELECT 1") - }.getMessage - assert(e.contains("Inserting into a view is not allowed. View: " + - s"`$SESSION_CATALOG_NAME`.`default`.`testview`")) + checkError( + exception = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }, + errorClass = "_LEGACY_ERROR_TEMP_1010", + parameters = Map("identifier" -> s"`$SESSION_CATALOG_NAME`.`default`.`testview`"), + context = ExpectedContext(fragment = viewName, start = 18, stop = 25) + ) val dataFilePath = Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") - e = intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") - }.getMessage - assert(e.contains("default.testview is a view. 
'LOAD DATA' expects a table")) + val sqlText = s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""" + checkError( + exception = intercept[AnalysisException] { + sql(sqlText) + }, + errorClass = "_LEGACY_ERROR_TEMP_1013", + parameters = Map( + "nameParts" -> "spark_catalog.default.testview", + "viewStr" -> "view", + "cmd" -> "LOAD DATA", + "hintStr" -> ""), + context = ExpectedContext( + fragment = viewName, + start = sqlText.length - 8, + stop = sqlText.length - 1 + ) + ) } } @@ -761,12 +821,34 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { // Casting from DoubleType to LongType might truncate, throw an AnalysisException. val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", "id1") df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1") - intercept[AnalysisException](sql("SELECT * FROM testView")) + checkError( + exception = intercept[AnalysisException](sql("SELECT * FROM testView")), + errorClass = "CANNOT_UP_CAST_DATATYPE", + parameters = Map( + "expression" -> s"$SESSION_CATALOG_NAME.default.tab1.id", + "sourceType" -> "\"DOUBLE\"", + "targetType" -> "\"BIGINT\"", + "details" -> ("The type path of the target object is:\n\n" + + "You can either add an explicit cast to the input data or " + + "choose a higher precision type of the field in the target object") + ) + ) // Can't cast from ArrayType to LongType, throw an AnalysisException. val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1") df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1") - intercept[AnalysisException](sql("SELECT * FROM testView")) + checkError( + exception = intercept[AnalysisException](sql("SELECT * FROM testView")), + errorClass = "CANNOT_UP_CAST_DATATYPE", + parameters = Map( + "expression" -> s"$SESSION_CATALOG_NAME.default.tab1.id1", + "sourceType" -> "\"ARRAY<INT>\"", + "targetType" -> "\"BIGINT\"", + "details" -> ("The type path of the target object is:\n\n" + + "You can either add an explicit cast to the input data or " + + "choose a higher precision type of the field in the target object") + ) + ) } } } @@ -778,41 +860,60 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { sql("CREATE VIEW view3 AS SELECT * FROM view2") // Detect cyclic view reference on ALTER VIEW. - val e1 = intercept[AnalysisException] { - sql("ALTER VIEW view1 AS SELECT * FROM view2") - }.getMessage - assert(e1.contains(s"Recursive view `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " + - s"`$SESSION_CATALOG_NAME`.`default`.`view1`)")) + checkError( + exception = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM view2") + }, + errorClass = "RECURSIVE_VIEW", + parameters = Map( + "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`", + "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view1`") + ) + ) // Detect the most left cycle when there exists multiple cyclic view references. 
- val e2 = intercept[AnalysisException] { - sql("ALTER VIEW view1 AS SELECT * FROM view3 JOIN view2") - }.getMessage - assert(e2.contains(s"Recursive view `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"-> `$SESSION_CATALOG_NAME`.`default`.`view3` -> " + - s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " + - s"`$SESSION_CATALOG_NAME`.`default`.`view1`)")) + checkError( + exception = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM view3 JOIN view2") + }, + errorClass = "RECURSIVE_VIEW", + parameters = Map( + "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`", + "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view3` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view1`") + ) + ) // Detect cyclic view reference on CREATE OR REPLACE VIEW. - val e3 = intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW view1 AS SELECT * FROM view2") - }.getMessage - assert(e3.contains(s"Recursive view `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " + - s"`$SESSION_CATALOG_NAME`.`default`.`view1`)")) + checkError( + exception = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW view1 AS SELECT * FROM view2") + }, + errorClass = "RECURSIVE_VIEW", + parameters = Map( + "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`", + "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view1`") + ) + ) // Detect cyclic view reference from subqueries. 
- val e4 = intercept[AnalysisException] { - sql("ALTER VIEW view1 AS SELECT * FROM jt WHERE EXISTS (SELECT 1 FROM view2)") - }.getMessage - assert(e4.contains(s"Recursive view `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " + - s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " + - s"`$SESSION_CATALOG_NAME`.`default`.`view1`)")) + checkError( + exception = intercept[AnalysisException] { + sql("ALTER VIEW view1 AS SELECT * FROM jt WHERE EXISTS (SELECT 1 FROM view2)") + }, + errorClass = "RECURSIVE_VIEW", + parameters = Map( + "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`", + "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " + + s"`$SESSION_CATALOG_NAME`.`default`.`view1`")) + ) } } @@ -874,10 +975,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") { sql("CREATE TEMPORARY VIEW v1 AS SELECT * FROM t") Seq(4, 6, 5).toDF("c1").write.mode("overwrite").format("parquet").saveAsTable("t") - val e = intercept[SparkException] { - sql("SELECT * FROM v1").collect() - }.getMessage - assert(e.contains("does not exist")) + checkErrorMatchPVals( + exception = intercept[SparkException] { + sql("SELECT * FROM v1").collect() + }.getCause.asInstanceOf[SparkFileNotFoundException], + errorClass = "_LEGACY_ERROR_TEMP_2055", + parameters = Map("message" -> ".* does not exist") + ) } withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "false") { @@ -891,10 +995,13 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { // alter view from non-legacy to legacy config sql("ALTER VIEW v1 AS SELECT * FROM t") Seq(2, 4, 6).toDF("c1").write.mode("overwrite").format("parquet").saveAsTable("t") - val e = intercept[SparkException] { - sql("SELECT * FROM v1").collect() - }.getMessage - assert(e.contains("does not exist")) + checkErrorMatchPVals( + exception = intercept[SparkException] { + sql("SELECT * FROM v1").collect() + }.getCause.asInstanceOf[SparkFileNotFoundException], + errorClass = "_LEGACY_ERROR_TEMP_2055", + parameters = Map("message" -> ".* does not exist") + ) } } } @@ -1003,20 +1110,38 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { )) } withSQLConf(ANSI_ENABLED.key -> "true") { - val e = intercept[ArithmeticException] { - sql("SELECT * FROM v5").collect() - }.getMessage - assert(e.contains("Division by zero")) + checkError( + exception = intercept[SparkArithmeticException] { + sql("SELECT * FROM v5").collect() + }, + errorClass = "DIVIDE_BY_ZERO", + parameters = Map("config" -> "\"spark.sql.ansi.enabled\""), + context = new ExpectedContext( + objectType = "VIEW", + objectName = s"$SESSION_CATALOG_NAME.default.v5", + fragment = "1/0", + startIndex = 7, + stopIndex = 9) + ) } } withSQLConf(ANSI_ENABLED.key -> "true") { sql("ALTER VIEW v1 AS SELECT 1/0 AS invalid") } - val e = intercept[ArithmeticException] { - sql("SELECT * FROM v1").collect() - }.getMessage - assert(e.contains("Division by zero")) + checkError( + exception = intercept[SparkArithmeticException] { + sql("SELECT * FROM v1").collect() + }, + errorClass = "DIVIDE_BY_ZERO", + parameters = Map("config" -> "\"spark.sql.ansi.enabled\""), + context = new ExpectedContext( + objectType = "VIEW", + objectName = s"$SESSION_CATALOG_NAME.default.v1", + fragment = "1/0", + startIndex = 7, + stopIndex = 9) + ) } } } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala index ded657edc61..8e1bb08162e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala @@ -40,17 +40,29 @@ class ShowNamespacesSuite extends command.ShowNamespacesSuiteBase with CommandSu test("default v2 catalog doesn't support namespace") { withSQLConf(SQLConf.DEFAULT_CATALOG.key -> "testcat_no_namespace") { - val errMsg = intercept[AnalysisException] { - sql("SHOW NAMESPACES") - }.getMessage - assert(errMsg.contains("does not support namespaces")) + checkError( + exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES") + }, + errorClass = "_LEGACY_ERROR_TEMP_1184", + parameters = Map( + "plugin" -> "testcat_no_namespace", + "ability" -> "namespaces" + ) + ) } } test("v2 catalog doesn't support namespace") { - val errMsg = intercept[AnalysisException] { - sql("SHOW NAMESPACES in testcat_no_namespace") - }.getMessage - assert(errMsg.contains("does not support namespaces")) + checkError( + exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES in testcat_no_namespace") + }, + errorClass = "_LEGACY_ERROR_TEMP_1184", + parameters = Map( + "plugin" -> "testcat_no_namespace", + "ability" -> "namespaces" + ) + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 5d1d0389303..6067efc1d1c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -80,20 +80,24 @@ class ResolvedDataSourceSuite extends SharedSparkSession { test("avro: show deploy guide for loading the external avro module") { Seq("avro", "org.apache.spark.sql.avro").foreach { provider => - val message = intercept[AnalysisException] { - getProvidingClass(provider) - }.getMessage - assert(message.contains(s"Failed to find data source: $provider")) - assert(message.contains("Please deploy the application as per the deployment section of")) + checkError( + exception = intercept[AnalysisException] { + getProvidingClass(provider) + }, + errorClass = "_LEGACY_ERROR_TEMP_1139", + parameters = Map("provider" -> provider) + ) } } test("kafka: show deploy guide for loading the external kafka module") { - val message = intercept[AnalysisException] { - getProvidingClass("kafka") - }.getMessage - assert(message.contains("Failed to find data source: kafka")) - assert(message.contains("Please deploy the application as per the deployment section of")) + checkError( + exception = intercept[AnalysisException] { + getProvidingClass("kafka") + }, + errorClass = "_LEGACY_ERROR_TEMP_1140", + parameters = Map("provider" -> "kafka") + ) } test("error message for unknown data sources") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 558b8973da1..1a4862bf978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -19,6 +19,7 
@@ package org.apache.spark.sql.streaming.sources import java.util +import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, SupportsRead, SupportsWrite, Table, TableCapability, TableProvider} import org.apache.spark.sql.connector.catalog.TableCapability._ @@ -300,7 +301,7 @@ class StreamingDataSourceV2Suite extends StreamTest { Trigger.ProcessingTime(1000), Trigger.Continuous(1000)) - private def testPositiveCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = { + private def testCase(readFormat: String, writeFormat: String, trigger: Trigger): Unit = { testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(_ => ()) } @@ -319,22 +320,12 @@ class StreamingDataSourceV2Suite extends StreamTest { query.stop() } - private def testNegativeCase( - readFormat: String, - writeFormat: String, - trigger: Trigger, - errorMsg: String) = { - val ex = intercept[UnsupportedOperationException] { - testPositiveCase(readFormat, writeFormat, trigger) - } - assert(ex.getMessage.contains(errorMsg)) - } - private def testPostCreationNegativeCase( readFormat: String, writeFormat: String, trigger: Trigger, - errorMsg: String) = { + errorClass: String, + parameters: Map[String, String]) = { val query = spark.readStream .format(readFormat) .load() @@ -346,7 +337,11 @@ class StreamingDataSourceV2Suite extends StreamTest { eventually(timeout(streamingTimeout)) { assert(query.exception.isDefined) assert(query.exception.get.cause != null) - assert(query.exception.get.cause.getMessage.contains(errorMsg)) + checkErrorMatchPVals( + exception = query.exception.get.cause.asInstanceOf[SparkUnsupportedOperationException], + errorClass = errorClass, + parameters = parameters + ) } } @@ -437,32 +432,59 @@ class StreamingDataSourceV2Suite extends StreamTest { trigger match { // Invalid - can't read at all case _ if !sourceTable.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => - testNegativeCase(read, write, trigger, - s"Data source $read does not support streamed reading") + checkError( + exception = intercept[SparkUnsupportedOperationException] { + testCase(read, write, trigger) + }, + errorClass = "_LEGACY_ERROR_TEMP_2049", + parameters = Map( + "className" -> "fake-read-neither-mode", + "operator" -> "reading" + ) + ) // Invalid - can't write case _ if !sinkTable.supports(STREAMING_WRITE) => - testNegativeCase(read, write, trigger, - s"Data source $write does not support streamed writing") + checkError( + exception = intercept[SparkUnsupportedOperationException] { + testCase(read, write, trigger) + }, + errorClass = "_LEGACY_ERROR_TEMP_2049", + parameters = Map( + "className" -> "fake-write-neither-mode", + "operator" -> "writing" + ) + ) case _: ContinuousTrigger => if (sourceTable.supports(CONTINUOUS_READ)) { // Valid microbatch queries. - testPositiveCase(read, write, trigger) + testCase(read, write, trigger) } else { // Invalid - trigger is continuous but reader is not - testNegativeCase( - read, write, trigger, s"Data source $read does not support continuous processing") + checkError( + exception = intercept[SparkUnsupportedOperationException] { + testCase(read, write, trigger) + }, + errorClass = "_LEGACY_ERROR_TEMP_2253", + parameters = Map("sourceName" -> "fake-read-microbatch-only") + ) } case microBatchTrigger => if (sourceTable.supports(MICRO_BATCH_READ)) { // Valid continuous queries. 
- testPositiveCase(read, write, trigger) + testCase(read, write, trigger) } else { // Invalid - trigger is microbatch but reader is not testPostCreationNegativeCase(read, write, trigger, - s"Data source $read does not support microbatch processing") + errorClass = "_LEGACY_ERROR_TEMP_2209", + parameters = Map( + "srcName" -> read, + "disabledSources" -> "", + "table" -> ".*" + ) + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 0f1a32d30b8..cde0da67e83 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -566,16 +566,18 @@ class MetastoreDataSourcesSuite extends QueryTest } test("path required error") { - assert( - intercept[AnalysisException] { + checkError( + exception = intercept[AnalysisException] { sparkSession.catalog.createTable( "createdJsonTable", "org.apache.spark.sql.json", Map.empty[String, String]) table("createdJsonTable") - }.getMessage.contains("Unable to infer schema"), - "We should complain that path is not specified.") + }, + errorClass = "UNABLE_TO_INFER_SCHEMA", + parameters = Map("format" -> "JSON") + ) sql("DROP TABLE IF EXISTS createdJsonTable") } @@ -918,33 +920,51 @@ class MetastoreDataSourcesSuite extends QueryTest withTable("appendOrcToParquet") { createDF(0, 9).write.format("parquet").saveAsTable("appendOrcToParquet") - val e = intercept[AnalysisException] { - createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet") - } - assert(e.getMessage.contains("The format of the existing table " + - s"$SESSION_CATALOG_NAME.default.appendorctoparquet is `Parquet")) + checkError( + exception = intercept[AnalysisException] { + createDF(10, 19).write.mode(SaveMode.Append).format("orc"). + saveAsTable("appendOrcToParquet") + }, + errorClass = "_LEGACY_ERROR_TEMP_1159", + parameters = Map( + "tableName" -> s"$SESSION_CATALOG_NAME.default.appendorctoparquet", + "existingProvider" -> "ParquetDataSourceV2", + "specifiedProvider" -> "OrcDataSourceV2" + ) + ) } withTable("appendParquetToJson") { createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson") - val msg = intercept[AnalysisException] { - createDF(10, 19).write.mode(SaveMode.Append).format("parquet") - .saveAsTable("appendParquetToJson") - }.getMessage - - assert(msg.contains("The format of the existing table " + - s"$SESSION_CATALOG_NAME.default.appendparquettojson is `Json")) + checkError( + exception = intercept[AnalysisException] { + createDF(10, 19).write.mode(SaveMode.Append).format("parquet") + .saveAsTable("appendParquetToJson") + }, + errorClass = "_LEGACY_ERROR_TEMP_1159", + parameters = Map( + "tableName" -> s"$SESSION_CATALOG_NAME.default.appendparquettojson", + "existingProvider" -> "JsonDataSourceV2", + "specifiedProvider" -> "ParquetDataSourceV2" + ) + ) } withTable("appendTextToJson") { createDF(0, 9).write.format("json").saveAsTable("appendTextToJson") - val msg = intercept[AnalysisException] { - createDF(10, 19).write.mode(SaveMode.Append).format("text") - .saveAsTable("appendTextToJson") - }.getMessage - // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat. 
- assert(msg.contains("The format of the existing table " + - s"$SESSION_CATALOG_NAME.default.appendtexttojson is `Json")) + checkError( + exception = intercept[AnalysisException] { + createDF(10, 19).write.mode(SaveMode.Append).format("text") + .saveAsTable("appendTextToJson") + }, + errorClass = "_LEGACY_ERROR_TEMP_1159", + // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat. + parameters = Map( + "tableName" -> s"$SESSION_CATALOG_NAME.default.appendtexttojson", + "existingProvider" -> "JsonDataSourceV2", + "specifiedProvider" -> "TextDataSourceV2" + ) + ) } } @@ -1236,16 +1256,18 @@ class MetastoreDataSourcesSuite extends QueryTest test("create a temp view using hive") { val tableName = "tab1" withTempView(tableName) { - val e = intercept[AnalysisException] { - sql( - s""" - |CREATE TEMPORARY VIEW $tableName - |(col1 int) - |USING hive - """.stripMargin) - }.getMessage - assert(e.contains("Hive data source can only be used with tables, you can't use it with " + - "CREATE TEMP VIEW USING")) + checkError( + exception = intercept[AnalysisException] { + sql( + s""" + |CREATE TEMPORARY VIEW $tableName + |(col1 int) + |USING hive + """.stripMargin) + }, + errorClass = "_LEGACY_ERROR_TEMP_1293", + parameters = Map.empty + ) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala index c81c4649107..e2417219467 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala @@ -209,17 +209,26 @@ class HiveSQLViewSuite extends SQLViewSuite with TestHiveSingleton { """.stripMargin ) - val cause = intercept[AnalysisException] { - sql("SHOW CREATE TABLE v1") - } - - assert(cause.getMessage.contains(" - partitioned view")) - - val causeForSpark = intercept[AnalysisException] { - sql("SHOW CREATE TABLE v1 AS SERDE") - } - - assert(causeForSpark.getMessage.contains(" - partitioned view")) + checkError( + exception = intercept[AnalysisException] { + sql("SHOW CREATE TABLE v1") + }, + errorClass = "_LEGACY_ERROR_TEMP_1271", + parameters = Map( + "unsupportedFeatures" -> " - partitioned view", + "table" -> s"`$SESSION_CATALOG_NAME`.`default`.`v1`" + ) + ) + checkError( + exception = intercept[AnalysisException] { + sql("SHOW CREATE TABLE v1 AS SERDE") + }, + errorClass = "_LEGACY_ERROR_TEMP_1275", + parameters = Map( + "table" -> s"`$SESSION_CATALOG_NAME`.`default`.`v1`", + "features" -> " - partitioned view" + ) + ) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala index 49f650fb1c5..1dbe405b217 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala @@ -32,10 +32,13 @@ class AlterNamespaceSetLocationSuite extends v1.AlterNamespaceSetLocationSuiteBa val ns = s"$catalog.$namespace" withNamespace(ns) { sql(s"CREATE NAMESPACE $ns") - val e = intercept[AnalysisException] { - sql(s"ALTER DATABASE $ns SET LOCATION 'loc'") - } - assert(e.getMessage.contains("does not support altering database location")) + checkError( + exception = intercept[AnalysisException] { + sql(s"ALTER DATABASE $ns SET 
LOCATION 'loc'") + }, + errorClass = "_LEGACY_ERROR_TEMP_1219", + parameters = Map.empty + ) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org