This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new efed39516c0 [SPARK-42309][SQL] Introduce `INCOMPATIBLE_DATA_TO_TABLE` and sub classes
efed39516c0 is described below

commit efed39516c0c4e9654aec447ce91676026368384
Author: itholic <haejoon....@databricks.com>
AuthorDate: Thu Jul 13 17:21:29 2023 +0300

    [SPARK-42309][SQL] Introduce `INCOMPATIBLE_DATA_TO_TABLE` and sub classes
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to assign the name "INCOMPATIBLE_DATA_TO_TABLE" to _LEGACY_ERROR_TEMP_1204, together with the following sub-classes (see the sketch after this list):
    - CANNOT_FIND_DATA
    - AMBIGUOUS_COLUMN_NAME
    - EXTRA_STRUCT_FIELDS
    - NULLABLE_COLUMN
    - NULLABLE_ARRAY_ELEMENTS
    - NULLABLE_MAP_VALUES
    - CANNOT_SAFELY_CAST
    - STRUCT_MISSING_FIELDS
    - UNEXPECTED_COLUMN_NAME
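
    As an illustration only, a minimal sketch of how one of these sub-classes surfaces to users; the local SparkSession, the Parquet table `t`, and the ANSI store assignment policy are assumptions mirroring the `sql-ref-ansi-compliance.md` example further down in this diff:

    ```scala
    import org.apache.spark.sql.{AnalysisException, SparkSession}

    object IncompatibleDataForTableExample {
      def main(args: Array[String]): Unit = {
        // Assumption: a local session and a throwaway Parquet table named `t`.
        val spark = SparkSession.builder()
          .master("local[1]")
          .appName("incompatible-data-for-table-example")
          .getOrCreate()
        spark.sql("SET spark.sql.storeAssignmentPolicy=ANSI")
        spark.sql("CREATE TABLE t (v INT) USING parquet")
        try {
          // A STRING literal cannot be safely cast to the INT column `v`.
          spark.sql("INSERT INTO t VALUES ('1')")
        } catch {
          case e: AnalysisException =>
            // Expected error class: INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST
            println(e.getErrorClass)
            println(e.getMessage)
        } finally {
          spark.stop()
        }
      }
    }
    ```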
    
    ### Why are the changes needed?
    
    We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
    
    Closes #39937 from itholic/LEGACY_1204.
    
    Authored-by: itholic <haejoon....@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 common/utils/src/main/resources/error/README.md    |  14 +
 .../src/main/resources/error/error-classes.json    |  59 ++-
 docs/_data/menu-sql.yaml                           |   2 +-
 ...ions-incompatible-data-for-table-error-class.md |  64 +++
 ...tions-incompatible-data-to-table-error-class.md |  64 +++
 docs/sql-error-conditions.md                       |   8 +
 docs/sql-ref-ansi-compliance.md                    |   3 +-
 .../sql/catalyst/analysis/AssignmentUtils.scala    |   5 +-
 .../catalyst/analysis/TableOutputResolver.scala    |  97 +++--
 .../spark/sql/catalyst/types/DataTypeUtils.scala   |  59 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  | 110 +++++-
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   | 267 ++++++++++---
 .../types/DataTypeWriteCompatibilitySuite.scala    | 429 ++++++++++++---------
 .../apache/spark/sql/DataFrameWriterV2Suite.scala  |  39 +-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |   5 +-
 .../command/AlignMergeAssignmentsSuite.scala       |  78 +++-
 .../command/AlignUpdateAssignmentsSuite.scala      |  54 ++-
 .../org/apache/spark/sql/sources/InsertSuite.scala |  98 +++--
 .../sql/test/DataFrameReaderWriterSuite.scala      |  47 ++-
 .../spark/sql/hive/client/HiveClientSuite.scala    |  22 +-
 20 files changed, 1100 insertions(+), 424 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md
index 838991c2b6a..dfcb42d49e7 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1294,6 +1294,20 @@ The following SQLSTATEs are collated from:
 |IM013    |IM   |ODBC driver                                       |013     |Trace file error                                            |SQL Server     |N           |SQL Server                                                                  |
 |IM014    |IM   |ODBC driver                                       |014     |Invalid name of File DSN                                    |SQL Server     |N           |SQL Server                                                                  |
 |IM015    |IM   |ODBC driver                                       |015     |Corrupt file data source                                    |SQL Server     |N           |SQL Server                                                                  |
+|KD000   |KD   |datasource specific errors                            |000         |datasource specific errors                                  |Databricks     |N           |Databricks                                                                  |
+|KD001   |KD   |datasource specific errors                            |001         |Cannot read file footer                                     |Databricks     |N           |Databricks                                                                  |
+|KD002   |KD   |datasource specific errors                            |002         |Unexpected version                                          |Databricks     |N           |Databricks                                                                  |
+|KD003   |KD   |datasource specific errors                            |003         |Incorrect access to data type                               |Databricks     |N           |Databricks                                                                  |
+|KD004   |KD   |datasource specific errors                            |004         |Delta protocol version error                                |Databricks     |N           |Databricks                                                                  |
+|KD005   |KD   |datasource specific errors                            |005         |Table must include at least one non partition column        |Databricks     |N           |Databricks                                                                  |
+|KD006   |KD   |datasource specific errors                            |006         |No commits found at log path                                |Databricks     |N           |Databricks                                                                  |
+|KD007   |KD   |datasource specific errors                            |007         |Table signature changed                                     |Databricks     |N           |Databricks                                                                  |
+|KD008   |KD   |datasource specific errors                            |008         |Table signature not set                                     |Databricks     |N           |Databricks                                                                  |
+|KD009   |KD   |datasource specific errors                            |009         |Partitions do not match                                     |Databricks     |N           |Databricks                                                                  |
+|KD00A   |KD   |datasource specific errors                            |00A         |Unexpected partial scan                                     |Databricks     |N           |Databricks                                                                  |
+|KD00B   |KD   |datasource specific errors                            |00B         |Unrecognised file                                           |Databricks     |N           |Databricks                                                                  |
+|KD00C   |KD   |datasource specific errors                            |00C         |Versioning not contiguous                                   |Databricks     |N           |Databricks                                                                  |
+|KD00D   |KD   |datasource specific errors                            |00D         |Stats required                                              |Databricks     |N           |Databricks                                                                  |
 |P0000    |P0   |PL/pgSQL Error                                    |000     |plpgsql_error                                               |PostgreSQL     |N           |PostgreSQL Redshift                                                         |
 |P0001    |P0   |PL/pgSQL Error                                    |001     |raise_exception                                             |PostgreSQL     |N           |PostgreSQL Redshift                                                         |
 |P0002    |P0   |PL/pgSQL Error                                    |002     |no_data_found                                               |PostgreSQL     |N           |PostgreSQL Redshift                                                         |
diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 2c4d2b533a6..6691e86b463 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -904,6 +904,59 @@
       "Detected an incompatible DataSourceRegister. Please remove the 
incompatible library from classpath or upgrade it. Error: <message>"
     ]
   },
+  "INCOMPATIBLE_DATA_FOR_TABLE" : {
+    "message" : [
+      "Cannot write incompatible data for the table <tableName>:"
+    ],
+    "subClass" : {
+      "AMBIGUOUS_COLUMN_NAME" : {
+        "message" : [
+          "Ambiguous column name in the input data <colName>."
+        ]
+      },
+      "CANNOT_FIND_DATA" : {
+        "message" : [
+          "Cannot find data for the output column <colName>."
+        ]
+      },
+      "CANNOT_SAFELY_CAST" : {
+        "message" : [
+          "Cannot safely cast <colName> <srcType> to <targetType>."
+        ]
+      },
+      "EXTRA_STRUCT_FIELDS" : {
+        "message" : [
+          "Cannot write extra fields <extraFields> to the struct <colName>."
+        ]
+      },
+      "NULLABLE_ARRAY_ELEMENTS" : {
+        "message" : [
+          "Cannot write nullable elements to array of non-nulls: <colName>."
+        ]
+      },
+      "NULLABLE_COLUMN" : {
+        "message" : [
+          "Cannot write nullable values to non-null column <colName>."
+        ]
+      },
+      "NULLABLE_MAP_VALUES" : {
+        "message" : [
+          "Cannot write nullable values to map of non-nulls: <colName>."
+        ]
+      },
+      "STRUCT_MISSING_FIELDS" : {
+        "message" : [
+          "Struct <colName> missing fields: <missingFields>."
+        ]
+      },
+      "UNEXPECTED_COLUMN_NAME" : {
+        "message" : [
+          "Struct <colName> <order>-th field name does not match (may be out of order): expected <expected>, found <found>."
+        ]
+      }
+    },
+    "sqlState" : "KD000"
+  },
   "INCOMPATIBLE_JOIN_TYPES" : {
     "message" : [
       "The join types <joinType1> and <joinType2> are incompatible."
@@ -4051,12 +4104,6 @@
       "Cannot resolve column name \"<colName>\" among (<fieldNames>)."
     ]
   },
-  "_LEGACY_ERROR_TEMP_1204" : {
-    "message" : [
-      "Cannot write incompatible data to table '<tableName>':",
-      "- <errors>."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_1205" : {
     "message" : [
       "Expected only partition pruning predicates: 
<nonPartitionPruningPredicates>."
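
For context, a small sketch of how callers can read the error class and the message parameters declared above through the `SparkThrowable` methods on `AnalysisException`; the object and method names here are hypothetical, not part of this commit:

```scala
import org.apache.spark.sql.AnalysisException

object WriteErrorInspection {
  // Hypothetical helper: reports the error class (e.g.
  // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST) and the parameters that fill
  // the message template, such as tableName, colName, srcType and targetType.
  def describe(e: AnalysisException): String = {
    val errorClass = e.getErrorClass
    val params = e.getMessageParameters // java.util.Map[String, String]
    s"$errorClass -> $params"
  }
}
```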
diff --git a/docs/_data/menu-sql.yaml b/docs/_data/menu-sql.yaml
index 2345df89e51..62ad6a3a585 100644
--- a/docs/_data/menu-sql.yaml
+++ b/docs/_data/menu-sql.yaml
@@ -111,7 +111,7 @@
       url: sql-error-conditions-connect-error-class.html
     - text: DATATYPE_MISMATCH error class
       url: sql-error-conditions-datatype-mismatch-error-class.html
-    - text: INCOMPATIBLE_DATA_TO_TABLE error class
+    - text: INCOMPATIBLE_DATA_FOR_TABLE error class
       url: sql-error-conditions-incompatible-data-to-table-error-class.html
     - text: INCOMPLETE_TYPE_DEFINITION error class
       url: sql-error-conditions-incomplete-type-definition-error-class.html
diff --git a/docs/sql-error-conditions-incompatible-data-for-table-error-class.md b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
new file mode 100644
index 00000000000..f70b69ba6c5
--- /dev/null
+++ b/docs/sql-error-conditions-incompatible-data-for-table-error-class.md
@@ -0,0 +1,64 @@
+---
+layout: global
+title: INCOMPATIBLE_DATA_FOR_TABLE error class
+displayTitle: INCOMPATIBLE_DATA_FOR_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: KD000
+
+Cannot write incompatible data for the table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## AMBIGUOUS_COLUMN_NAME
+
+Ambiguous column name in the input data `<colName>`.
+
+## CANNOT_FIND_DATA
+
+Cannot find data for the output column `<colName>`.
+
+## CANNOT_SAFELY_CAST
+
+Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
+
+## EXTRA_STRUCT_FIELDS
+
+Cannot write extra fields `<extraFields>` to the struct `<colName>`.
+
+## NULLABLE_ARRAY_ELEMENTS
+
+Cannot write nullable elements to array of non-nulls: `<colName>`.
+
+## NULLABLE_COLUMN
+
+Cannot write nullable values to non-null column `<colName>`.
+
+## NULLABLE_MAP_VALUES
+
+Cannot write nullable values to map of non-nulls: `<colName>`.
+
+## STRUCT_MISSING_FIELDS
+
+Struct `<colName>` missing fields: `<missingFields>`.
+
+## UNEXPECTED_COLUMN_NAME
+
+Struct `<colName>` `<order>`-th field name does not match (may be out of order): expected `<expected>`, found `<found>`.
+
+
diff --git a/docs/sql-error-conditions-incompatible-data-to-table-error-class.md b/docs/sql-error-conditions-incompatible-data-to-table-error-class.md
new file mode 100644
index 00000000000..180a9cb9e49
--- /dev/null
+++ b/docs/sql-error-conditions-incompatible-data-to-table-error-class.md
@@ -0,0 +1,64 @@
+---
+layout: global
+title: INCOMPATIBLE_DATA_FOR_TABLE error class
+displayTitle: INCOMPATIBLE_DATA_FOR_TABLE error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+SQLSTATE: none assigned
+
+Cannot write incompatible data for table `<tableName>`:
+
+This error class has the following derived error classes:
+
+## AMBIGUOUS_COLUMN_NAME
+
+Ambiguous column name in the input data `<colName>`.
+
+## CANNOT_FIND_DATA
+
+Cannot find data for the output column `<colName>`.
+
+## CANNOT_SAFELY_CAST
+
+Cannot safely cast `<colName>` `<srcType>` to `<targetType>`.
+
+## EXTRA_STRUCT_FIELDS
+
+Cannot write extra fields `<extraFields>` to the struct `<colName>`.
+
+## NULLABLE_ARRAY_ELEMENTS
+
+Cannot write nullable elements to array of non-nulls: `<colName>`.
+
+## NULLABLE_COLUMN
+
+Cannot write nullable values to non-null column `<colName>`.
+
+## NULLABLE_MAP_VALUES
+
+Cannot write nullable values to map of non-nulls: `<colName>`.
+
+## STRUCT_MISSING_FIELDS
+
+Struct `<colName>` missing fields: `<missingFields>`.
+
+## UNEXPECTED_COLUMN_NAME
+
+Struct `<colName>` `<order>`-th field name does not match (may be out of order): expected `<expected>`, found `<found>`.
+
+
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 88bf68dd18d..21eda114d06 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -636,6 +636,14 @@ SQLSTATE: none assigned
 
 Detected an incompatible DataSourceRegister. Please remove the incompatible library from classpath or upgrade it. Error: `<message>`
 
+### [INCOMPATIBLE_DATA_FOR_TABLE](sql-error-conditions-incompatible-data-for-table-error-class.html)
+
+SQLSTATE: KD000
+
+Cannot write incompatible data for the table `<tableName>`:
+
+For more details see [INCOMPATIBLE_DATA_FOR_TABLE](sql-error-conditions-incompatible-data-for-table-error-class.html)
+
 ### INCOMPATIBLE_JOIN_TYPES
 
[SQLSTATE: 42613](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index f9c6f5ea6aa..66258f7a8cd 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -152,8 +152,7 @@ CREATE TABLE t (v INT);
 
 -- `spark.sql.storeAssignmentPolicy=ANSI`
 INSERT INTO t VALUES ('1');
-org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`t`':
-- Cannot safely cast 'v': string to int;
+org.apache.spark.sql.AnalysisException: [INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST] Cannot write incompatible data for table `spark_catalog`.`default`.`t`: Cannot safely cast `v`: "STRING" to "INT".
 
 -- `spark.sql.storeAssignmentPolicy=LEGACY` (This is a legacy behaviour until Spark 2.x)
 INSERT INTO t VALUES ('1');
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
index c9ee68a0dc7..ebc30ae4a6e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala
@@ -119,7 +119,8 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
         val colPath = Seq(attr.name)
         val actualAttr = restoreActualType(attr)
         val value = matchingAssignments.head.value
-        TableOutputResolver.resolveUpdate(value, actualAttr, conf, err => 
errors += err, colPath)
+        TableOutputResolver.resolveUpdate(
+          "", value, actualAttr, conf, err => errors += err, colPath)
       }
       Assignment(attr, resolvedValue)
     }
@@ -163,7 +164,7 @@ object AssignmentUtils extends SQLConfHelper with 
CastSupport {
       TableOutputResolver.checkNullability(colExpr, col, conf, colPath)
     } else if (exactAssignments.nonEmpty) {
       val value = exactAssignments.head.value
-      TableOutputResolver.resolveUpdate(value, col, conf, addError, colPath)
+      TableOutputResolver.resolveUpdate("", value, col, conf, addError, 
colPath)
     } else {
       applyFieldAssignments(col, colExpr, fieldAssignments, addError, colPath)
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 9c94437dbc7..894cd0b3991 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -57,6 +58,7 @@ object TableOutputResolver {
       // the column's default value. We need to pass `fillDefaultValue` as 
true here, if the
       // `supportColDefaultValue` parameter is also true.
       reorderColumnsByName(
+        tableName,
         query.output,
         actualExpectedCols,
         conf,
@@ -78,11 +80,11 @@ object TableOutputResolver {
           tableName, actualExpectedCols.map(_.name), query)
       }
 
-      resolveColumnsByPosition(queryOutputCols, actualExpectedCols, conf, 
errors += _)
+      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, 
conf, errors += _)
     }
 
     if (errors.nonEmpty) {
-      throw 
QueryCompilationErrors.cannotWriteIncompatibleDataToTableError(tableName, 
errors.toSeq)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, 
conf, errors += _)
     }
 
     if (resolved == query.output) {
@@ -93,6 +95,7 @@ object TableOutputResolver {
   }
 
   def resolveUpdate(
+      tableName: String,
       value: Expression,
       col: Attribute,
       conf: SQLConf,
@@ -102,29 +105,31 @@ object TableOutputResolver {
     (value.dataType, col.dataType) match {
       // no need to reorder inner fields or cast if types are already 
compatible
       case (valueType, colType) if 
DataType.equalsIgnoreCompatibleNullability(valueType, colType) =>
-        val canWriteExpr = canWrite(valueType, colType, byName = true, conf, 
addError, colPath)
+        val canWriteExpr = canWrite(
+          tableName, valueType, colType, byName = true, conf, addError, 
colPath)
         if (canWriteExpr) checkNullability(value, col, conf, colPath) else 
value
       case (valueType: StructType, colType: StructType) =>
         val resolvedValue = resolveStructType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case (valueType: ArrayType, colType: ArrayType) =>
         val resolvedValue = resolveArrayType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case (valueType: MapType, colType: MapType) =>
         val resolvedValue = resolveMapType(
-          value, valueType, col, colType,
+          tableName, value, valueType, col, colType,
           byName = true, conf, addError, colPath)
         resolvedValue.getOrElse(value)
       case _ =>
-        checkUpdate(value, col, conf, addError, colPath)
+        checkUpdate(tableName, value, col, conf, addError, colPath)
     }
   }
 
   private def checkUpdate(
+      tableName: String,
       value: Expression,
       attr: Attribute,
       conf: SQLConf,
@@ -139,7 +144,7 @@ object TableOutputResolver {
     }
 
     val canWriteValue = canWrite(
-      value.dataType, attrTypeWithoutCharVarchar,
+      tableName, value.dataType, attrTypeWithoutCharVarchar,
       byName = true, conf, addError, colPath)
 
     if (canWriteValue) {
@@ -157,6 +162,7 @@ object TableOutputResolver {
   }
 
   private def canWrite(
+      tableName: String,
       valueType: DataType,
       expectedType: DataType,
       byName: Boolean,
@@ -166,7 +172,7 @@ object TableOutputResolver {
     conf.storeAssignmentPolicy match {
       case StoreAssignmentPolicy.STRICT | StoreAssignmentPolicy.ANSI =>
         DataTypeUtils.canWrite(
-          valueType, expectedType, byName, conf.resolver, colPath.quoted,
+          tableName, valueType, expectedType, byName, conf.resolver, 
colPath.quoted,
           conf.storeAssignmentPolicy, addError)
       case _ =>
         true
@@ -174,6 +180,7 @@ object TableOutputResolver {
   }
 
   private def reorderColumnsByName(
+      tableName: String,
       inputCols: Seq[NamedExpression],
       expectedCols: Seq[Attribute],
       conf: SQLConf,
@@ -191,12 +198,15 @@ object TableOutputResolver {
           None
         }
         if (defaultExpr.isEmpty) {
-          addError(s"Cannot find data for output column 
'${newColPath.quoted}'")
+          throw 
QueryCompilationErrors.incompatibleDataToTableCannotFindDataError(
+            tableName, newColPath.quoted
+          )
         }
         defaultExpr
       } else if (matched.length > 1) {
-        addError(s"Ambiguous column name in the input data: 
'${newColPath.quoted}'")
-        None
+        throw 
QueryCompilationErrors.incompatibleDataToTableAmbiguousColumnNameError(
+          tableName, newColPath.quoted
+        )
       } else {
         matchedCols += matched.head.name
         val expectedName = expectedCol.name
@@ -209,18 +219,19 @@ object TableOutputResolver {
         (matchedCol.dataType, expectedCol.dataType) match {
           case (matchedType: StructType, expectedType: StructType) =>
             resolveStructType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case (matchedType: ArrayType, expectedType: ArrayType) =>
             resolveArrayType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case (matchedType: MapType, expectedType: MapType) =>
             resolveMapType(
-              matchedCol, matchedType, expectedCol, expectedType,
+              tableName, matchedCol, matchedType, expectedCol, expectedType,
               byName = true, conf, addError, newColPath)
           case _ =>
-            checkField(expectedCol, matchedCol, byName = true, conf, addError, 
newColPath)
+            checkField(
+              tableName, expectedCol, matchedCol, byName = true, conf, 
addError, newColPath)
         }
       }
     }
@@ -228,9 +239,10 @@ object TableOutputResolver {
     if (reordered.length == expectedCols.length) {
       if (matchedCols.size < inputCols.length) {
         val extraCols = inputCols.filterNot(col => 
matchedCols.contains(col.name))
-          .map(col => s"'${col.name}'").mkString(", ")
-        addError(s"Cannot write extra fields to struct '${colPath.quoted}': 
$extraCols")
-        Nil
+          .map(col => s"${toSQLId(col.name)}").mkString(", ")
+        throw 
QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+          tableName, colPath.quoted, extraCols
+        )
       } else {
         reordered
       }
@@ -240,6 +252,7 @@ object TableOutputResolver {
   }
 
   private def resolveColumnsByPosition(
+      tableName: String,
       inputCols: Seq[NamedExpression],
       expectedCols: Seq[Attribute],
       conf: SQLConf,
@@ -248,16 +261,18 @@ object TableOutputResolver {
 
     if (inputCols.size > expectedCols.size) {
       val extraColsStr = inputCols.takeRight(inputCols.size - 
expectedCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => toSQLId(col.name))
         .mkString(", ")
-      addError(s"Cannot write extra fields to struct '${colPath.quoted}': 
$extraColsStr")
-      return Nil
+      throw 
QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+        tableName, colPath.quoted, extraColsStr
+      )
     } else if (inputCols.size < expectedCols.size) {
       val missingColsStr = expectedCols.takeRight(expectedCols.size - 
inputCols.size)
-        .map(col => s"'${col.name}'")
+        .map(col => toSQLId(col.name))
         .mkString(", ")
-      addError(s"Struct '${colPath.quoted}' missing fields: $missingColsStr")
-      return Nil
+      throw 
QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+        tableName, colPath.quoted, missingColsStr
+      )
     }
 
     inputCols.zip(expectedCols).flatMap { case (inputCol, expectedCol) =>
@@ -265,18 +280,18 @@ object TableOutputResolver {
       (inputCol.dataType, expectedCol.dataType) match {
         case (inputType: StructType, expectedType: StructType) =>
           resolveStructType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case (inputType: ArrayType, expectedType: ArrayType) =>
           resolveArrayType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case (inputType: MapType, expectedType: MapType) =>
           resolveMapType(
-            inputCol, inputType, expectedCol, expectedType,
+            tableName, inputCol, inputType, expectedCol, expectedType,
             byName = false, conf, addError, newColPath)
         case _ =>
-          checkField(expectedCol, inputCol, byName = false, conf, addError, 
newColPath)
+          checkField(tableName, expectedCol, inputCol, byName = false, conf, 
addError, newColPath)
       }
     }
   }
@@ -301,6 +316,7 @@ object TableOutputResolver {
   }
 
   private def resolveStructType(
+      tableName: String,
       input: Expression,
       inputType: StructType,
       expected: Attribute,
@@ -314,9 +330,10 @@ object TableOutputResolver {
       Alias(GetStructField(nullCheckedInput, i, Some(f.name)), f.name)()
     }
     val resolved = if (byName) {
-      reorderColumnsByName(fields, toAttributes(expectedType), conf, addError, 
colPath)
+      reorderColumnsByName(tableName, fields, toAttributes(expectedType), 
conf, addError, colPath)
     } else {
-      resolveColumnsByPosition(fields, toAttributes(expectedType), conf, 
addError, colPath)
+      resolveColumnsByPosition(
+        tableName, fields, toAttributes(expectedType), conf, addError, colPath)
     }
     if (resolved.length == expectedType.length) {
       val struct = CreateStruct(resolved)
@@ -332,6 +349,7 @@ object TableOutputResolver {
   }
 
   private def resolveArrayType(
+      tableName: String,
       input: Expression,
       inputType: ArrayType,
       expected: Attribute,
@@ -345,9 +363,9 @@ object TableOutputResolver {
     val fakeAttr =
       AttributeReference("element", expectedType.elementType, 
expectedType.containsNull)()
     val res = if (byName) {
-      reorderColumnsByName(Seq(param), Seq(fakeAttr), conf, addError, colPath)
+      reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath)
     } else {
-      resolveColumnsByPosition(Seq(param), Seq(fakeAttr), conf, addError, 
colPath)
+      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, 
addError, colPath)
     }
     if (res.length == 1) {
       val func = LambdaFunction(res.head, Seq(param))
@@ -358,6 +376,7 @@ object TableOutputResolver {
   }
 
   private def resolveMapType(
+      tableName: String,
       input: Expression,
       inputType: MapType,
       expected: Attribute,
@@ -371,9 +390,9 @@ object TableOutputResolver {
     val keyParam = NamedLambdaVariable("key", inputType.keyType, nullable = 
false)
     val fakeKeyAttr = AttributeReference("key", expectedType.keyType, nullable 
= false)()
     val resKey = if (byName) {
-      reorderColumnsByName(Seq(keyParam), Seq(fakeKeyAttr), conf, addError, 
colPath)
+      reorderColumnsByName(tableName, Seq(keyParam), Seq(fakeKeyAttr), conf, 
addError, colPath)
     } else {
-      resolveColumnsByPosition(Seq(keyParam), Seq(fakeKeyAttr), conf, 
addError, colPath)
+      resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), 
conf, addError, colPath)
     }
 
     val valueParam =
@@ -381,9 +400,10 @@ object TableOutputResolver {
     val fakeValueAttr =
       AttributeReference("value", expectedType.valueType, 
expectedType.valueContainsNull)()
     val resValue = if (byName) {
-      reorderColumnsByName(Seq(valueParam), Seq(fakeValueAttr), conf, 
addError, colPath)
+      reorderColumnsByName(tableName, Seq(valueParam), Seq(fakeValueAttr), 
conf, addError, colPath)
     } else {
-      resolveColumnsByPosition(Seq(valueParam), Seq(fakeValueAttr), conf, 
addError, colPath)
+      resolveColumnsByPosition(
+        tableName, Seq(valueParam), Seq(fakeValueAttr), conf, addError, 
colPath)
     }
 
     if (resKey.length == 1 && resValue.length == 1) {
@@ -430,6 +450,7 @@ object TableOutputResolver {
   }
 
   private def checkField(
+      tableName: String,
       tableAttr: Attribute,
       queryExpr: NamedExpression,
       byName: Boolean,
@@ -465,7 +486,7 @@ object TableOutputResolver {
     }
 
     val canWriteExpr = canWrite(
-      queryExpr.dataType, attrTypeWithoutCharVarchar,
+      tableName, queryExpr.dataType, attrTypeWithoutCharVarchar,
       byName, conf, addError, colPath)
 
     if (canWriteExpr) outputField else None
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
index d0df8c3270e..feff781a403 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.types
 
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, 
Literal}
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, 
STRICT}
 import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, Decimal, 
DecimalType, MapType, NullType, StructField, StructType}
@@ -72,6 +74,7 @@ object DataTypeUtils {
    * @return true if data written with the write type can be read using the 
read type
    */
   def canWrite(
+      tableName: String,
       write: DataType,
       read: DataType,
       byName: Boolean,
@@ -83,12 +86,13 @@ object DataTypeUtils {
       case (wArr: ArrayType, rArr: ArrayType) =>
         // run compatibility check first to produce all error messages
         val typesCompatible = canWrite(
-          wArr.elementType, rArr.elementType, byName, resolver, context + 
".element",
+          tableName, wArr.elementType, rArr.elementType, byName, resolver, 
context + ".element",
           storeAssignmentPolicy, addError)
 
         if (wArr.containsNull && !rArr.containsNull) {
-          addError(s"Cannot write nullable elements to array of non-nulls: 
'$context'")
-          false
+          throw 
QueryCompilationErrors.incompatibleDataToTableNullableArrayElementsError(
+            tableName, context
+          )
         } else {
           typesCompatible
         }
@@ -99,15 +103,16 @@ object DataTypeUtils {
 
         // run compatibility check first to produce all error messages
         val keyCompatible = canWrite(
-          wMap.keyType, rMap.keyType, byName, resolver, context + ".key",
+          tableName, wMap.keyType, rMap.keyType, byName, resolver, context + 
".key",
           storeAssignmentPolicy, addError)
         val valueCompatible = canWrite(
-          wMap.valueType, rMap.valueType, byName, resolver, context + ".value",
+          tableName, wMap.valueType, rMap.valueType, byName, resolver, context 
+ ".value",
           storeAssignmentPolicy, addError)
 
         if (wMap.valueContainsNull && !rMap.valueContainsNull) {
-          addError(s"Cannot write nullable values to map of non-nulls: 
'$context'")
-          false
+          throw 
QueryCompilationErrors.incompatibleDataToTableNullableMapValuesError(
+            tableName, context
+          )
         } else {
           keyCompatible && valueCompatible
         }
@@ -119,16 +124,15 @@ object DataTypeUtils {
             val nameMatch = resolver(wField.name, rField.name) || 
isSparkGeneratedName(wField.name)
             val fieldContext = s"$context.${rField.name}"
             val typesCompatible = canWrite(
-              wField.dataType, rField.dataType, byName, resolver, fieldContext,
+              tableName, wField.dataType, rField.dataType, byName, resolver, 
fieldContext,
               storeAssignmentPolicy, addError)
 
             if (byName && !nameMatch) {
-              addError(s"Struct '$context' $i-th field name does not match " +
-                s"(may be out of order): expected '${rField.name}', found 
'${wField.name}'")
-              fieldCompatible = false
+              throw 
QueryCompilationErrors.incompatibleDataToTableUnexpectedColumnNameError(
+                tableName, context, i, rField.name, wField.name)
             } else if (!rField.nullable && wField.nullable) {
-              addError(s"Cannot write nullable values to non-null field: 
'$fieldContext'")
-              fieldCompatible = false
+              throw 
QueryCompilationErrors.incompatibleDataToTableNullableColumnError(
+                tableName, fieldContext)
             } else if (!typesCompatible) {
               // errors are added in the recursive call to canWrite above
               fieldCompatible = false
@@ -137,25 +141,27 @@ object DataTypeUtils {
 
         if (readFields.size > writeFields.size) {
           val missingFieldsStr = readFields.takeRight(readFields.size - 
writeFields.size)
-            .map(f => s"'${f.name}'").mkString(", ")
+            .map(f => s"${toSQLId(f.name)}").mkString(", ")
           if (missingFieldsStr.nonEmpty) {
-            addError(s"Struct '$context' missing fields: $missingFieldsStr")
-            fieldCompatible = false
+            throw 
QueryCompilationErrors.incompatibleDataToTableStructMissingFieldsError(
+              tableName, context, missingFieldsStr)
           }
 
         } else if (writeFields.size > readFields.size) {
           val extraFieldsStr = writeFields.takeRight(writeFields.size - 
readFields.size)
-            .map(f => s"'${f.name}'").mkString(", ")
-          addError(s"Cannot write extra fields to struct '$context': 
$extraFieldsStr")
-          fieldCompatible = false
+            .map(f => s"${toSQLId(f.name)}").mkString(", ")
+          throw 
QueryCompilationErrors.incompatibleDataToTableExtraStructFieldsError(
+            tableName, context, extraFieldsStr
+          )
         }
 
         fieldCompatible
 
       case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT =>
         if (!Cast.canUpCast(w, r)) {
-          addError(s"Cannot safely cast '$context': ${w.catalogString} to 
${r.catalogString}")
-          false
+          throw 
QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+            tableName, context, w.catalogString, r.catalogString
+          )
         } else {
           true
         }
@@ -164,8 +170,9 @@ object DataTypeUtils {
 
       case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI =>
         if (!Cast.canANSIStoreAssign(w, r)) {
-          addError(s"Cannot safely cast '$context': ${w.catalogString} to 
${r.catalogString}")
-          false
+          throw 
QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+            tableName, context, w.catalogString, r.catalogString
+          )
         } else {
           true
         }
@@ -174,9 +181,9 @@ object DataTypeUtils {
         true
 
       case (w, r) =>
-        addError(s"Cannot write '$context': " +
-          s"${w.catalogString} is incompatible with ${r.catalogString}")
-        false
+        throw 
QueryCompilationErrors.incompatibleDataToTableCannotSafelyCastError(
+          tableName, context, w.catalogString, r.catalogString
+        )
     }
   }
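
For context, a rough sketch of calling the widened `DataTypeUtils.canWrite` signature shown above; the table name, column path, and types are illustrative assumptions (matching the `V2WriteAnalysisSuite` expectations later in this diff). With this change an incompatibility raises an `INCOMPATIBLE_DATA_FOR_TABLE.*` `AnalysisException` instead of only reporting through `addError`:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{DoubleType, FloatType}

object CanWriteSketch {
  def main(args: Array[String]): Unit = {
    try {
      DataTypeUtils.canWrite(
        "table-name",                  // new tableName parameter (illustrative value)
        DoubleType,                    // write (input) type
        FloatType,                     // read (table column) type
        byName = true,
        caseSensitiveResolution,
        "x",                           // column path reported as colName
        StoreAssignmentPolicy.STRICT,  // STRICT rejects the lossy DOUBLE -> FLOAT cast
        msg => ())                     // addError callback, still part of the signature
    } catch {
      case e: AnalysisException =>
        println(e.getErrorClass)       // INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST
    }
  }
}
```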
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index aab5ada6e4d..955046e74e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2074,12 +2074,114 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
         "dataColumns" -> query.output.map(c => toSQLId(c.name)).mkString(", 
")))
   }
 
-  def cannotWriteIncompatibleDataToTableError(tableName: String, errors: 
Seq[String]): Throwable = {
+  def incompatibleDataToTableCannotFindDataError(
+      tableName: String, colName: String): Throwable = {
     new AnalysisException(
-      errorClass = "_LEGACY_ERROR_TEMP_1204",
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
       messageParameters = Map(
-        "tableName" -> tableName,
-        "errors" -> errors.mkString("\n- ")))
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableAmbiguousColumnNameError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.AMBIGUOUS_COLUMN_NAME",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableExtraStructFieldsError(
+      tableName: String, colName: String, extraFields: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "extraFields" -> extraFields
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableColumnError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_COLUMN",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableArrayElementsError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_ARRAY_ELEMENTS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableNullableMapValuesError(
+      tableName: String, colName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_MAP_VALUES",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName)
+      )
+    )
+  }
+
+  def incompatibleDataToTableCannotSafelyCastError(
+      tableName: String, colName: String, srcType: String, targetType: 
String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "srcType" -> toSQLType(srcType),
+        "targetType" -> toSQLType(targetType)
+      )
+    )
+  }
+
+  def incompatibleDataToTableStructMissingFieldsError(
+      tableName: String, colName: String, missingFields: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "missingFields" -> missingFields
+      )
+    )
+  }
+
+  def incompatibleDataToTableUnexpectedColumnNameError(
+     tableName: String,
+     colName: String,
+     order: Int,
+     expected: String,
+     found: String): Throwable = {
+    new AnalysisException(
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME",
+      messageParameters = Map(
+        "tableName" -> toSQLId(tableName),
+        "colName" -> toSQLId(colName),
+        "order" -> order.toString,
+        "expected" -> toSQLId(expected),
+        "found" -> toSQLId(found)
+      )
+    )
   }
 
   def invalidRowLevelOperationAssignments(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 0a97b130c80..d91a080d8fe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
 
+import org.apache.spark.QueryContext
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Cast, CreateNamedStruct, GetStructField, If, IsNull, LessThanOrEqual, Literal}
@@ -153,6 +154,23 @@ abstract class V2ANSIWriteAnalysisSuiteBase extends 
V2WriteAnalysisSuiteBase {
       super.assertAnalysisError(inputPlan, expectedErrors, caseSensitive)
     }
   }
+
+  override def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedMessageParameters: Map[String, String],
+      queryContext: Array[QueryContext] = Array.empty,
+      caseSensitive: Boolean = true): Unit = {
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
StoreAssignmentPolicy.ANSI.toString) {
+      super.assertAnalysisErrorClass(
+        inputPlan,
+        expectedErrorClass,
+        expectedMessageParameters,
+        queryContext,
+        caseSensitive
+      )
+    }
+  }
 }
 
 abstract class V2StrictWriteAnalysisSuiteBase extends V2WriteAnalysisSuiteBase 
{
@@ -174,13 +192,36 @@ abstract class V2StrictWriteAnalysisSuiteBase extends 
V2WriteAnalysisSuiteBase {
     }
   }
 
+  override def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedMessageParameters: Map[String, String],
+      queryContext: Array[QueryContext] = Array.empty,
+      caseSensitive: Boolean = true): Unit = {
+    withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
StoreAssignmentPolicy.STRICT.toString) {
+      super.assertAnalysisErrorClass(
+        inputPlan,
+        expectedErrorClass,
+        expectedMessageParameters,
+        queryContext,
+        caseSensitive
+      )
+    }
+  }
+
   test("byName: fail canWrite check") {
     val parsedPlan = byName(table, widerTable)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'",
-      "Cannot safely cast", "'x'", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byName: multiple field errors are reported") {
@@ -195,10 +236,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends 
V2WriteAnalysisSuiteBase {
     val parsedPlan = byName(xRequiredTable, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot safely cast", "'x'", "double to float",
-      "Cannot find data for output column", "'y'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byPosition: fail canWrite check") {
@@ -209,9 +255,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends 
V2WriteAnalysisSuiteBase {
     val parsedPlan = byPosition(table, widerTable)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write", "'table-name'",
-      "Cannot safely cast", "'x'", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("byPosition: multiple field errors are reported") {
@@ -226,10 +278,15 @@ abstract class V2StrictWriteAnalysisSuiteBase extends 
V2WriteAnalysisSuiteBase {
     val parsedPlan = byPosition(xRequiredTable, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot safely cast", "'x'", "double to float",
-      "Cannot safely cast", "'y'", "double to float"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 }
 
@@ -335,9 +392,11 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'", "'y'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" 
-> "`x`")
+    )
   }
 
   test("byName: case sensitive column resolution") {
@@ -348,10 +407,11 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"),
-      caseSensitive = true)
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" 
-> "`x`")
+    )
   }
 
   test("byName: case insensitive column resolution") {
@@ -410,9 +470,11 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = byName(requiredTable, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" 
-> "`x`")
+    )
   }
 
   test("byName: missing optional columns cause failure and are identified by 
name") {
@@ -423,9 +485,11 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = byName(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot find data for output column", "'x'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      expectedMessageParameters = Map("tableName" -> "`table-name`", "colName" 
-> "`x`")
+    )
   }
 
   test("byName: insert safe cast") {
@@ -468,9 +532,15 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val query = TestRelation(Seq($"b".struct($"y".int, $"x".int, $"z".int), 
$"a".int))
 
     val writePlan = byName(table, query)
-    assertAnalysisError(writePlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'b': 'z'"))
+    assertAnalysisErrorClass(
+      writePlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`b`",
+        "extraFields" -> "`z`"
+      )
+    )
   }
 
   test("byPosition: basic behavior") {
@@ -641,10 +711,11 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     withClue("byName") {
       val parsedPlan = byName(tableWithStructCol, query)
       assertNotResolved(parsedPlan)
-      assertAnalysisError(parsedPlan, Seq(
-        "Cannot write incompatible data to table", "'table-name'",
-        "Cannot find data for output column 'col.a'",
-        "Cannot find data for output column 'col.b'"))
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map("tableName" -> "`table-name`", 
"colName" -> "`col`.`a`")
+      )
     }
 
     withClue("byPosition") {
@@ -692,9 +763,14 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'b.n2': 'dn3'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`b`.`n2`",
+        "extraFields" -> "`dn3`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside array (byName)") {
@@ -716,9 +792,14 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'arr.element': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`arr`.`element`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside map key (byName)") {
@@ -744,9 +825,14 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'm.key': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`m`.`key`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: extra fields in struct inside map value (byName)") {
@@ -772,9 +858,14 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan = if (byNameResolution) byName(table, query) else 
byPosition(table, query)
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      "Cannot write extra fields to struct 'm.value': 'z'"))
+    assertAnalysisErrorClass(
+      parsedPlan,
+      expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      expectedMessageParameters = Map(
+        "tableName" -> "`table-name`",
+        "colName" -> "`m`.`value`",
+        "extraFields" -> "`z`")
+    )
   }
 
   test("SPARK-42997: missing fields in nested struct (byName)") {
@@ -800,9 +891,24 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`b`.`n2`.`dn3`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = 
"INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`b`.`n2`",
+          "missingFields" -> "`dn3`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside array (byName)") {
@@ -828,9 +934,24 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`arr`.`element`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = 
"INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`arr`.`element`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside map key (byName)") {
@@ -860,9 +981,24 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`key`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = 
"INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`key`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42997: missing fields in struct inside map value (byName)") {
@@ -892,9 +1028,24 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     }
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq(
-      "Cannot write incompatible data to table", "'table-name'",
-      expectedErrMsg))
+    if (byNameResolution) {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`value`.`y`")
+      )
+    } else {
+      assertAnalysisErrorClass(
+        parsedPlan,
+        expectedErrorClass = 
"INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+        expectedMessageParameters = Map(
+          "tableName" -> "`table-name`",
+          "colName" -> "`m`.`value`",
+          "missingFields" -> "`y`")
+      )
+    }
   }
 
   test("SPARK-42855: NOT NULL checks for nested structs, arrays, maps 
(byName)") {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
index 39aff1df791..7aaa69a0a5d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.types
 import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
+import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLType
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 
@@ -33,56 +35,94 @@ class StrictDataTypeWriteCompatibilitySuite extends 
DataTypeWriteCompatibilityBa
   override def canCast: (DataType, DataType) => Boolean = Cast.canUpCast
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(widerPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name 
context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name 
context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", widerPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`t`.`x`",
+        "srcType" -> "\"DOUBLE\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("Check array types: unsafe casts are not allowed") {
     val arrayOfLong = ArrayType(LongType)
     val arrayOfInt = ArrayType(IntegerType)
 
-    assertSingleError(arrayOfLong, arrayOfInt, "arr",
-      "Should not allow array of longs to array of ints") { err =>
-      assert(err.contains("'arr.element'"),
-        "Should identify problem with named array's element type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", arrayOfLong, arrayOfInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, 
errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`element`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map value types: casting Long to Integer is not allowed") {
     val mapOfLong = MapType(StringType, LongType)
     val mapOfInt = MapType(StringType, IntegerType)
 
-    assertSingleError(mapOfLong, mapOfInt, "m",
-      "Should not allow map of longs to map of ints") { err =>
-      assert(err.contains("'m.value'"), "Should identify problem with named 
map's value type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapOfLong, mapOfInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`value`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map key types: unsafe casts are not allowed") {
     val mapKeyLong = MapType(LongType, StringType)
     val mapKeyInt = MapType(IntegerType, StringType)
 
-    assertSingleError(mapKeyLong, mapKeyInt, "m",
-      "Should not allow map of long keys to map of int keys") { err =>
-      assert(err.contains("'m.key'"), "Should identify problem with named 
map's key type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapKeyLong, mapKeyInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`key`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check NullType is incompatible with all other types") {
     allNonNullTypes.foreach { t =>
-      assertSingleError(NullType, t, "nulls", s"Should not allow writing None 
to type $t") { err =>
-        assert(err.contains(s"incompatible with ${t.catalogString}"))
-      }
+      val errs = new mutable.ArrayBuffer[String]()
+      checkError(
+        exception = intercept[AnalysisException] (
+          DataTypeUtils.canWrite("", NullType, t, true,
+            analysis.caseSensitiveResolution, "nulls", storeAssignmentPolicy,
+            errMsg => errs += errMsg)
+        ),
+        errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+        parameters = Map(
+          "tableName" -> "``",
+          "colName" -> "`nulls`",
+          "srcType" -> "\"VOID\"",
+          "targetType" -> toSQLType(t.catalogString))
+      )
     }
   }
 }
@@ -97,11 +137,19 @@ class ANSIDataTypeWriteCompatibilitySuite extends 
DataTypeWriteCompatibilityBase
     val mapOfString = MapType(StringType, StringType)
     val mapOfInt = MapType(StringType, IntegerType)
 
-    assertSingleError(mapOfString, mapOfInt, "m",
-      "Should not allow map of strings to map of ints") { err =>
-      assert(err.contains("'m.value'"), "Should identify problem with named 
map's value type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapOfString, mapOfInt, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`m`.`value`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   private val stringPoint2 = StructType(Seq(
@@ -109,50 +157,87 @@ class ANSIDataTypeWriteCompatibilitySuite extends 
DataTypeWriteCompatibilityBase
     StructField("y", StringType, nullable = false)))
 
   test("Check struct types: unsafe casts are not allowed") {
-    assertNumErrors(stringPoint2, point2, "t",
-      "Should fail because types require unsafe casts", 2) { errs =>
-
-      assert(errs(0).contains("'t.x'"), "Should include the nested field name 
context")
-      assert(errs(0).contains("Cannot safely cast"))
-
-      assert(errs(1).contains("'t.y'"), "Should include the nested field name 
context")
-      assert(errs(1).contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", stringPoint2, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`t`.`x`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"FLOAT\"")
+    )
   }
 
   test("Check array types: unsafe casts are not allowed") {
     val arrayOfString = ArrayType(StringType)
     val arrayOfInt = ArrayType(IntegerType)
 
-    assertSingleError(arrayOfString, arrayOfInt, "arr",
-      "Should not allow array of strings to array of ints") { err =>
-      assert(err.contains("'arr.element'"),
-        "Should identify problem with named array's element type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", arrayOfString, arrayOfInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, 
errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`element`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Check map key types: unsafe casts are not allowed") {
     val mapKeyString = MapType(StringType, StringType)
     val mapKeyInt = MapType(IntegerType, StringType)
 
-    assertSingleError(mapKeyString, mapKeyInt, "m",
-      "Should not allow map of string keys to map of int keys") { err =>
-      assert(err.contains("'m.key'"), "Should identify problem with named 
map's key type")
-      assert(err.contains("Cannot safely cast"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapKeyString, mapKeyInt, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, 
errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`arr`.`key`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"INT\"")
+    )
   }
 
   test("Conversions between timestamp and long are not allowed") {
-    assertSingleError(LongType, TimestampType, "longToTimestamp",
-      "Should not allow long to timestamp") { err =>
-      assert(err.contains("Cannot safely cast 'longToTimestamp': bigint to 
timestamp"))
-    }
-
-    assertSingleError(TimestampType, LongType, "timestampToLong",
-      "Should not allow timestamp to long") { err =>
-      assert(err.contains("Cannot safely cast 'timestampToLong': timestamp to 
bigint"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", LongType, TimestampType, true,
+          analysis.caseSensitiveResolution, "longToTimestamp", 
storeAssignmentPolicy,
+          errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`longToTimestamp`",
+        "srcType" -> "\"BIGINT\"",
+        "targetType" -> "\"TIMESTAMP\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", TimestampType, LongType, true,
+          analysis.caseSensitiveResolution, "timestampToLong", 
storeAssignmentPolicy,
+          errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`timestampToLong`",
+        "srcType" -> "\"TIMESTAMP\"",
+        "targetType" -> "\"BIGINT\"")
+    )
   }
 
   test("SPARK-37707: Check datetime types compatible with each other") {
@@ -215,13 +300,21 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
           assertAllowed(w, r, "t", s"Should allow writing $w to $r because 
cast is safe")
 
         } else {
-          assertSingleError(w, r, "t",
-            s"Should not allow writing $w to $r because cast is not safe") { 
err =>
-            assert(err.contains("'t'"), "Should include the field name 
context")
-            assert(err.contains("Cannot safely cast"), "Should identify unsafe 
cast")
-            assert(err.contains(s"${w.catalogString}"), "Should include write 
type")
-            assert(err.contains(s"${r.catalogString}"), "Should include read 
type")
-          }
+          val errs = new mutable.ArrayBuffer[String]()
+          checkError(
+            exception = intercept[AnalysisException] (
+              DataTypeUtils.canWrite("", w, r, true, 
analysis.caseSensitiveResolution, "t",
+                storeAssignmentPolicy, errMsg => errs += errMsg)
+            ),
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+              "tableName" -> "``",
+              "colName" -> "`t`",
+              "srcType" -> toSQLType(w),
+              "targetType" -> toSQLType(r)
+            ),
+            matchPVals = true
+          )
         }
       }
     }
@@ -229,29 +322,33 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
 
   test("Check struct types: missing required field") {
     val missingRequiredField = StructType(Seq(StructField("x", FloatType, 
nullable = false)))
-    assertSingleError(missingRequiredField, point2, "t",
-      "Should fail because required field 'y' is missing") { err =>
-      assert(err.contains("'t'"), "Should include the struct name for context")
-      assert(err.contains("'y'"), "Should include the nested field name")
-      assert(err.contains("missing field"), "Should call out field missing")
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", missingRequiredField, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.STRUCT_MISSING_FIELDS",
+      parameters = Map("tableName" -> "``", "colName" -> "`t`", 
"missingFields" -> "`y`")
+    )
   }
 
   test("Check struct types: missing starting field, matched by position") {
     val missingRequiredField = StructType(Seq(StructField("y", FloatType, 
nullable = false)))
-
-    // should have 2 errors: names x and y don't match, and field y is missing
-    assertNumErrors(missingRequiredField, point2, "t",
-      "Should fail because field 'x' is matched to field 'y' and required 
field 'y' is missing", 2)
-    { errs =>
-      assert(errs(0).contains("'t'"), "Should include the struct name for 
context")
-      assert(errs(0).contains("expected 'x', found 'y'"), "Should detect name 
mismatch")
-      assert(errs(0).contains("field name does not match"), "Should identify 
name problem")
-
-      assert(errs(1).contains("'t'"), "Should include the struct name for 
context")
-      assert(errs(1).contains("'y'"), "Should include the _last_ nested fields 
of the read schema")
-      assert(errs(1).contains("missing field"), "Should call out field 
missing")
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", missingRequiredField, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME",
+      parameters = Map(
+        "expected" -> "`x`",
+        "found" -> "`y`",
+        "tableName" -> "``",
+        "colName" -> "`t`",
+        "order" -> "0")
+    )
   }
 
   test("Check struct types: missing middle field, matched by position") {
@@ -266,17 +363,20 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
 
     // types are compatible: (req int, req int) => (req int, req int, opt int)
     // but this should still fail because the names do not match.
-
-    assertNumErrors(missingMiddleField, expectedStruct, "t",
-      "Should fail because field 'y' is matched to field 'z'", 2) { errs =>
-      assert(errs(0).contains("'t'"), "Should include the struct name for 
context")
-      assert(errs(0).contains("expected 'y', found 'z'"), "Should detect name 
mismatch")
-      assert(errs(0).contains("field name does not match"), "Should identify 
name problem")
-
-      assert(errs(1).contains("'t'"), "Should include the struct name for 
context")
-      assert(errs(1).contains("'z'"), "Should include the nested field name")
-      assert(errs(1).contains("missing field"), "Should call out field 
missing")
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", missingMiddleField, expectedStruct, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.UNEXPECTED_COLUMN_NAME",
+      parameters = Map(
+        "expected" -> "`y`",
+        "found" -> "`z`",
+        "tableName" -> "``",
+        "colName" -> "`t`",
+        "order" -> "1")
+    )
   }
 
   test("Check struct types: generic colN names are ignored") {
@@ -300,20 +400,27 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
       StructField("x", FloatType),
       StructField("y", FloatType, nullable = false)))
 
-    assertSingleError(requiredFieldIsOptional, point2, "t",
-      "Should fail because required field 'x' is optional") { err =>
-      assert(err.contains("'t.x'"), "Should include the nested field name 
context")
-      assert(err.contains("Cannot write nullable values to non-null field"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", requiredFieldIsOptional, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_COLUMN",
+      parameters = Map("tableName" -> "``", "colName" -> "`t`.`x`")
+    )
   }
 
   test("Check struct types: data field would be dropped") {
-    assertSingleError(point3, point2, "t",
-      "Should fail because field 'z' would be dropped") { err =>
-      assert(err.contains("'t'"), "Should include the struct name for context")
-      assert(err.contains("'z'"), "Should include the extra field name")
-      assert(err.contains("Cannot write extra fields"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", point3, point2, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS",
+      parameters = Map("tableName" -> "``", "colName" -> "`t`", "extraFields" 
-> "`z`")
+    )
   }
 
   test("Check struct types: type promotion is allowed") {
@@ -346,11 +453,15 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
     val arrayOfRequired = ArrayType(LongType, containsNull = false)
     val arrayOfOptional = ArrayType(LongType)
 
-    assertSingleError(arrayOfOptional, arrayOfRequired, "arr",
-      "Should not allow array of optional elements to array of required 
elements") { err =>
-      assert(err.contains("'arr'"), "Should include type name context")
-      assert(err.contains("Cannot write nullable elements to array of 
non-nulls"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", arrayOfOptional, arrayOfRequired, true,
+          analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy, 
errMsg => errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_ARRAY_ELEMENTS",
+      parameters = Map("tableName" -> "``", "colName" -> "`arr`")
+    )
   }
 
   test("Check array types: writing required to optional elements is allowed") {
@@ -372,11 +483,15 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
     val mapOfRequired = MapType(StringType, LongType, valueContainsNull = 
false)
     val mapOfOptional = MapType(StringType, LongType)
 
-    assertSingleError(mapOfOptional, mapOfRequired, "m",
-      "Should not allow map of optional values to map of required values") { 
err =>
-      assert(err.contains("'m'"), "Should include type name context")
-      assert(err.contains("Cannot write nullable values to map of non-nulls"))
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", mapOfOptional, mapOfRequired, true,
+          analysis.caseSensitiveResolution, "m", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.NULLABLE_MAP_VALUES",
+      parameters = Map("tableName" -> "``", "colName" -> "`m`")
+    )
   }
 
   test("Check map value types: writing required to optional values is 
allowed") {
@@ -420,58 +535,20 @@ abstract class DataTypeWriteCompatibilityBaseSuite 
extends SparkFunSuite {
       StructField("y", StringType)
     ))
 
-    assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) 
{ errs =>
-      assert(errs(0).contains("'top.a.element'"), "Should identify bad type")
-      assert(errs(0).contains("Cannot safely cast"))
-      assert(errs(0).contains("string to double"))
-
-      assert(errs(1).contains("'top.a'"), "Should identify bad type")
-      assert(errs(1).contains("Cannot write nullable elements to array of 
non-nulls"))
-
-      assert(errs(2).contains("'top.arr_of_structs.element'"), "Should 
identify bad type")
-      assert(errs(2).contains("'z'"), "Should identify bad field")
-      assert(errs(2).contains("Cannot write extra fields to struct"))
-
-      assert(errs(3).contains("'top.arr_of_structs'"), "Should identify bad 
type")
-      assert(errs(3).contains("Cannot write nullable elements to array of 
non-nulls"))
-
-      assert(errs(4).contains("'top.bad_nested_type'"), "Should identify bad 
type")
-      assert(errs(4).contains("is incompatible with"))
-
-      assert(errs(5).contains("'top.m.key'"), "Should identify bad type")
-      assert(errs(5).contains("Cannot safely cast"))
-      assert(errs(5).contains("string to bigint"))
-
-      assert(errs(6).contains("'top.m.value'"), "Should identify bad type")
-      assert(errs(6).contains("Cannot safely cast"))
-      assert(errs(6).contains("boolean to float"))
-
-      assert(errs(7).contains("'top.m'"), "Should identify bad type")
-      assert(errs(7).contains("Cannot write nullable values to map of 
non-nulls"))
-
-      assert(errs(8).contains("'top.map_of_structs.value'"), "Should identify 
bad type")
-      assert(errs(8).contains("expected 'y', found 'z'"), "Should detect name 
mismatch")
-      assert(errs(8).contains("field name does not match"), "Should identify 
name problem")
-
-      assert(errs(9).contains("'top.map_of_structs.value'"), "Should identify 
bad type")
-      assert(errs(9).contains("'z'"), "Should identify missing field")
-      assert(errs(9).contains("missing fields"), "Should detect missing field")
-
-      assert(errs(10).contains("'top.map_of_structs'"), "Should identify bad 
type")
-      assert(errs(10).contains("Cannot write nullable values to map of 
non-nulls"))
-
-      assert(errs(11).contains("'top.x'"), "Should identify bad type")
-      assert(errs(11).contains("Cannot safely cast"))
-      assert(errs(11).contains("string to int"))
-
-      assert(errs(12).contains("'top'"), "Should identify bad type")
-      assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name 
mismatch")
-      assert(errs(12).contains("field name does not match"), "Should identify 
name problem")
-
-      assert(errs(13).contains("'top'"), "Should identify bad type")
-      assert(errs(13).contains("'missing1'"), "Should identify missing field")
-      assert(errs(13).contains("missing fields"), "Should detect missing 
field")
-    }
+    val errs = new mutable.ArrayBuffer[String]()
+    checkError(
+      exception = intercept[AnalysisException] (
+        DataTypeUtils.canWrite("", writeType, readType, true,
+          analysis.caseSensitiveResolution, "t", storeAssignmentPolicy, errMsg 
=> errs += errMsg)
+      ),
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`t`.`a`.`element`",
+        "srcType" -> "\"STRING\"",
+        "targetType" -> "\"DOUBLE\""
+      )
+    )
   }
 
   // Helper functions
@@ -483,8 +560,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends 
SparkFunSuite {
       desc: String,
       byName: Boolean = true): Unit = {
     assert(
-      DataTypeUtils.canWrite(writeType, readType, byName, 
analysis.caseSensitiveResolution, name,
-        storeAssignmentPolicy,
+      DataTypeUtils.canWrite("", writeType, readType, byName, 
analysis.caseSensitiveResolution,
+        name, storeAssignmentPolicy,
         errMsg => fail(s"Should not produce errors but was called with: 
$errMsg")), desc)
   }
 
@@ -509,8 +586,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends 
SparkFunSuite {
       (checkErrors: Seq[String] => Unit): Unit = {
     val errs = new mutable.ArrayBuffer[String]()
     assert(
-      DataTypeUtils.canWrite(writeType, readType, byName, 
analysis.caseSensitiveResolution, name,
-        storeAssignmentPolicy, errMsg => errs += errMsg) === false, desc)
+      DataTypeUtils.canWrite("", writeType, readType, byName, 
analysis.caseSensitiveResolution,
+        name, storeAssignmentPolicy, errMsg => errs += errMsg) === false, desc)
     assert(errs.size === numErrs, s"Should produce $numErrs error messages")
     checkErrors(errs.toSeq)
   }
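
A note on the helper change that the hunks above revolve around: DataTypeUtils.canWrite now takes the table name as a new first argument, which is what populates the tableName message parameter; these suites pass an empty string, hence the tableName value of `` (a quoted empty identifier) in the expectations. A minimal sketch of the updated call shape, assuming the suite's existing imports and its storeAssignmentPolicy member:

    val errs = new mutable.ArrayBuffer[String]()
    // Arguments: table name (empty here), write type, read type, byName resolution,
    // resolver, column path used in messages, store assignment policy, error callback.
    // The strict-policy array test above expects this call to raise an AnalysisException
    // with error class INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST (BIGINT vs INT).
    DataTypeUtils.canWrite("", ArrayType(LongType), ArrayType(IntegerType), true,
      analysis.caseSensitiveResolution, "arr", storeAssignmentPolicy,
      errMsg => errs += errMsg)
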
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index f8128c8c23e..f58f798b8de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -141,12 +141,13 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
 
     checkAnswer(spark.table("testcat.table_name"), Seq.empty)
 
-    val exc = intercept[AnalysisException] {
-      spark.table("source").withColumnRenamed("data", 
"d").writeTo("testcat.table_name").append()
-    }
-
-    assert(exc.getMessage.contains("Cannot find data for output column"))
-    assert(exc.getMessage.contains("'data'"))
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").withColumnRenamed("data", 
"d").writeTo("testcat.table_name").append()
+      },
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> 
"`data`")
+    )
 
     checkAnswer(
       spark.table("testcat.table_name"),
@@ -244,13 +245,14 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
 
     checkAnswer(spark.table("testcat.table_name"), Seq.empty)
 
-    val exc = intercept[AnalysisException] {
-      spark.table("source").withColumnRenamed("data", "d")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").withColumnRenamed("data", "d")
           .writeTo("testcat.table_name").overwrite(lit(true))
-    }
-
-    assert(exc.getMessage.contains("Cannot find data for output column"))
-    assert(exc.getMessage.contains("'data'"))
+      },
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> 
"`data`")
+    )
 
     checkAnswer(
       spark.table("testcat.table_name"),
@@ -348,13 +350,14 @@ class DataFrameWriterV2Suite extends QueryTest with 
SharedSparkSession with Befo
 
     checkAnswer(spark.table("testcat.table_name"), Seq.empty)
 
-    val exc = intercept[AnalysisException] {
-      spark.table("source").withColumnRenamed("data", "d")
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.table("source").withColumnRenamed("data", "d")
           .writeTo("testcat.table_name").overwritePartitions()
-    }
-
-    assert(exc.getMessage.contains("Cannot find data for output column"))
-    assert(exc.getMessage.contains("'data'"))
+      },
+      errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map("tableName" -> "`testcat`.`table_name`", "colName" -> 
"`data`")
+    )
 
     checkAnswer(
       spark.table("testcat.table_name"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 09ea35737f8..0bbed51d0a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -214,10 +214,9 @@ trait SQLInsertTestSuite extends QueryTest with 
SQLTestUtils {
           processInsert("t1", df, overwrite = false, byName = true)
         },
         v1ErrorClass = "_LEGACY_ERROR_TEMP_1186",
-        v2ErrorClass = "_LEGACY_ERROR_TEMP_1204",
+        v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
         v1Parameters = Map.empty[String, String],
-        v2Parameters = Map("tableName" -> "testcat.t1",
-          "errors" -> "Cannot find data for output column 'c1'")
+        v2Parameters = Map("tableName" -> "`testcat`.`t1`", "colName" -> 
"`c1`")
       )
       val df2 = Seq((3, 2, 1, 0)).toDF(Seq("c3", "c2", "c1", "c0"): _*)
       checkV1AndV2Error(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala
index d10dd8c478a..488b4d31bd9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, 
AttributeReference, BooleanLiteral, Cast, CheckOverflowInTableInsert, 
CreateNamedStruct, EvalMode, GetStructField, IntegerLiteral, LambdaFunction, 
LongLiteral, MapFromArrays, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, 
StaticInvoke}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, InsertAction, 
MergeAction, MergeIntoTable, UpdateAction}
@@ -578,13 +579,34 @@ class AlignMergeAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
             "Multiple assignments for 'txt': 'a', 'b'")
 
           // two updates to a nested column
-          assertAnalysisException(
-            s"""MERGE INTO nested_struct_table t USING nested_struct_table src
-               |ON t.i = src.i
-               |$clause THEN
-               | UPDATE SET s.n_i = 1, s.n_s = null, s.n_i = -1
-               |""".stripMargin,
-            "Multiple assignments for 's.n_i': 1, -1")
+          val e = intercept[AnalysisException] {
+            parseAndResolve(
+              s"""MERGE INTO nested_struct_table t USING nested_struct_table 
src
+                 |ON t.i = src.i
+                 |$clause THEN
+                 | UPDATE SET s.n_i = 1, s.n_s = null, s.n_i = -1
+                 |""".stripMargin
+            )
+          }
+          if (policy == StoreAssignmentPolicy.ANSI) {
+            checkError(
+              exception = e,
+              errorClass = 
"DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS",
+              parameters = Map(
+                "sqlExpr" -> "\"s.n_i = 1\", \"s.n_s = NULL\", \"s.n_i = -1\"",
+                "errors" -> "\n- Multiple assignments for 's.n_i': 1, -1")
+            )
+          } else {
+            checkError(
+              exception = e,
+              errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+              parameters = Map(
+                "tableName" -> "``",
+                "colName" -> "`s`.`n_s`",
+                "srcType" -> "\"VOID\"",
+                "targetType" -> "\"STRUCT<DN_I:INT,DN_L:BIGINT>\"")
+            )
+          }
 
           // conflicting updates to a nested struct and its fields
           assertAnalysisException(
@@ -668,13 +690,20 @@ class AlignMergeAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
              |""".stripMargin)
         assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i"))
 
-        assertAnalysisException(
-          s"""MERGE INTO nested_struct_table t USING nested_struct_table src
-             |ON t.i = src.i
-             |$clause THEN
-             | UPDATE SET s.n_s = named_struct('dn_i', 1)
-             |""".stripMargin,
-          "Cannot find data for output column 's.n_s.dn_l'")
+        val e = intercept[AnalysisException] {
+          parseAndResolve(
+            s"""MERGE INTO nested_struct_table t USING nested_struct_table src
+               |ON t.i = src.i
+               |$clause THEN
+               | UPDATE SET s.n_s = named_struct('dn_i', 1)
+               |""".stripMargin
+          )
+        }
+        checkError(
+          exception = e,
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map("tableName" -> "``", "colName" -> 
"`s`.`n_s`.`dn_l`")
+        )
 
         // ANSI mode does NOT allow string to int casts
         assertAnalysisException(
@@ -807,13 +836,20 @@ class AlignMergeAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
              |""".stripMargin)
         assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i"))
 
-        assertAnalysisException(
-          s"""MERGE INTO nested_struct_table t USING nested_struct_table src
-             |ON t.i = src.i
-             |$clause THEN
-             | UPDATE SET s.n_s = named_struct('dn_i', 1)
-             |""".stripMargin,
-          "Cannot find data for output column 's.n_s.dn_l'")
+        val e = intercept[AnalysisException] {
+          parseAndResolve(
+            s"""MERGE INTO nested_struct_table t USING nested_struct_table src
+               |ON t.i = src.i
+               |$clause THEN
+               | UPDATE SET s.n_s = named_struct('dn_i', 1)
+               |""".stripMargin
+          )
+        }
+        checkError(
+          exception = e,
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          parameters = Map("tableName" -> "``", "colName" -> 
"`s`.`n_s`.`dn_l`")
+        )
 
         // strict mode does NOT allow string to int casts
         assertAnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala
index d1369e7e571..599f3e994ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignUpdateAssignmentsSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, 
AttributeReference, BooleanLiteral, Cast, CheckOverflowInTableInsert, 
CreateNamedStruct, EvalMode, GetStructField, IntegerLiteral, LambdaFunction, 
LongLiteral, MapFromArrays, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
@@ -469,9 +470,30 @@ class AlignUpdateAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
           "Multiple assignments for 'i': 1, -1")
 
         // two updates to a nested column
-        assertAnalysisException(
-          "UPDATE nested_struct_table SET s.n_i = 1, s.n_s = null, s.n_i = -1",
-          "Multiple assignments for 's.n_i': 1, -1")
+        val e = intercept[AnalysisException] {
+          parseAndResolve(
+            "UPDATE nested_struct_table SET s.n_i = 1, s.n_s = null, s.n_i = 
-1"
+          )
+        }
+        if (policy == StoreAssignmentPolicy.ANSI) {
+          checkError(
+            exception = e,
+            errorClass = 
"DATATYPE_MISMATCH.INVALID_ROW_LEVEL_OPERATION_ASSIGNMENTS",
+            parameters = Map(
+              "sqlExpr" -> "\"s.n_i = 1\", \"s.n_s = NULL\", \"s.n_i = -1\"",
+              "errors" -> "\n- Multiple assignments for 's.n_i': 1, -1")
+          )
+        } else {
+          checkError(
+            exception = e,
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+              "tableName" -> "``",
+              "colName" -> "`s`.`n_s`",
+              "srcType" -> "\"VOID\"",
+              "targetType" -> "\"STRUCT<DN_I:INT,DN_L:BIGINT>\"")
+          )
+        }
 
         // conflicting updates to a nested struct and its fields
         assertAnalysisException(
@@ -509,9 +531,16 @@ class AlignUpdateAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
         "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', NULL, 
'dn_l', 1L)")
       assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i"))
 
-      assertAnalysisException(
-        "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)",
-        "Cannot find data for output column 's.n_s.dn_l'")
+      val e = intercept[AnalysisException] {
+        parseAndResolve(
+          "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)"
+        )
+      }
+      checkError(
+        exception = e,
+        errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`")
+      )
 
       // ANSI mode does NOT allow string to int casts
       assertAnalysisException(
@@ -555,9 +584,16 @@ class AlignUpdateAssignmentsSuite extends 
AlignAssignmentsSuiteBase {
           |SET s.n_s = named_struct('dn_i', CAST (NULL AS INT), 'dn_l', 
1L)""".stripMargin)
       assertNullCheckExists(plan4, Seq("s", "n_s", "dn_i"))
 
-      assertAnalysisException(
-        "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)",
-        "Cannot find data for output column 's.n_s.dn_l'")
+      val e = intercept[AnalysisException] {
+        parseAndResolve(
+          "UPDATE nested_struct_table SET s.n_s = named_struct('dn_i', 1)"
+        )
+      }
+      checkError(
+        exception = e,
+        errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+        parameters = Map("tableName" -> "``", "colName" -> "`s`.`n_s`.`dn_l`")
+      )
 
       // strict mode does NOT allow string to int casts
       assertAnalysisException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 48bdd799017..c6bfd8c14dd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -657,18 +657,25 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t select 1L, 2")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
-          parameters = Map("tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot safely cast 'i': bigint to int"))
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`i`",
+            "srcType" -> "\"BIGINT\"",
+            "targetType" -> "\"INT\"")
+        )
 
         checkError(
           exception = intercept[AnalysisException] {
             sql("insert into t select 1, 2.0")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot safely cast 'd': decimal(2,1) to double"))
+            "colName" -> "`d`",
+            "srcType" -> "\"DECIMAL(2,1)\"",
+            "targetType" -> "\"DOUBLE\"")
+        )
 
         checkError(
           exception = intercept[AnalysisException] {
@@ -697,31 +704,34 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t values('a', 'b')")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'i': string to int\n- " +
-              "Cannot safely cast 'd': string to double"))
+            "colName" -> "`i`",
+            "srcType" -> "\"STRING\"",
+            "targetType" -> "\"INT\"")
         )
         checkError(
           exception = intercept[AnalysisException] {
             sql("insert into t values(now(), now())")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " +
-              "Cannot safely cast 'd': timestamp to double"))
+            "colName" -> "`i`",
+            "srcType" -> "\"TIMESTAMP\"",
+            "targetType" -> "\"INT\"")
         )
         checkError(
           exception = intercept[AnalysisException] {
             sql("insert into t values(true, false)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'i': boolean to int\n- " +
-              "Cannot safely cast 'd': boolean to double"))
+            "colName" -> "`i`",
+            "srcType" -> "\"BOOLEAN\"",
+            "targetType" -> "\"INT\"")
         )
       }
     }
@@ -839,11 +849,12 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 1)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'i': timestamp to int\n- " +
-              "Cannot safely cast 't': int to timestamp"))
+            "colName" -> "`i`",
+            "srcType" -> "\"TIMESTAMP\"",
+            "targetType" -> "\"INT\"")
         )
       }
 
@@ -853,11 +864,12 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("INSERT INTO t VALUES (date('2010-09-02'), 1)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'i': date to int\n- " +
-              "Cannot safely cast 'd': int to date"))
+            "colName" -> "`i`",
+            "srcType" -> "\"DATE\"",
+            "targetType" -> "\"INT\"")
         )
       }
 
@@ -867,11 +879,12 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("INSERT INTO t VALUES (TIMESTAMP('2010-09-02 14:10:10'), 
true)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'b': timestamp to boolean\n- " +
-              "Cannot safely cast 't': boolean to timestamp"))
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+              "tableName" -> "`spark_catalog`.`default`.`t`",
+              "colName" -> "`b`",
+              "srcType" -> "\"TIMESTAMP\"",
+              "targetType" -> "\"BOOLEAN\"")
         )
       }
 
@@ -881,11 +894,12 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("INSERT INTO t VALUES (date('2010-09-02'), true)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> ("Cannot safely cast 'b': date to boolean\n- " +
-              "Cannot safely cast 'd': boolean to date"))
+            errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+            parameters = Map(
+              "tableName" -> "`spark_catalog`.`default`.`t`",
+              "colName" -> "`b`",
+              "srcType" -> "\"DATE\"",
+              "targetType" -> "\"BOOLEAN\"")
         )
       }
     }
@@ -1374,10 +1388,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t (i) values (true)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 's'"))
+            "colName" -> "`s`"))
       }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
@@ -1385,10 +1399,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t (i) values (default)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 's'"))
+            "colName" -> "`s`"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint default 42) using parquet")
@@ -1396,10 +1410,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t (s) values (default)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 'i'"))
+            "colName" -> "`i`"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet 
partitioned by (i)")
@@ -1407,10 +1421,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t partition(i='true') (s) values(5)")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 'q'"))
+            "colName" -> "`q`"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet 
partitioned by (i)")
@@ -1418,10 +1432,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t partition(i='false') (q) select 43")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 's'"))
+            "colName" -> "`s`"))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet 
partitioned by (i)")
@@ -1429,10 +1443,10 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
           exception = intercept[AnalysisException] {
             sql("insert into t partition(i='false') (q) select default")
           },
-          errorClass = "_LEGACY_ERROR_TEMP_1204",
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
           parameters = Map(
             "tableName" -> "`spark_catalog`.`default`.`t`",
-            "errors" -> "Cannot find data for output column 's'"))
+            "colName" -> "`s`"))
       }
     }
     // When the CASE_SENSITIVE configuration is enabled, then using different 
cases for the required
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 44c9fbadfac..17348fe2dcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -382,10 +382,17 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
       withTable("t") {
         sql("create table t(i int, d double) using parquet")
         // Calling `saveAsTable` to an existing table with append mode results 
in table insertion.
-        val msg = intercept[AnalysisException] {
-          Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': bigint to int"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`i`",
+            "srcType" -> "\"BIGINT\"",
+            "targetType" -> "\"INT\"")
+        )
 
         // Insert into table successfully.
         Seq((1, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t")
@@ -403,17 +410,29 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSparkSession with
       withTable("t") {
         sql("create table t(i int, d double) using parquet")
         // Calling `saveAsTable` to an existing table with append mode results 
in table insertion.
-        var msg = intercept[AnalysisException] {
-          Seq(("a", "b")).toDF("i", "d").write.mode("append").saveAsTable("t")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': string to int") &&
-          msg.contains("Cannot safely cast 'd': string to double"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            Seq(("a", "b")).toDF("i", 
"d").write.mode("append").saveAsTable("t")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`i`",
+            "srcType" -> "\"STRING\"",
+            "targetType" -> "\"INT\"")
+        )
 
-        msg = intercept[AnalysisException] {
-          Seq((true, false)).toDF("i", 
"d").write.mode("append").saveAsTable("t")
-        }.getMessage
-        assert(msg.contains("Cannot safely cast 'i': boolean to int") &&
-          msg.contains("Cannot safely cast 'd': boolean to double"))
+        checkError(
+          exception = intercept[AnalysisException] {
+            Seq((true, false)).toDF("i", 
"d").write.mode("append").saveAsTable("t")
+          },
+          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+          parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`i`",
+            "srcType" -> "\"BOOLEAN\"",
+            "targetType" -> "\"INT\"")
+        )
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index d6bf7773293..8476c87dc28 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -949,8 +949,15 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
         if (isPartitioned) {
           val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition 
(ds='a') SELECT 1.3"
           if (version == "0.12" || version == "0.13") {
-            val e = 
intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
-            assert(e.contains(errorMsg))
+            checkError(
+              exception = 
intercept[AnalysisException](versionSpark.sql(insertStmt)),
+              errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+              parameters = Map(
+                "tableName" -> "`spark_catalog`.`default`.`tab1`",
+                "colName" -> "`f0`",
+                "srcType" -> "\"DECIMAL(2,1)\"",
+                "targetType" -> "\"BINARY\"")
+            )
           } else {
             versionSpark.sql(insertStmt)
             assert(versionSpark.table(tableName).collect() ===
@@ -959,8 +966,15 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
         } else {
           val insertStmt = s"INSERT OVERWRITE TABLE $tableName SELECT 1.3"
           if (version == "0.12" || version == "0.13") {
-            val e = 
intercept[AnalysisException](versionSpark.sql(insertStmt)).getMessage
-            assert(e.contains(errorMsg))
+            checkError(
+              exception = 
intercept[AnalysisException](versionSpark.sql(insertStmt)),
+              errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST",
+              parameters = Map(
+                "tableName" -> "`spark_catalog`.`default`.`tab1`",
+                "colName" -> "`f0`",
+                "srcType" -> "\"DECIMAL(2,1)\"",
+                "targetType" -> "\"BINARY\"")
+            )
           } else {
             versionSpark.sql(insertStmt)
             assert(versionSpark.table(tableName).collect() ===
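
For readers skimming the patch, a minimal end-to-end sketch of how the renamed error condition now surfaces; the table and values are illustrative, it assumes a running SparkSession named spark, and it relies on the default ANSI store assignment policy, under which a STRING value cannot be stored into an INT column:

    spark.sql("CREATE TABLE t(i INT, d DOUBLE) USING parquet")
    // Analysis fails with error class INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_SAFELY_CAST and
    // parameters tableName = `spark_catalog`.`default`.`t`, colName = `i`,
    // srcType = "STRING", targetType = "INT" (previously reported as _LEGACY_ERROR_TEMP_1204).
    spark.sql("INSERT INTO t VALUES ('a', 'b')")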

