This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dedb4ad2c9 [SPARK-44384][SQL][TESTS] Use checkError() to check Exception in *View*Suite, *Namespace*Suite, *DataSource*Suite
4dedb4ad2c9 is described below

commit 4dedb4ad2c9b2ecd75dd9ccec5f565805752ad8e
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Thu Jul 13 16:26:34 2023 +0300

    [SPARK-44384][SQL][TESTS] Use checkError() to check Exception in *View*Suite, *Namespace*Suite, *DataSource*Suite
    
    ### What changes were proposed in this pull request?
    The PR aims to use `checkError()` to check `Exception` in `*View*Suite`, `*Namespace*Suite`, and `*DataSource*Suite`, including the following suites:
    - sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite
    - sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite
    - sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite
    - sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite
    - sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite
    - sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite
    - sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite
    - sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite
    - sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite
    - sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite
    
    ### Why are the changes needed?
    Migrating to `checkError()` makes the tests independent of the text of the error messages.
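    
    As a rough sketch of the pattern (condensed from the `FileBasedDataSourceSuite` hunk below; `format` and `outputPath` are defined by the surrounding test), the migration replaces a message-substring assertion with a `checkError()` call that pins down the error class and its parameters:
    
    ```scala
    // Before: asserts on a substring of the error message (brittle).
    val errMsg = intercept[AnalysisException] {
      spark.emptyDataFrame.write.format(format).save(outputPath.toString)
    }
    assert(errMsg.getMessage.contains(
      "Datasource does not support writing empty or nested empty schemas"))
    
    // After: asserts on the error class and its message parameters.
    checkError(
      exception = intercept[AnalysisException] {
        spark.emptyDataFrame.write.format(format).save(outputPath.toString)
      },
      errorClass = "_LEGACY_ERROR_TEMP_1142",
      parameters = Map.empty
    )
    ```
    Where a parameter value is not stable (for example a temporary file path), the tests use `checkErrorMatchPVals` instead, which matches the expected parameter values as regular expressions.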
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    - Manually tested.
    - Passed GA (GitHub Actions).
    
    Closes #41952 from panbingkun/view_and_namespace_checkerror.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/FileBasedDataSourceSuite.scala       |  67 +++--
 .../apache/spark/sql/NestedDataSourceSuite.scala   |  24 +-
 .../sql/connector/DataSourceV2DataFrameSuite.scala |  21 +-
 .../sql/connector/DataSourceV2FunctionSuite.scala  | 189 +++++++++++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  54 +++-
 .../spark/sql/connector/DataSourceV2Suite.scala    |  38 ++-
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 313 ++++++++++++++-------
 .../execution/command/v2/ShowNamespacesSuite.scala |  28 +-
 .../sql/sources/ResolvedDataSourceSuite.scala      |  24 +-
 .../sources/StreamingDataSourceV2Suite.scala       |  68 +++--
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  88 +++---
 .../sql/hive/execution/HiveSQLViewSuite.scala      |  31 +-
 .../command/AlterNamespaceSetLocationSuite.scala   |  11 +-
 13 files changed, 671 insertions(+), 285 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index e7e53285d62..d69a68f5726 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{LocalFileSystem, Path}
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkFileNotFoundException, 
SparkRuntimeException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
GreaterThan, Literal}
@@ -129,11 +129,13 @@ class FileBasedDataSourceSuite extends QueryTest
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-23372 error while writing empty schema files using $format") {
       withTempPath { outputPath =>
-        val errMsg = intercept[AnalysisException] {
-          spark.emptyDataFrame.write.format(format).save(outputPath.toString)
-        }
-        assert(errMsg.getMessage.contains(
-          "Datasource does not support writing empty or nested empty schemas"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            spark.emptyDataFrame.write.format(format).save(outputPath.toString)
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1142",
+          parameters = Map.empty
+        )
       }
 
       // Nested empty schema
@@ -144,11 +146,13 @@ class FileBasedDataSourceSuite extends QueryTest
           StructField("c", IntegerType)
         ))
         val df = spark.createDataFrame(sparkContext.emptyRDD[Row], schema)
-        val errMsg = intercept[AnalysisException] {
-          df.write.format(format).save(outputPath.toString)
-        }
-        assert(errMsg.getMessage.contains(
-          "Datasource does not support writing empty or nested empty schemas"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            df.write.format(format).save(outputPath.toString)
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1142",
+          parameters = Map.empty
+        )
       }
     }
   }
@@ -242,10 +246,17 @@ class FileBasedDataSourceSuite extends QueryTest
           if (ignore.toBoolean) {
             testIgnoreMissingFiles(options)
           } else {
-            val exception = intercept[SparkException] {
-              testIgnoreMissingFiles(options)
+            val errorClass = sources match {
+              case "" => "_LEGACY_ERROR_TEMP_2062"
+              case _ => "_LEGACY_ERROR_TEMP_2055"
             }
-            assert(exception.getMessage().contains("does not exist"))
+            checkErrorMatchPVals(
+              exception = intercept[SparkException] {
+                testIgnoreMissingFiles(options)
+              }.getCause.asInstanceOf[SparkFileNotFoundException],
+              errorClass = errorClass,
+              parameters = Map("message" -> ".*does not exist")
+            )
           }
         }
       }
@@ -646,20 +657,20 @@ class FileBasedDataSourceSuite extends QueryTest
 
             // RuntimeException is triggered at executor side, which is then 
wrapped as
             // SparkException at driver side
-            val e1 = intercept[SparkException] {
-              sql(s"select b from $tableName").collect()
-            }
-            assert(
-              e1.getCause.isInstanceOf[RuntimeException] &&
-                e1.getCause.getMessage.contains(
-                  """Found duplicate field(s) "b": [b, B] in case-insensitive 
mode"""))
-            val e2 = intercept[SparkException] {
-              sql(s"select B from $tableName").collect()
-            }
-            assert(
-              e2.getCause.isInstanceOf[RuntimeException] &&
-                e2.getCause.getMessage.contains(
-                  """Found duplicate field(s) "b": [b, B] in case-insensitive 
mode"""))
+            checkError(
+              exception = intercept[SparkException] {
+                sql(s"select b from $tableName").collect()
+              }.getCause.asInstanceOf[SparkRuntimeException],
+              errorClass = "_LEGACY_ERROR_TEMP_2093",
+              parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" 
-> "[b, B]")
+            )
+            checkError(
+              exception = intercept[SparkException] {
+                sql(s"select B from $tableName").collect()
+              }.getCause.asInstanceOf[SparkRuntimeException],
+              errorClass = "_LEGACY_ERROR_TEMP_2093",
+              parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" 
-> "[b, B]")
+            )
           }
 
           withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
index ded2a80c6fa..f83e7b6727b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/NestedDataSourceSuite.scala
@@ -55,17 +55,19 @@ trait NestedDataSourceSuiteBase extends QueryTest with 
SharedSparkSession {
             withTempPath { dir =>
               val path = dir.getCanonicalPath
               save(selectExpr, format, path)
-              val e = intercept[AnalysisException] {
-                spark
-                  .read
-                  .options(readOptions(caseInsensitiveSchema))
-                  .schema(caseInsensitiveSchema)
-                  .format(format)
-                  .load(path)
-                  .collect()
-              }
-              assert(e.getErrorClass == "COLUMN_ALREADY_EXISTS")
-              assert(e.getMessageParameters().get("columnName") == 
"`camelcase`")
+              checkError(
+                exception = intercept[AnalysisException] {
+                  spark
+                    .read
+                    .options(readOptions(caseInsensitiveSchema))
+                    .schema(caseInsensitiveSchema)
+                    .format(format)
+                    .load(path)
+                    .collect()
+                },
+                errorClass = "COLUMN_ALREADY_EXISTS",
+                parameters = Map("columnName" -> "`camelcase`")
+              )
             }
           }
         }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 3678f29ab49..b8e0d50dc9c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -182,12 +182,21 @@ class DataSourceV2DataFrameSuite
           Array.empty[Transform], Collections.emptyMap[String, String])
         val df = sql(s"select interval 1 millisecond as i")
         val v2Writer = df.writeTo("testcat.table_name")
-        val e1 = intercept[AnalysisException](v2Writer.append())
-        assert(e1.getMessage.contains(s"Cannot use interval type in the table 
schema."))
-        val e2 = intercept[AnalysisException](v2Writer.overwrite(df("i")))
-        assert(e2.getMessage.contains(s"Cannot use interval type in the table 
schema."))
-        val e3 = intercept[AnalysisException](v2Writer.overwritePartitions())
-        assert(e3.getMessage.contains(s"Cannot use interval type in the table 
schema."))
+        checkError(
+          exception = intercept[AnalysisException](v2Writer.append()),
+          errorClass = "_LEGACY_ERROR_TEMP_1183",
+          parameters = Map.empty
+        )
+        checkError(
+          exception = 
intercept[AnalysisException](v2Writer.overwrite(df("i"))),
+          errorClass = "_LEGACY_ERROR_TEMP_1183",
+          parameters = Map.empty
+        )
+        checkError(
+          exception = 
intercept[AnalysisException](v2Writer.overwritePartitions()),
+          errorClass = "_LEGACY_ERROR_TEMP_1183",
+          parameters = Map.empty
+        )
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index eea2eebf849..32391eac9a8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -157,24 +157,39 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
 
   test("non-function catalog") {
     withSQLConf("spark.sql.catalog.testcat" -> 
classOf[BasicInMemoryTableCatalog].getName) {
-      assert(intercept[AnalysisException](
-        sql("SELECT testcat.strlen('abc')").collect()
-      ).getMessage.contains("Catalog testcat does not support functions"))
+      checkError(
+        exception = intercept[AnalysisException](
+          sql("SELECT testcat.strlen('abc')").collect()
+        ),
+        errorClass = "_LEGACY_ERROR_TEMP_1184",
+        parameters = Map("plugin" -> "testcat", "ability" -> "functions")
+      )
     }
   }
 
   test("DESCRIBE FUNCTION: only support session catalog") {
     addFunction(Identifier.of(Array.empty, "abc"), new JavaStrLen(new 
JavaStrLenNoImpl))
 
-    val e = intercept[AnalysisException] {
-      sql("DESCRIBE FUNCTION testcat.abc")
-    }
-    assert(e.message.contains("Catalog testcat does not support functions"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("DESCRIBE FUNCTION testcat.abc")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1184",
+      parameters = Map(
+        "plugin" -> "testcat",
+        "ability" -> "functions"
+      )
+    )
 
-    val e1 = intercept[AnalysisException] {
-      sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
-    }
-    assert(e1.message.contains("requires a single-part namespace"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
+      },
+      errorClass = "REQUIRES_SINGLE_PART_NAMESPACE",
+      parameters = Map(
+        "sessionCatalog" -> "spark_catalog",
+        "namespace" -> "`default`.`ns1`.`ns2`")
+    )
   }
 
   test("DROP FUNCTION: only support session catalog") {
@@ -307,8 +322,14 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
   test("scalar function: bad magic method") {
     
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"),
 emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenBadMagic))
-    assert(intercept[SparkException](sql("SELECT 
testcat.ns.strlen('abc')").collect())
-      .getMessage.contains("Cannot find a compatible"))
+    // TODO: assign an error class name
+    checkError(
+      exception = intercept[SparkException] {
+        sql("SELECT testcat.ns.strlen('abc')").collect()
+      },
+      errorClass = null,
+      parameters = Map.empty
+    )
   }
 
   test("scalar function: bad magic method with default impl") {
@@ -327,10 +348,35 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
     
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"),
 emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(StrLenDefault))
 
-    assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen(42)"))
-      .getMessage.contains("Expect StringType"))
-    assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('a', 
'b')"))
-      .getMessage.contains("Expect exactly one argument"))
+    checkError(
+      exception = intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen(42)")),
+      errorClass = "_LEGACY_ERROR_TEMP_1198",
+      parameters = Map(
+        "unbound" -> "strlen",
+        "arguments" -> "int",
+        "unsupported" -> "Expect StringType"
+      ),
+      context = ExpectedContext(
+        fragment = "testcat.ns.strlen(42)",
+        start = 7,
+        stop = 27
+      )
+    )
+
+    checkError(
+      exception = intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen('a', 'b')")),
+      errorClass = "_LEGACY_ERROR_TEMP_1198",
+      parameters = Map(
+        "unbound" -> "strlen",
+        "arguments" -> "string, string",
+        "unsupported" -> "Expect exactly one argument"
+      ),
+      context = ExpectedContext(
+        fragment = "testcat.ns.strlen('a', 'b')",
+        start = 7,
+        stop = 33
+      )
+    )
   }
 
   test("scalar function: default produceResult in Java") {
@@ -373,22 +419,52 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
     
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"),
 emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"),
       new JavaStrLen(new JavaStrLenNoImpl))
-    assert(intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen('abc')").collect())
-      .getMessage.contains("neither implement magic method nor override 
'produceResult'"))
+    // TODO: assign an error class name
+    checkError(
+      exception = intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen('abc')").collect()),
+      errorClass = null,
+      parameters = Map.empty,
+      context = ExpectedContext(
+        fragment = "testcat.ns.strlen('abc')",
+        start = 7,
+        stop = 30
+      )
+    )
   }
 
   test("SPARK-35390: scalar function w/ bad input types") {
     
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"),
 emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), 
StrLen(StrLenBadInputTypes))
-    assert(intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen('abc')").collect())
-        .getMessage.contains("parameters returned from 'inputTypes()'"))
+    checkError(
+      exception = intercept[AnalysisException](sql("SELECT 
testcat.ns.strlen('abc')").collect()),
+      errorClass = "_LEGACY_ERROR_TEMP_1199",
+      parameters = Map(
+        "bound" -> "strlen_bad_input_types",
+        "argsLen" -> "1",
+        "inputTypesLen" -> "2"
+      ),
+      context = ExpectedContext(
+        fragment = "testcat.ns.strlen('abc')",
+        start = 7,
+        stop = 30
+      )
+    )
   }
 
   test("SPARK-35390: scalar function w/ mismatch type parameters from magic 
method") {
     
catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"),
 emptyProps)
     addFunction(Identifier.of(Array("ns"), "add"), new JavaLongAdd(new 
JavaLongAddMismatchMagic))
-    assert(intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 
2L)").collect())
-        .getMessage.contains("neither implement magic method nor override 
'produceResult'"))
+    // TODO: assign an error class name
+    checkError(
+      exception = intercept[AnalysisException](sql("SELECT testcat.ns.add(1L, 
2L)").collect()),
+      errorClass = null,
+      parameters = Map.empty,
+      context = ExpectedContext(
+        fragment = "testcat.ns.add(1L, 2L)",
+        start = 7,
+        stop = 28
+      )
+    )
   }
 
   test("SPARK-35390: scalar function w/ type coercion") {
@@ -402,10 +478,33 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
       checkAnswer(sql(s"SELECT testcat.ns.$name(42L, 58)"), Row(100) :: Nil)
       checkAnswer(sql(s"SELECT testcat.ns.$name(42, 58L)"), Row(100) :: Nil)
 
+      val paramIndex = name match {
+        case "add" => "1"
+        case "add2" => "2"
+        case "add3" => "1"
+      }
+
       // can't cast date time interval to long
-      assert(intercept[AnalysisException](
-        sql(s"SELECT testcat.ns.$name(date '2021-06-01' - date '2011-06-01', 
93)").collect())
-          .getMessage.contains("due to data type mismatch"))
+      val sqlText = s"SELECT testcat.ns.$name(date '2021-06-01' - date 
'2011-06-01', 93)"
+      checkErrorMatchPVals(
+        exception = intercept[AnalysisException] {
+          sql(sqlText).collect()
+        },
+        errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+        sqlState = None,
+        parameters = Map(
+          "sqlExpr" -> ".*",
+          "paramIndex" -> paramIndex,
+          "inputSql" -> "\"\\(DATE '2021-06-01' - DATE '2011-06-01'\\)\"",
+          "inputType" -> "\"INTERVAL DAY\"",
+          "requiredType" -> "\"BIGINT\""
+        ),
+        context = ExpectedContext(
+          fragment = s"testcat.ns.$name(date '2021-06-01' - date '2011-06-01', 
93)",
+          start = 7,
+          stop = sqlText.length - 1
+        )
+      )
     }
   }
 
@@ -510,8 +609,20 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
       addFunction(Identifier.of(Array("ns"), "avg"), IntegralAverage)
 
       Seq(1.toShort, 2.toShort).toDF("i").write.saveAsTable(t)
-      assert(intercept[AnalysisException](sql(s"SELECT testcat.ns.avg(i) from 
$t"))
-        .getMessage.contains("Unsupported non-integral type: ShortType"))
+      checkError(
+        exception = intercept[AnalysisException](sql(s"SELECT 
testcat.ns.avg(i) from $t")),
+        errorClass = "_LEGACY_ERROR_TEMP_1198",
+        parameters = Map(
+          "unbound" -> "iavg",
+          "arguments" -> "smallint",
+          "unsupported" -> "Unsupported non-integral type: ShortType"
+        ),
+        context = ExpectedContext(
+          fragment = "testcat.ns.avg(i)",
+          start = 7,
+          stop = 23
+        )
+      )
     }
   }
 
@@ -530,9 +641,25 @@ class DataSourceV2FunctionSuite extends 
DatasourceV2SQLBase {
         Row(BigDecimal(50.5)) :: Nil)
 
       // can't cast interval to decimal
-      assert(intercept[AnalysisException](sql("SELECT testcat.ns.avg(*) from 
values" +
-          " (date '2021-06-01' - date '2011-06-01'), (date '2000-01-01' - date 
'1900-01-01')"))
-          .getMessage.contains("due to data type mismatch"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SELECT testcat.ns.avg(*) from values " +
+            "(date '2021-06-01' - date '2011-06-01'), (date '2000-01-01' - 
date '1900-01-01')")
+        },
+        errorClass = "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
+        parameters = Map(
+          "sqlExpr" -> "\"v2aggregator(col1)\"",
+          "paramIndex" -> "1",
+          "inputSql" -> "\"col1\"",
+          "inputType" -> "\"INTERVAL DAY\"",
+          "requiredType" -> "\"DECIMAL(38,18)\""
+        ),
+        context = ExpectedContext(
+          fragment = "testcat.ns.avg(*)",
+          start = 7,
+          stop = 23
+        )
+      )
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 13277cfdd8d..06f5600e0d1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -815,10 +815,14 @@ class DataSourceV2SQLSuiteV1Filter
         if (nullable) {
           insertNullValueAndCheck()
         } else {
-          val e = intercept[Exception] {
-            insertNullValueAndCheck()
-          }
-          assert(e.getMessage.contains("Null value appeared in non-nullable 
field"))
+          // TODO: assign an error class name
+          checkError(
+            exception = intercept[SparkException] {
+              insertNullValueAndCheck()
+            },
+            errorClass = null,
+            parameters = Map.empty
+          )
         }
     }
   }
@@ -1109,9 +1113,17 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT INTO $t1(data, id) VALUES('c', 3)")
       verifyTable(t1, df)
       // Missing columns
-      assert(intercept[AnalysisException] {
-        sql(s"INSERT INTO $t1 VALUES(4)")
-      }.getMessage.contains("not enough data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO $t1 VALUES(4)")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`default`.`tbl`",
+          "tableColumns" -> "`id`, `data`",
+          "dataColumns" -> "`col1`"
+        )
+      )
       // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
@@ -1136,9 +1148,17 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT OVERWRITE $t1(data, id) VALUES('c', 3)")
       verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
       // Missing columns
-      assert(intercept[AnalysisException] {
-        sql(s"INSERT OVERWRITE $t1 VALUES(4)")
-      }.getMessage.contains("not enough data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1 VALUES(4)")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`default`.`tbl`",
+          "tableColumns" -> "`id`, `data`",
+          "dataColumns" -> "`col1`"
+        )
+      )
       // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
@@ -1164,9 +1184,17 @@ class DataSourceV2SQLSuiteV1Filter
       sql(s"INSERT OVERWRITE $t1(data, data2, id) VALUES('c', 'e', 1)")
       verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", 
"data2"))
       // Missing columns
-      assert(intercept[AnalysisException] {
-        sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)")
-      }.getMessage.contains("not enough data columns"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`default`.`tbl`",
+          "tableColumns" -> "`id`, `data`, `data2`",
+          "dataColumns" -> "`col1`, `col2`"
+        )
+      )
       // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
index 02990a7a40d..236b4c702d1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2Suite.scala
@@ -421,19 +421,31 @@ class DataSourceV2Suite extends QueryTest with 
SharedSparkSession with AdaptiveS
           spark.read.format(cls.getName).option("path", path).load(),
           spark.range(5).select($"id", -$"id"))
 
-        val e = intercept[AnalysisException] {
-          spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j"))
-            .write.format(cls.getName)
-            .option("path", path).mode("ignore").save()
-        }
-        assert(e.message.contains("please use Append or Overwrite modes 
instead"))
-
-        val e2 = intercept[AnalysisException] {
-          spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j"))
-            .write.format(cls.getName)
-            .option("path", path).mode("error").save()
-        }
-        assert(e2.getMessage.contains("please use Append or Overwrite modes 
instead"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j"))
+              .write.format(cls.getName)
+              .option("path", path).mode("ignore").save()
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1308",
+          parameters = Map(
+            "source" -> cls.getName,
+            "createMode" -> "Ignore"
+          )
+        )
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            spark.range(5).select($"id" as Symbol("i"), -$"id" as Symbol("j"))
+              .write.format(cls.getName)
+              .option("path", path).mode("error").save()
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1308",
+          parameters = Map(
+            "source" -> cls.getName,
+            "createMode" -> "ErrorIfExists"
+          )
+        )
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index fe4723eaf28..e258d600a2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkArithmeticException, SparkException, 
SparkFileNotFoundException}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Add, Alias, Divide}
@@ -111,18 +111,35 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
   test("error handling: existing a table with the duplicate name when 
creating/altering a view") {
     withTable("tab1") {
       sql("CREATE TABLE tab1 (id int) USING parquet")
-      var e = intercept[AnalysisException] {
-        sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
-      }.getMessage
-      assert(e.contains("`tab1` is not a view"))
-      e = intercept[AnalysisException] {
-        sql("CREATE VIEW tab1 AS SELECT * FROM jt")
-      }.getMessage
-      assert(e.contains("`tab1` is not a view"))
-      e = intercept[AnalysisException] {
-        sql("ALTER VIEW tab1 AS SELECT * FROM jt")
-      }.getMessage
-      assert(e.contains("tab1 is a table. 'ALTER VIEW ... AS' expects a 
view."))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1278",
+        parameters = Map("name" -> s"`$SESSION_CATALOG_NAME`.`default`.`tab1`")
+      )
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("CREATE VIEW tab1 AS SELECT * FROM jt")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1278",
+        parameters = Map("name" -> s"`$SESSION_CATALOG_NAME`.`default`.`tab1`")
+      )
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER VIEW tab1 AS SELECT * FROM jt")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1015",
+        parameters = Map(
+          "identifier" -> "default.tab1",
+          "cmd" -> "ALTER VIEW ... AS",
+          "hintStr" -> ""
+        ),
+        context = ExpectedContext(
+          fragment = "tab1",
+          start = 11,
+          stop = 14)
+      )
     }
   }
 
@@ -213,26 +230,60 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
 
       val dataFilePath =
         
Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat")
-      val e2 = intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
-      }.getMessage
-      assert(e2.contains(s"$viewName is a temp view. 'LOAD DATA' expects a 
table"))
-      val e3 = intercept[AnalysisException] {
-        sql(s"SHOW CREATE TABLE $viewName")
-      }.getMessage
-      assert(e3.contains(
-        s"$viewName is a temp view. 'SHOW CREATE TABLE' expects a table or 
permanent view."))
-      val e4 = intercept[AnalysisException] {
-        sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
-      }.getMessage
-      assert(e4.contains(
-        s"$viewName is a temp view. 'ANALYZE TABLE' expects a table or 
permanent view."))
+      val sqlText = s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE 
$viewName"""
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sqlText)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1013",
+        parameters = Map(
+          "nameParts" -> viewName,
+          "viewStr" -> "temp view",
+          "cmd" -> "LOAD DATA",
+          "hintStr" -> ""
+        ),
+        context = ExpectedContext(
+          fragment = viewName,
+          start = sqlText.length - 8,
+          stop = sqlText.length - 1
+        )
+      )
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"SHOW CREATE TABLE $viewName")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1016",
+        parameters = Map(
+          "nameParts" -> "testView",
+          "cmd" -> "SHOW CREATE TABLE"
+        ),
+        context = ExpectedContext(
+          fragment = viewName,
+          start = 18,
+          stop = 25
+        )
+      )
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1016",
+        parameters = Map(
+          "nameParts" -> "testView",
+          "cmd" -> "ANALYZE TABLE"
+        ),
+        context = ExpectedContext(
+          fragment = viewName,
+          start = 14,
+          stop = 21
+        )
+      )
       checkError(
         exception = intercept[AnalysisException] {
           sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
         },
         errorClass = "UNSUPPORTED_FEATURE.ANALYZE_UNCACHED_TEMP_VIEW",
-        parameters = Map("viewName" -> "`testView`")
+        parameters = Map("viewName" -> s"`$viewName`")
       )
     }
   }
@@ -243,16 +294,9 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
   }
 
   private def assertAnalysisErrorClass(query: String,
-                                       errorClass: String,
-                                       parameters: Map[String, String]): Unit 
= {
-    val e = intercept[AnalysisException](sql(query))
-    checkError(e, errorClass = errorClass, parameters = parameters)
-  }
-
-  private def assertAnalysisErrorClass(query: String,
-                                       errorClass: String,
-                                       parameters: Map[String, String],
-                                       context: ExpectedContext): Unit = {
+      errorClass: String,
+      parameters: Map[String, String],
+      context: ExpectedContext): Unit = {
     val e = intercept[AnalysisException](sql(query))
     checkError(e, errorClass = errorClass, parameters = parameters, context = 
context)
   }
@@ -268,18 +312,34 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
     val viewName = "testView"
     withView(viewName) {
       sql(s"CREATE VIEW $viewName AS SELECT id FROM jt")
-      var e = intercept[AnalysisException] {
-        sql(s"INSERT INTO TABLE $viewName SELECT 1")
-      }.getMessage
-      assert(e.contains("Inserting into a view is not allowed. View: " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`testview`"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT INTO TABLE $viewName SELECT 1")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1010",
+        parameters = Map("identifier" -> 
s"`$SESSION_CATALOG_NAME`.`default`.`testview`"),
+        context = ExpectedContext(fragment = viewName, start = 18, stop = 25)
+      )
 
       val dataFilePath =
         
Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat")
-      e = intercept[AnalysisException] {
-        sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
-      }.getMessage
-      assert(e.contains("default.testview is a view. 'LOAD DATA' expects a 
table"))
+      val sqlText = s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE 
$viewName"""
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sqlText)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1013",
+        parameters = Map(
+          "nameParts" -> "spark_catalog.default.testview",
+          "viewStr" -> "view",
+          "cmd" -> "LOAD DATA",
+          "hintStr" -> ""),
+        context = ExpectedContext(
+          fragment = viewName,
+          start = sqlText.length - 8,
+          stop = sqlText.length - 1
+        )
+      )
     }
   }
 
@@ -761,12 +821,34 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
         // Casting from DoubleType to LongType might truncate, throw an 
AnalysisException.
         val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", 
"id1")
         df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1")
-        intercept[AnalysisException](sql("SELECT * FROM testView"))
+        checkError(
+          exception = intercept[AnalysisException](sql("SELECT * FROM 
testView")),
+          errorClass = "CANNOT_UP_CAST_DATATYPE",
+          parameters = Map(
+            "expression" -> s"$SESSION_CATALOG_NAME.default.tab1.id",
+            "sourceType" -> "\"DOUBLE\"",
+            "targetType" -> "\"BIGINT\"",
+            "details" -> ("The type path of the target object is:\n\n" +
+              "You can either add an explicit cast to the input data or " +
+              "choose a higher precision type of the field in the target 
object")
+          )
+        )
 
         // Can't cast from ArrayType to LongType, throw an AnalysisException.
         val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1")
         df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1")
-        intercept[AnalysisException](sql("SELECT * FROM testView"))
+        checkError(
+          exception = intercept[AnalysisException](sql("SELECT * FROM 
testView")),
+          errorClass = "CANNOT_UP_CAST_DATATYPE",
+          parameters = Map(
+            "expression" -> s"$SESSION_CATALOG_NAME.default.tab1.id1",
+            "sourceType" -> "\"ARRAY<INT>\"",
+            "targetType" -> "\"BIGINT\"",
+            "details" -> ("The type path of the target object is:\n\n" +
+              "You can either add an explicit cast to the input data or " +
+              "choose a higher precision type of the field in the target 
object")
+          )
+        )
       }
     }
   }
@@ -778,41 +860,60 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
       sql("CREATE VIEW view3 AS SELECT * FROM view2")
 
       // Detect cyclic view reference on ALTER VIEW.
-      val e1 = intercept[AnalysisException] {
-        sql("ALTER VIEW view1 AS SELECT * FROM view2")
-      }.getMessage
-      assert(e1.contains(s"Recursive view 
`$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`view1`)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER VIEW view1 AS SELECT * FROM view2")
+        },
+        errorClass = "RECURSIVE_VIEW",
+        parameters = Map(
+          "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`",
+          "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view1`")
+        )
+      )
 
       // Detect the most left cycle when there exists multiple cyclic view 
references.
-      val e2 = intercept[AnalysisException] {
-        sql("ALTER VIEW view1 AS SELECT * FROM view3 JOIN view2")
-      }.getMessage
-      assert(e2.contains(s"Recursive view 
`$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"-> `$SESSION_CATALOG_NAME`.`default`.`view3` -> " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`view1`)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER VIEW view1 AS SELECT * FROM view3 JOIN view2")
+        },
+        errorClass = "RECURSIVE_VIEW",
+        parameters = Map(
+          "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`",
+          "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view3` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view1`")
+        )
+      )
 
       // Detect cyclic view reference on CREATE OR REPLACE VIEW.
-      val e3 = intercept[AnalysisException] {
-        sql("CREATE OR REPLACE VIEW view1 AS SELECT * FROM view2")
-      }.getMessage
-      assert(e3.contains(s"Recursive view 
`$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`view1`)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("CREATE OR REPLACE VIEW view1 AS SELECT * FROM view2")
+        },
+        errorClass = "RECURSIVE_VIEW",
+        parameters = Map(
+          "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`",
+          "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view1`")
+        )
+      )
 
       // Detect cyclic view reference from subqueries.
-      val e4 = intercept[AnalysisException] {
-        sql("ALTER VIEW view1 AS SELECT * FROM jt WHERE EXISTS (SELECT 1 FROM 
view2)")
-      }.getMessage
-      assert(e4.contains(s"Recursive view 
`$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"detected (cycle: `$SESSION_CATALOG_NAME`.`default`.`view1` " +
-        s"-> `$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
-        s"`$SESSION_CATALOG_NAME`.`default`.`view1`)"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("ALTER VIEW view1 AS SELECT * FROM jt WHERE EXISTS (SELECT 1 
FROM view2)")
+        },
+        errorClass = "RECURSIVE_VIEW",
+        parameters = Map(
+          "viewIdent" -> s"`$SESSION_CATALOG_NAME`.`default`.`view1`",
+          "newPath" -> (s"`$SESSION_CATALOG_NAME`.`default`.`view1` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view2` -> " +
+            s"`$SESSION_CATALOG_NAME`.`default`.`view1`"))
+      )
     }
   }
 
@@ -874,10 +975,13 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
         withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
           sql("CREATE TEMPORARY VIEW v1 AS SELECT * FROM t")
           Seq(4, 6, 
5).toDF("c1").write.mode("overwrite").format("parquet").saveAsTable("t")
-          val e = intercept[SparkException] {
-            sql("SELECT * FROM v1").collect()
-          }.getMessage
-          assert(e.contains("does not exist"))
+          checkErrorMatchPVals(
+            exception = intercept[SparkException] {
+              sql("SELECT * FROM v1").collect()
+            }.getCause.asInstanceOf[SparkFileNotFoundException],
+            errorClass = "_LEGACY_ERROR_TEMP_2055",
+            parameters = Map("message" -> ".* does not exist")
+          )
         }
 
         withSQLConf(STORE_ANALYZED_PLAN_FOR_VIEW.key -> "false") {
@@ -891,10 +995,13 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
           // alter view from non-legacy to legacy config
           sql("ALTER VIEW v1 AS SELECT * FROM t")
           Seq(2, 4, 
6).toDF("c1").write.mode("overwrite").format("parquet").saveAsTable("t")
-          val e = intercept[SparkException] {
-            sql("SELECT * FROM v1").collect()
-          }.getMessage
-          assert(e.contains("does not exist"))
+          checkErrorMatchPVals(
+            exception = intercept[SparkException] {
+              sql("SELECT * FROM v1").collect()
+            }.getCause.asInstanceOf[SparkFileNotFoundException],
+            errorClass = "_LEGACY_ERROR_TEMP_2055",
+            parameters = Map("message" -> ".* does not exist")
+          )
         }
       }
     }
@@ -1003,20 +1110,38 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
             ))
           }
           withSQLConf(ANSI_ENABLED.key -> "true") {
-            val e = intercept[ArithmeticException] {
-              sql("SELECT * FROM v5").collect()
-            }.getMessage
-            assert(e.contains("Division by zero"))
+            checkError(
+              exception = intercept[SparkArithmeticException] {
+                sql("SELECT * FROM v5").collect()
+              },
+              errorClass = "DIVIDE_BY_ZERO",
+              parameters = Map("config" -> "\"spark.sql.ansi.enabled\""),
+              context = new ExpectedContext(
+                objectType = "VIEW",
+                objectName = s"$SESSION_CATALOG_NAME.default.v5",
+                fragment = "1/0",
+                startIndex = 7,
+                stopIndex = 9)
+            )
           }
         }
 
         withSQLConf(ANSI_ENABLED.key -> "true") {
           sql("ALTER VIEW v1 AS SELECT 1/0 AS invalid")
         }
-        val e = intercept[ArithmeticException] {
-          sql("SELECT * FROM v1").collect()
-        }.getMessage
-        assert(e.contains("Division by zero"))
+        checkError(
+          exception = intercept[SparkArithmeticException] {
+            sql("SELECT * FROM v1").collect()
+          },
+          errorClass = "DIVIDE_BY_ZERO",
+          parameters = Map("config" -> "\"spark.sql.ansi.enabled\""),
+          context = new ExpectedContext(
+            objectType = "VIEW",
+            objectName = s"$SESSION_CATALOG_NAME.default.v1",
+            fragment = "1/0",
+            startIndex = 7,
+            stopIndex = 9)
+        )
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala
index ded657edc61..8e1bb08162e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowNamespacesSuite.scala
@@ -40,17 +40,29 @@ class ShowNamespacesSuite extends 
command.ShowNamespacesSuiteBase with CommandSu
 
   test("default v2 catalog doesn't support namespace") {
     withSQLConf(SQLConf.DEFAULT_CATALOG.key -> "testcat_no_namespace") {
-      val errMsg = intercept[AnalysisException] {
-        sql("SHOW NAMESPACES")
-      }.getMessage
-      assert(errMsg.contains("does not support namespaces"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("SHOW NAMESPACES")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1184",
+        parameters = Map(
+          "plugin" -> "testcat_no_namespace",
+          "ability" -> "namespaces"
+        )
+      )
     }
   }
 
   test("v2 catalog doesn't support namespace") {
-    val errMsg = intercept[AnalysisException] {
-      sql("SHOW NAMESPACES in testcat_no_namespace")
-    }.getMessage
-    assert(errMsg.contains("does not support namespaces"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("SHOW NAMESPACES in testcat_no_namespace")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1184",
+      parameters = Map(
+        "plugin" -> "testcat_no_namespace",
+        "ability" -> "namespaces"
+      )
+    )
   }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 5d1d0389303..6067efc1d1c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -80,20 +80,24 @@ class ResolvedDataSourceSuite extends SharedSparkSession {
 
   test("avro: show deploy guide for loading the external avro module") {
     Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
-      val message = intercept[AnalysisException] {
-        getProvidingClass(provider)
-      }.getMessage
-      assert(message.contains(s"Failed to find data source: $provider"))
-      assert(message.contains("Please deploy the application as per the 
deployment section of"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          getProvidingClass(provider)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1139",
+        parameters = Map("provider" -> provider)
+      )
     }
   }
 
   test("kafka: show deploy guide for loading the external kafka module") {
-    val message = intercept[AnalysisException] {
-      getProvidingClass("kafka")
-    }.getMessage
-    assert(message.contains("Failed to find data source: kafka"))
-    assert(message.contains("Please deploy the application as per the 
deployment section of"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        getProvidingClass("kafka")
+      },
+      errorClass = "_LEGACY_ERROR_TEMP_1140",
+      parameters = Map("provider" -> "kafka")
+    )
   }
 
   test("error message for unknown data sources") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 558b8973da1..1a4862bf978 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming.sources
 
 import java.util
 
+import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.connector.catalog.{SessionConfigSupport, 
SupportsRead, SupportsWrite, Table, TableCapability, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
@@ -300,7 +301,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
     Trigger.ProcessingTime(1000),
     Trigger.Continuous(1000))
 
-  private def testPositiveCase(readFormat: String, writeFormat: String, 
trigger: Trigger): Unit = {
+  private def testCase(readFormat: String, writeFormat: String, trigger: 
Trigger): Unit = {
     testPositiveCaseWithQuery(readFormat, writeFormat, trigger)(_ => ())
   }
 
@@ -319,22 +320,12 @@ class StreamingDataSourceV2Suite extends StreamTest {
     query.stop()
   }
 
-  private def testNegativeCase(
-      readFormat: String,
-      writeFormat: String,
-      trigger: Trigger,
-      errorMsg: String) = {
-    val ex = intercept[UnsupportedOperationException] {
-      testPositiveCase(readFormat, writeFormat, trigger)
-    }
-    assert(ex.getMessage.contains(errorMsg))
-  }
-
   private def testPostCreationNegativeCase(
       readFormat: String,
       writeFormat: String,
       trigger: Trigger,
-      errorMsg: String) = {
+      errorClass: String,
+      parameters: Map[String, String]) = {
     val query = spark.readStream
       .format(readFormat)
       .load()
@@ -346,7 +337,11 @@ class StreamingDataSourceV2Suite extends StreamTest {
     eventually(timeout(streamingTimeout)) {
       assert(query.exception.isDefined)
       assert(query.exception.get.cause != null)
-      assert(query.exception.get.cause.getMessage.contains(errorMsg))
+      checkErrorMatchPVals(
+        exception = 
query.exception.get.cause.asInstanceOf[SparkUnsupportedOperationException],
+        errorClass = errorClass,
+        parameters = parameters
+      )
     }
   }
 
@@ -437,32 +432,59 @@ class StreamingDataSourceV2Suite extends StreamTest {
       trigger match {
         // Invalid - can't read at all
         case _ if !sourceTable.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) 
=>
-          testNegativeCase(read, write, trigger,
-            s"Data source $read does not support streamed reading")
+          checkError(
+            exception = intercept[SparkUnsupportedOperationException] {
+              testCase(read, write, trigger)
+            },
+            errorClass = "_LEGACY_ERROR_TEMP_2049",
+            parameters = Map(
+              "className" -> "fake-read-neither-mode",
+              "operator" -> "reading"
+            )
+          )
 
         // Invalid - can't write
         case _ if !sinkTable.supports(STREAMING_WRITE) =>
-          testNegativeCase(read, write, trigger,
-            s"Data source $write does not support streamed writing")
+          checkError(
+            exception = intercept[SparkUnsupportedOperationException] {
+              testCase(read, write, trigger)
+            },
+            errorClass = "_LEGACY_ERROR_TEMP_2049",
+            parameters = Map(
+              "className" -> "fake-write-neither-mode",
+              "operator" -> "writing"
+            )
+          )
 
         case _: ContinuousTrigger =>
           if (sourceTable.supports(CONTINUOUS_READ)) {
             // Valid microbatch queries.
-            testPositiveCase(read, write, trigger)
+            testCase(read, write, trigger)
           } else {
             // Invalid - trigger is continuous but reader is not
-            testNegativeCase(
-              read, write, trigger, s"Data source $read does not support 
continuous processing")
+            checkError(
+              exception = intercept[SparkUnsupportedOperationException] {
+                testCase(read, write, trigger)
+              },
+              errorClass = "_LEGACY_ERROR_TEMP_2253",
+              parameters = Map("sourceName" -> "fake-read-microbatch-only")
+            )
           }
 
         case microBatchTrigger =>
           if (sourceTable.supports(MICRO_BATCH_READ)) {
             // Valid continuous queries.
-            testPositiveCase(read, write, trigger)
+            testCase(read, write, trigger)
           } else {
             // Invalid - trigger is microbatch but reader is not
             testPostCreationNegativeCase(read, write, trigger,
-              s"Data source $read does not support microbatch processing")
+              errorClass = "_LEGACY_ERROR_TEMP_2209",
+              parameters = Map(
+                "srcName" -> read,
+                "disabledSources" -> "",
+                "table" -> ".*"
+              )
+            )
           }
       }
     }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0f1a32d30b8..cde0da67e83 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -566,16 +566,18 @@ class MetastoreDataSourcesSuite extends QueryTest
   }
 
   test("path required error") {
-    assert(
-      intercept[AnalysisException] {
+    checkError(
+      exception = intercept[AnalysisException] {
         sparkSession.catalog.createTable(
           "createdJsonTable",
           "org.apache.spark.sql.json",
           Map.empty[String, String])
 
         table("createdJsonTable")
-      }.getMessage.contains("Unable to infer schema"),
-      "We should complain that path is not specified.")
+      },
+      errorClass = "UNABLE_TO_INFER_SCHEMA",
+      parameters = Map("format" -> "JSON")
+    )
 
     sql("DROP TABLE IF EXISTS createdJsonTable")
   }
@@ -918,33 +920,51 @@ class MetastoreDataSourcesSuite extends QueryTest
 
     withTable("appendOrcToParquet") {
       createDF(0, 9).write.format("parquet").saveAsTable("appendOrcToParquet")
-      val e = intercept[AnalysisException] {
-        createDF(10, 
19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
-      }
-      assert(e.getMessage.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendorctoparquet is `Parquet"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          createDF(10, 19).write.mode(SaveMode.Append).format("orc").
+            saveAsTable("appendOrcToParquet")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1159",
+        parameters = Map(
+          "tableName" -> s"$SESSION_CATALOG_NAME.default.appendorctoparquet",
+          "existingProvider" -> "ParquetDataSourceV2",
+          "specifiedProvider" -> "OrcDataSourceV2"
+        )
+      )
     }
 
     withTable("appendParquetToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendParquetToJson")
-      val msg = intercept[AnalysisException] {
-        createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
-          .saveAsTable("appendParquetToJson")
-      }.getMessage
-
-      assert(msg.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendparquettojson is `Json"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          createDF(10, 19).write.mode(SaveMode.Append).format("parquet")
+            .saveAsTable("appendParquetToJson")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1159",
+        parameters = Map(
+          "tableName" -> s"$SESSION_CATALOG_NAME.default.appendparquettojson",
+          "existingProvider" -> "JsonDataSourceV2",
+          "specifiedProvider" -> "ParquetDataSourceV2"
+        )
+      )
     }
 
     withTable("appendTextToJson") {
       createDF(0, 9).write.format("json").saveAsTable("appendTextToJson")
-      val msg = intercept[AnalysisException] {
-        createDF(10, 19).write.mode(SaveMode.Append).format("text")
-          .saveAsTable("appendTextToJson")
-      }.getMessage
-      // The format of the existing table can be JsonDataSourceV2 or 
JsonFileFormat.
-      assert(msg.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendtexttojson is `Json"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          createDF(10, 19).write.mode(SaveMode.Append).format("text")
+            .saveAsTable("appendTextToJson")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1159",
+        // The format of the existing table can be JsonDataSourceV2 or 
JsonFileFormat.
+        parameters = Map(
+          "tableName" -> s"$SESSION_CATALOG_NAME.default.appendtexttojson",
+          "existingProvider" -> "JsonDataSourceV2",
+          "specifiedProvider" -> "TextDataSourceV2"
+        )
+      )
     }
   }
 
@@ -1236,16 +1256,18 @@ class MetastoreDataSourcesSuite extends QueryTest
   test("create a temp view using hive") {
     val tableName = "tab1"
     withTempView(tableName) {
-      val e = intercept[AnalysisException] {
-        sql(
-          s"""
-             |CREATE TEMPORARY VIEW $tableName
-             |(col1 int)
-             |USING hive
-           """.stripMargin)
-      }.getMessage
-      assert(e.contains("Hive data source can only be used with tables, you 
can't use it with " +
-        "CREATE TEMP VIEW USING"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(
+            s"""
+               |CREATE TEMPORARY VIEW $tableName
+               |(col1 int)
+               |USING hive
+             """.stripMargin)
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1293",
+        parameters = Map.empty
+      )
     }
   }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala
index c81c4649107..e2417219467 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala
@@ -209,17 +209,26 @@ class HiveSQLViewSuite extends SQLViewSuite with 
TestHiveSingleton {
            """.stripMargin
         )
 
-        val cause = intercept[AnalysisException] {
-          sql("SHOW CREATE TABLE v1")
-        }
-
-        assert(cause.getMessage.contains(" - partitioned view"))
-
-        val causeForSpark = intercept[AnalysisException] {
-          sql("SHOW CREATE TABLE v1 AS SERDE")
-        }
-
-        assert(causeForSpark.getMessage.contains(" - partitioned view"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("SHOW CREATE TABLE v1")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1271",
+          parameters = Map(
+            "unsupportedFeatures" -> " - partitioned view",
+            "table" -> s"`$SESSION_CATALOG_NAME`.`default`.`v1`"
+          )
+        )
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql("SHOW CREATE TABLE v1 AS SERDE")
+          },
+          errorClass = "_LEGACY_ERROR_TEMP_1275",
+          parameters = Map(
+            "table" -> s"`$SESSION_CATALOG_NAME`.`default`.`v1`",
+            "features" -> " - partitioned view"
+          )
+        )
       }
     }
   }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala
index 49f650fb1c5..1dbe405b217 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterNamespaceSetLocationSuite.scala
@@ -32,10 +32,13 @@ class AlterNamespaceSetLocationSuite extends 
v1.AlterNamespaceSetLocationSuiteBa
     val ns = s"$catalog.$namespace"
     withNamespace(ns) {
       sql(s"CREATE NAMESPACE $ns")
-      val e = intercept[AnalysisException] {
-        sql(s"ALTER DATABASE $ns SET LOCATION 'loc'")
-      }
-      assert(e.getMessage.contains("does not support altering database 
location"))
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"ALTER DATABASE $ns SET LOCATION 'loc'")
+        },
+        errorClass = "_LEGACY_ERROR_TEMP_1219",
+        parameters = Map.empty
+      )
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
