This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d83abfc  [SPARK-36943][SQL] Improve readability of missing column 
error message
d83abfc is described below

commit d83abfc530d8dcc117e0123820b181c98d9f46f6
Author: Karen Feng <karen.f...@databricks.com>
AuthorDate: Mon Oct 11 12:48:03 2021 +0800

    [SPARK-36943][SQL] Improve readability of missing column error message
    
    ### What changes were proposed in this pull request?
    
    Improves the quality of the error message encountered by users when they 
attempt to access a column that does not exist.
    Removes the lingo term "resolve" and sorts the suggestions by probability.
    
    ### Why are the changes needed?
    
    Improves the user experience
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes:
    
    Before:
    ```
    cannot resolve 'foo' given input columns [bar, baz, froo]
    ```
    After:
    ```
    Column 'foo' does not exist. Did you mean one of the following? [bar, baz, 
froo]
    ```
    
    ### How was this patch tested?
    
    Unit tests
    
    Closes #34202 from karenfeng/improve-error-msg-missing-col.
    
    Authored-by: Karen Feng <karen.f...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/resources/error/error-classes.json   |  2 +-
 .../org/apache/spark/SparkThrowableSuite.scala     |  4 +-
 python/pyspark/pandas/tests/test_indexops_spark.py |  4 +-
 python/pyspark/sql/tests/test_utils.py             |  2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 11 ++++--
 .../spark/sql/catalyst/util/StringUtils.scala      |  8 ++++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 35 ++++++++++++-----
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 16 +++++---
 .../spark/sql/catalyst/analysis/AnalysisTest.scala | 31 +++++++++++++++
 .../catalyst/analysis/ResolveSubquerySuite.scala   | 25 +++++++-----
 .../catalyst/analysis/V2WriteAnalysisSuite.scala   |  4 +-
 .../results/columnresolution-negative.sql.out      |  8 ++--
 .../resources/sql-tests/results/group-by.sql.out   |  2 +-
 .../sql-tests/results/join-lateral.sql.out         |  8 ++--
 .../sql-tests/results/natural-join.sql.out         |  2 +-
 .../test/resources/sql-tests/results/pivot.sql.out |  4 +-
 .../results/postgreSQL/aggregates_part1.sql.out    |  2 +-
 .../sql-tests/results/postgreSQL/join.sql.out      | 16 ++++----
 .../results/postgreSQL/select_having.sql.out       |  2 +-
 .../results/postgreSQL/select_implicit.sql.out     |  4 +-
 .../sql-tests/results/postgreSQL/union.sql.out     |  2 +-
 .../sql-tests/results/query_regex_column.sql.out   | 16 ++++----
 .../negative-cases/invalid-correlation.sql.out     |  2 +-
 .../sql-tests/results/table-aliases.sql.out        |  2 +-
 .../udf/postgreSQL/udf-aggregates_part1.sql.out    |  2 +-
 .../results/udf/postgreSQL/udf-join.sql.out        | 16 ++++----
 .../udf/postgreSQL/udf-select_having.sql.out       |  2 +-
 .../udf/postgreSQL/udf-select_implicit.sql.out     |  4 +-
 .../sql-tests/results/udf/udf-group-by.sql.out     |  2 +-
 .../sql-tests/results/udf/udf-pivot.sql.out        |  4 +-
 .../apache/spark/sql/DataFrameFunctionsSuite.scala | 24 ++++++++----
 .../org/apache/spark/sql/DataFrameSuite.scala      |  3 +-
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  |  3 +-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 21 +++++-----
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  7 ++--
 .../scala/org/apache/spark/sql/SubquerySuite.scala |  3 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  3 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 45 ++++++++++++++++------
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 12 +++---
 .../sql/execution/datasources/csv/CSVSuite.scala   |  7 ++--
 .../sql/execution/datasources/json/JsonSuite.scala |  7 ++--
 .../org/apache/spark/sql/sources/InsertSuite.scala |  7 ++--
 .../apache/spark/sql/hive/HiveParquetSuite.scala   |  7 ++--
 43 files changed, 250 insertions(+), 141 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index d270f0e..301f8d0 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -90,7 +90,7 @@
     "message" : [ "Key %s does not exist." ]
   },
   "MISSING_COLUMN" : {
-    "message" : [ "cannot resolve '%s' given input columns: [%s]" ],
+    "message" : [ "Column '%s' does not exist. Did you mean one of the 
following? [%s]" ],
     "sqlState" : "42000"
   },
   "MISSING_STATIC_PARTITION_COLUMN" : {
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 5af55af..1cd3ba3 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -128,8 +128,8 @@ class SparkThrowableSuite extends SparkFunSuite {
   }
 
   test("Error message is formatted") {
-    assert(getMessage("MISSING_COLUMN", Array("foo", "bar")) ==
-      "cannot resolve 'foo' given input columns: [bar]")
+    assert(getMessage("MISSING_COLUMN", Array("foo", "bar, baz")) ==
+      "Column 'foo' does not exist. Did you mean one of the following? [bar, 
baz]")
   }
 
   test("Try catching legacy SparkError") {
diff --git a/python/pyspark/pandas/tests/test_indexops_spark.py 
b/python/pyspark/pandas/tests/test_indexops_spark.py
index 3cfc084..88bf395 100644
--- a/python/pyspark/pandas/tests/test_indexops_spark.py
+++ b/python/pyspark/pandas/tests/test_indexops_spark.py
@@ -39,7 +39,7 @@ class SparkIndexOpsMethodsTest(PandasOnSparkTestCase, 
SQLTestUtils):
         ):
             self.psser.spark.transform(lambda scol: 1)
 
-        with self.assertRaisesRegex(AnalysisException, "cannot 
resolve.*non-existent.*"):
+        with self.assertRaisesRegex(AnalysisException, 
"Column.*non-existent.*does not exist"):
             self.psser.spark.transform(lambda scol: F.col("non-existent"))
 
     def test_multiindex_transform_negative(self):
@@ -59,7 +59,7 @@ class SparkIndexOpsMethodsTest(PandasOnSparkTestCase, 
SQLTestUtils):
         ):
             self.psser.spark.apply(lambda scol: 1)
 
-        with self.assertRaisesRegex(AnalysisException, "cannot 
resolve.*non-existent.*"):
+        with self.assertRaisesRegex(AnalysisException, 
"Column.*non-existent.*does not exist"):
             self.psser.spark.transform(lambda scol: F.col("non-existent"))
 
 
diff --git a/python/pyspark/sql/tests/test_utils.py 
b/python/pyspark/sql/tests/test_utils.py
index 005f0e8..6d23736 100644
--- a/python/pyspark/sql/tests/test_utils.py
+++ b/python/pyspark/sql/tests/test_utils.py
@@ -31,7 +31,7 @@ class UtilsTests(ReusedSQLTestCase):
         try:
             self.spark.sql("select `中文字段`")
         except AnalysisException as e:
-            self.assertRegex(str(e), "cannot resolve '`中文字段`'")
+            self.assertRegex(str(e), "Column '`中文字段`' does not exist")
 
     def test_capture_parse_exception(self):
         self.assertRaises(ParseException, lambda: self.spark.sql("abc"))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c8614b1..bdd7ffb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, 
TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, 
SupportsPartitionManagement}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.internal.SQLConf
@@ -167,9 +167,12 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
 
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
-            val from = 
operator.inputSet.toSeq.map(_.qualifiedName).mkString(", ")
-            // cannot resolve '${a.sql}' given input columns: [$from]
-            a.failAnalysis(errorClass = "MISSING_COLUMN", messageParameters = 
Array(a.sql, from))
+            val missingCol = a.sql
+            val candidates = operator.inputSet.toSeq.map(_.qualifiedName)
+            val orderedCandidates = 
StringUtils.orderStringsBySimilarity(missingCol, candidates)
+            a.failAnalysis(
+              errorClass = "MISSING_COLUMN",
+              messageParameters = Array(missingCol, 
orderedCandidates.mkString(", ")))
 
           case s: Star =>
             withPosition(s) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 1fb7de7..0d0f7a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -21,6 +21,8 @@ import java.util.regex.{Pattern, PatternSyntaxException}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.commons.lang3.{ StringUtils => ACLStringUtils }
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -71,6 +73,12 @@ object StringUtils extends Logging {
   private[this] val falseStrings =
     Set("f", "false", "n", "no", "0").map(UTF8String.fromString)
 
+  private[spark] def orderStringsBySimilarity(
+      baseString: String,
+      testStrings: Seq[String]): Seq[String] = {
+    testStrings.sortBy(ACLStringUtils.getLevenshteinDistance(_, baseString))
+  }
+
   // scalastyle:off caselocale
   def isTrueString(s: UTF8String): Boolean = 
trueStrings.contains(s.trimAll().toLowerCase)
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index eef61ee..3011351 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -111,6 +111,16 @@ class AnalysisErrorSuite extends AnalysisTest {
     }
   }
 
+  def errorClassTest(
+      name: String,
+      plan: LogicalPlan,
+      errorClass: String,
+      messageParameters: Array[String]): Unit = {
+    test(name) {
+      assertAnalysisErrorClass(plan, errorClass, messageParameters)
+    }
+  }
+
   val dateLit = Literal.create(null, DateType)
 
   errorTest(
@@ -283,17 +293,19 @@ class AnalysisErrorSuite extends AnalysisTest {
     listRelation.select(Explode($"list").as("a"), Explode($"list").as("b")),
     "only one generator" :: "explode" :: Nil)
 
-  errorTest(
+  errorClassTest(
     "unresolved attributes",
     testRelation.select($"abcd"),
-    "cannot resolve" :: "abcd" :: Nil)
+    "MISSING_COLUMN",
+    Array("abcd", "a"))
 
-  errorTest(
+  errorClassTest(
     "unresolved attributes with a generated name",
     testRelation2.groupBy($"a")(max($"b"))
       .where(sum($"b") > 0)
       .orderBy($"havingCondition".asc),
-    "cannot resolve" :: "havingCondition" :: Nil)
+    "MISSING_COLUMN",
+    Array("havingCondition", "max('b)"))
 
   errorTest(
     "unresolved star expansion in max",
@@ -305,10 +317,11 @@ class AnalysisErrorSuite extends AnalysisTest {
     mapRelation.orderBy($"map".asc),
     "sort" :: "type" :: "map<int,int>" :: Nil)
 
-  errorTest(
+  errorClassTest(
     "sorting by attributes are not from grouping expressions",
     testRelation2.groupBy($"a", $"c")($"a", $"c", 
count($"a").as("a3")).orderBy($"b".asc),
-    "cannot resolve" :: "'b'" :: "given input columns" :: "[a, a3, c]" :: Nil)
+    "MISSING_COLUMN",
+    Array("b", "a, c, a3"))
 
   errorTest(
     "non-boolean filters",
@@ -397,11 +410,12 @@ class AnalysisErrorSuite extends AnalysisTest {
     testRelation3.except(testRelation4, isAll = false),
     "except" :: "the compatible column types" :: "map" :: "decimal" :: Nil)
 
-  errorTest(
+  errorClassTest(
     "SPARK-9955: correct error message for aggregate",
     // When parse SQL string, we will wrap aggregate expressions with 
UnresolvedAlias.
     testRelation2.where($"bad_column" > 
1).groupBy($"a")(UnresolvedAlias(max($"b"))),
-    "cannot resolve 'bad_column'" :: Nil)
+    "MISSING_COLUMN",
+    Array("bad_column", "a, b, c, d, e"))
 
   errorTest(
     "slide duration greater than window in time window",
@@ -770,9 +784,10 @@ class AnalysisErrorSuite extends AnalysisTest {
   }
 
   errorTest(
-    "SC-69611: error code to error message",
+    "SPARK-34920: error code to error message",
     testRelation2.where($"bad_column" > 
1).groupBy($"a")(UnresolvedAlias(max($"b"))),
-    "cannot resolve 'bad_column' given input columns: [a, b, c, d, e]" :: Nil)
+    "Column 'bad_column' does not exist. Did you mean one of the following? 
[a, b, c, d, e]"
+      :: Nil)
 
   test("SPARK-35080: Unsupported correlated equality predicates in subquery") {
     val a = AttributeReference("a", IntegerType)()
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index cd72470..95548d2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -100,10 +100,11 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
       Project(testRelation.output, testRelation))
 
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       Project(Seq(UnresolvedAttribute("tBl.a")),
         SubqueryAlias("TbL", UnresolvedRelation(TableIdentifier("TaBlE")))),
-      Seq("cannot resolve"))
+      "MISSING_COLUMN",
+      Array("tBl.a", "TbL.a"))
 
     checkAnalysisWithoutViewWrapper(
       Project(Seq(UnresolvedAttribute("TbL.a")),
@@ -707,8 +708,9 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   }
 
   test("CTE with non-existing column alias") {
-    assertAnalysisError(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM t 
WHERE y = 1"),
-      Seq("cannot resolve 'y' given input columns: [t.x]"))
+    assertAnalysisErrorClass(parsePlan("WITH t(x) AS (SELECT 1) SELECT * FROM 
t WHERE y = 1"),
+      "MISSING_COLUMN",
+      Array("y", "t.x"))
   }
 
   test("CTE with non-matching column alias") {
@@ -1138,12 +1140,14 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         |ORDER BY c.x
         |""".stripMargin))
 
-    assertAnalysisError(parsePlan(
+    assertAnalysisErrorClass(parsePlan(
      """
         |SELECT c.x
         |FROM VALUES NAMED_STRUCT('x', 'A', 'y', 1), NAMED_STRUCT('x', 'A', 
'y', 2) AS t(c)
         |GROUP BY c.x
         |ORDER BY c.x + c.y
-        |""".stripMargin), "cannot resolve 'c.y' given input columns: [x]" :: 
Nil)
+        |""".stripMargin),
+      "MISSING_COLUMN",
+      Array("c.y", "x"))
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 2465384..53dc9be 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -169,6 +169,37 @@ trait AnalysisTest extends PlanTest {
     }
   }
 
+  protected def assertAnalysisErrorClass(
+      inputPlan: LogicalPlan,
+      expectedErrorClass: String,
+      expectedMessageParameters: Array[String],
+      caseSensitive: Boolean = true): Unit = {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) {
+      val analyzer = getAnalyzer
+      val e = intercept[AnalysisException] {
+        analyzer.checkAnalysis(analyzer.execute(inputPlan))
+      }
+
+      if (e.getErrorClass != expectedErrorClass ||
+        !e.messageParameters.sameElements(expectedMessageParameters)) {
+        var failMsg = ""
+        if (e.getErrorClass != expectedErrorClass) {
+          failMsg +=
+            s"""Error class should be: ${expectedErrorClass}
+               |Actual error class: ${e.getErrorClass}
+             """.stripMargin
+        }
+        if (!e.messageParameters.sameElements(expectedMessageParameters)) {
+          failMsg +=
+            s"""Message parameters should be: 
${expectedMessageParameters.mkString("\n  ")}
+               |Actual message parameters: ${e.messageParameters.mkString("\n  
")}
+             """.stripMargin
+        }
+        fail(failMsg)
+      }
+    }
+  }
+
   protected def interceptParseException(
       parser: String => Any)(sqlCommand: String, messages: String*): Unit = {
     val e = intercept[ParseException](parser(sqlCommand))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
index 212f2b8..2c3ec0a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveSubquerySuite.scala
@@ -131,31 +131,36 @@ class ResolveSubquerySuite extends AnalysisTest {
 
     // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT a, b, c))
     // TODO: support accessing columns from outer outer query.
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select('a, 'b, 'c))),
-      Seq("cannot resolve 'a' given input columns: []"))
+      "MISSING_COLUMN",
+      Array("a", ""))
   }
 
   test("lateral subquery with unresolvable attributes") {
     // SELECT * FROM t1, LATERAL (SELECT a, c)
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       lateralJoin(t1, t0.select('a, 'c)),
-      Seq("cannot resolve 'c' given input columns: []")
+      "MISSING_COLUMN",
+      Array("c", "")
     )
     // SELECT * FROM t1, LATERAL (SELECT a, b, c, d FROM t2)
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       lateralJoin(t1, t2.select('a, 'b, 'c, 'd)),
-      Seq("cannot resolve 'd' given input columns: [b, c]")
+      "MISSING_COLUMN",
+      Array("d", "b, c")
     )
     // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT t1.a))
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select($"t1.a"))),
-      Seq("cannot resolve 't1.a' given input columns: []")
+      "MISSING_COLUMN",
+      Array("t1.a", "")
     )
     // SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL (SELECT a, b))
-    assertAnalysisError(
+    assertAnalysisErrorClass(
       lateralJoin(t1, lateralJoin(t2, t0.select('a, 'b))),
-      Seq("cannot resolve 'a' given input columns: []")
+      "MISSING_COLUMN",
+      Array("a", "")
     )
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
index 81043cd..1394fbd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala
@@ -685,7 +685,7 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
       LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
 
     assertNotResolved(parsedPlan)
-    assertAnalysisError(parsedPlan, Seq("cannot resolve", "a", "given input 
columns", "x, y"))
+    assertAnalysisErrorClass(parsedPlan, "MISSING_COLUMN", Array("a", "x, y"))
 
     val tableAcceptAnySchema = TestRelationAcceptAnySchema(StructType(Seq(
       StructField("x", DoubleType, nullable = false),
@@ -694,7 +694,7 @@ abstract class V2WriteAnalysisSuiteBase extends 
AnalysisTest {
     val parsedPlan2 = OverwriteByExpression.byPosition(tableAcceptAnySchema, 
query,
       LessThanOrEqual(UnresolvedAttribute(Seq("a")), Literal(15.0d)))
     assertNotResolved(parsedPlan2)
-    assertAnalysisError(parsedPlan2, Seq("cannot resolve", "a", "given input 
columns", "x, y"))
+    assertAnalysisErrorClass(parsedPlan2, "MISSING_COLUMN", Array("a", "x, y"))
   }
 
   test("SPARK-36498: reorder inner fields with byName mode") {
diff --git 
a/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
index 61cf397..6c1edfe 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/columnresolution-negative.sql.out
@@ -161,7 +161,7 @@ SELECT db1.t1.i1 FROM t1, mydb2.t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'db1.t1.i1' given input columns: [spark_catalog.mydb2.t1.i1, 
spark_catalog.mydb2.t1.i1]; line 1 pos 7
+Column 'db1.t1.i1' does not exist. Did you mean one of the following? 
[spark_catalog.mydb2.t1.i1, spark_catalog.mydb2.t1.i1]; line 1 pos 7
 
 
 -- !query
@@ -186,7 +186,7 @@ SELECT mydb1.t1 FROM t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'mydb1.t1' given input columns: [spark_catalog.mydb1.t1.i1]; 
line 1 pos 7
+Column 'mydb1.t1' does not exist. Did you mean one of the following? 
[spark_catalog.mydb1.t1.i1]; line 1 pos 7
 
 
 -- !query
@@ -204,7 +204,7 @@ SELECT t1 FROM mydb1.t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1' given input columns: [spark_catalog.mydb1.t1.i1]; line 1 
pos 7
+Column 't1' does not exist. Did you mean one of the following? 
[spark_catalog.mydb1.t1.i1]; line 1 pos 7
 
 
 -- !query
@@ -221,7 +221,7 @@ SELECT mydb1.t1.i1 FROM t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'mydb1.t1.i1' given input columns: [spark_catalog.mydb2.t1.i1]; 
line 1 pos 7
+Column 'mydb1.t1.i1' does not exist. Did you mean one of the following? 
[spark_catalog.mydb2.t1.i1]; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
index b5471a7..37deb87 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out
@@ -202,7 +202,7 @@ SELECT a AS k, COUNT(b) FROM testData GROUP BY k
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'k' given input columns: [testdata.a, testdata.b]; line 1 pos 47
+Column 'k' does not exist. Did you mean one of the following? [testdata.a, 
testdata.b]; line 1 pos 47
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index 0dd2c41..0b74b7e 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -269,7 +269,7 @@ SELECT * FROM t1 JOIN LATERAL (SELECT t1.c1 AS a, t2.c1 AS 
b) s JOIN t2 ON s.b =
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't2.c1' given input columns: []; line 1 pos 50
+Column 't2.c1' does not exist. Did you mean one of the following? []; line 1 
pos 50
 
 
 -- !query
@@ -333,7 +333,7 @@ SELECT * FROM t1, LATERAL (SELECT * FROM t2, LATERAL 
(SELECT t1.c1 + t2.c1))
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1.c1' given input columns: []; line 1 pos 61
+Column 't1.c1' does not exist. Did you mean one of the following? []; line 1 
pos 61
 
 
 -- !query
@@ -342,7 +342,7 @@ SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT c1), 
LATERAL (SELECT c2))
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'c2' given input columns: []; line 1 pos 70
+Column 'c2' does not exist. Did you mean one of the following? []; line 1 pos 
70
 
 
 -- !query
@@ -369,7 +369,7 @@ SELECT * FROM t1, LATERAL (SELECT c1, (SELECT SUM(c2) FROM 
t2 WHERE c1 = t1.c1))
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1.c1' given input columns: [spark_catalog.default.t2.c1, 
spark_catalog.default.t2.c2]; line 1 pos 73
+Column 't1.c1' does not exist. Did you mean one of the following? 
[spark_catalog.default.t2.c1, spark_catalog.default.t2.c2]; line 1 pos 73
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out 
b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
index 794e472..3686776 100644
--- a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
@@ -232,7 +232,7 @@ SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'nt2.k' given input columns: [__auto_generated_subquery_name.k, 
__auto_generated_subquery_name.v1, __auto_generated_subquery_name.v2]; line 1 
pos 7
+Column 'nt2.k' does not exist. Did you mean one of the following? 
[__auto_generated_subquery_name.k, __auto_generated_subquery_name.v1, 
__auto_generated_subquery_name.v2]; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
index 69679f8..7c30179 100644
--- a/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pivot.sql.out
@@ -232,7 +232,7 @@ PIVOT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'year' given input columns: 
[__auto_generated_subquery_name.course, 
__auto_generated_subquery_name.earnings]; line 4 pos 0
+Column 'year' does not exist. Did you mean one of the following? 
[__auto_generated_subquery_name.course, 
__auto_generated_subquery_name.earnings]; line 4 pos 0
 
 
 -- !query
@@ -326,7 +326,7 @@ PIVOT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 's' given input columns: [coursesales.course, 
coursesales.earnings, coursesales.year]; line 4 pos 15
+Column 's' does not exist. Did you mean one of the following? 
[coursesales.year, coursesales.course, coursesales.earnings]; line 4 pos 15
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
index 23edb79..62b6acd 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/aggregates_part1.sql.out
@@ -390,4 +390,4 @@ from tenk1 o
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'o.unique1' given input columns: [i.even, i.fivethous, i.four, 
i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, 
i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 63
+Column 'o.unique1' does not exist. Did you mean one of the following? 
[i.unique1, i.unique2, i.hundred, i.even, i.four, i.stringu1, i.ten, i.odd, 
i.string4, i.stringu2, i.tenthous, i.twenty, i.two, i.thousand, i.fivethous, 
i.twothousand]; line 2 pos 63
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
index 779ec87..6d27785 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
@@ -3248,7 +3248,7 @@ select * from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'y.f1' given input columns: [j.f1, j.f1, x.q1, x.q2]; line 2 
pos 63
+Column 'y.f1' does not exist. Did you mean one of the following? [j.f1, j.f1, 
x.q1, x.q2]; line 2 pos 63
 
 
 -- !query
@@ -3267,7 +3267,7 @@ select t1.uunique1 from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1.uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+Column 't1.uunique1' does not exist. Did you mean one of the following? 
[t1.unique1, t2.unique1, t1.unique2, t2.unique2, t1.hundred, t2.hundred, 
t1.stringu1, t1.even, t1.four, t1.string4, t2.stringu1, t1.stringu2, t1.ten, 
t1.tenthous, t2.even, t2.four, t1.odd, t2.string4, t2.stringu2, t2.ten, 
t2.tenthous, t1.thousand, t1.twenty, t1.two, t1.fivethous, t2.odd, t2.thousand, 
t2.twenty, t2.two, t2.fivethous, t1.twothousand, t2.twothousand]; line 1 pos 7
 
 
 -- !query
@@ -3277,7 +3277,7 @@ select t2.uunique1 from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't2.uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+Column 't2.uunique1' does not exist. Did you mean one of the following? 
[t2.unique1, t1.unique1, t2.unique2, t1.unique2, t2.hundred, t1.hundred, 
t2.stringu1, t2.even, t2.four, t2.string4, t1.stringu1, t2.stringu2, t2.ten, 
t2.tenthous, t1.even, t1.four, t2.odd, t1.string4, t1.stringu2, t1.ten, 
t1.tenthous, t2.thousand, t2.twenty, t2.two, t2.fivethous, t1.odd, t1.thousand, 
t1.twenty, t1.two, t1.fivethous, t2.twothousand, t1.twothousand]; line 1 pos 7
 
 
 -- !query
@@ -3287,7 +3287,7 @@ select uunique1 from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 7
+Column 'uunique1' does not exist. Did you mean one of the following? 
[t1.unique1, t2.unique1, t1.unique2, t2.unique2, t1.even, t2.even, t1.four, 
t2.four, t1.ten, t2.ten, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.two, 
t2.two, t1.stringu1, t2.stringu1, t1.twenty, t2.twenty, t1.string4, t2.string4, 
t1.stringu2, t2.stringu2, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, 
t1.fivethous, t2.fivethous, t1.twothousand, t2.twothousand]; line 1 pos 7
 
 
 -- !query
@@ -3487,7 +3487,7 @@ select f1,g from int4_tbl a, (select f1 as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'f1' given input columns: []; line 1 pos 37
+Column 'f1' does not exist. Did you mean one of the following? []; line 1 pos 
37
 
 
 -- !query
@@ -3496,7 +3496,7 @@ select f1,g from int4_tbl a, (select a.f1 as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a.f1' given input columns: []; line 1 pos 37
+Column 'a.f1' does not exist. Did you mean one of the following? []; line 1 
pos 37
 
 
 -- !query
@@ -3505,7 +3505,7 @@ select f1,g from int4_tbl a cross join (select f1 as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'f1' given input columns: []; line 1 pos 47
+Column 'f1' does not exist. Did you mean one of the following? []; line 1 pos 
47
 
 
 -- !query
@@ -3514,7 +3514,7 @@ select f1,g from int4_tbl a cross join (select a.f1 as g) 
ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a.f1' given input columns: []; line 1 pos 47
+Column 'a.f1' does not exist. Did you mean one of the following? []; line 1 
pos 47
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
index 3a62116..1d096b5 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_having.sql.out
@@ -152,7 +152,7 @@ SELECT 1 AS one FROM test_having HAVING a > 1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a' given input columns: [one]; line 1 pos 40
+Column 'a' does not exist. Did you mean one of the following? [one]; line 1 
pos 40
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
index 4a304ae..6f2d454 100755
--- 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/select_implicit.sql.out
@@ -122,7 +122,7 @@ SELECT count(*) FROM test_missing_target GROUP BY a ORDER 
BY b
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'b' given input columns: [count(1)]; line 1 pos 61
+Column 'b' does not exist. Did you mean one of the following? [count(1)]; line 
1 pos 61
 
 
 -- !query
@@ -327,7 +327,7 @@ SELECT count(a) FROM test_missing_target GROUP BY a ORDER 
BY b
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'b' given input columns: [count(a)]; line 1 pos 61
+Column 'b' does not exist. Did you mean one of the following? [count(a)]; line 
1 pos 61
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
index 762d85a..13f3fe0 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/union.sql.out
@@ -526,7 +526,7 @@ SELECT q1 FROM int8_tbl EXCEPT SELECT q2 FROM int8_tbl 
ORDER BY q2 LIMIT 1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'q2' given input columns: [int8_tbl.q1]; line 1 pos 64
+Column 'q2' does not exist. Did you mean one of the following? [int8_tbl.q1]; 
line 1 pos 64
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out 
b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
index 2e93ee2..dbe60d8 100644
--- a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
@@ -36,7 +36,7 @@ SELECT `(a)?+.+` FROM testData2 WHERE a = 1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a)?+.+`' given input columns: [testdata2.A, testdata2.B, 
testdata2.c, testdata2.d]; line 1 pos 7
+Column '`(a)?+.+`' does not exist. Did you mean one of the following? 
[testdata2.A, testdata2.B, testdata2.c, testdata2.d]; line 1 pos 7
 
 
 -- !query
@@ -45,7 +45,7 @@ SELECT t.`(a)?+.+` FROM testData2 t WHERE a = 1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't.`(a)?+.+`' given input columns: [t.A, t.B, t.c, t.d]; line 1 
pos 7
+Column 't.`(a)?+.+`' does not exist. Did you mean one of the following? [t.A, 
t.B, t.c, t.d]; line 1 pos 7
 
 
 -- !query
@@ -54,7 +54,7 @@ SELECT `(a|b)` FROM testData2 WHERE a = 2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a|b)`' given input columns: [testdata2.A, testdata2.B, 
testdata2.c, testdata2.d]; line 1 pos 7
+Column '`(a|b)`' does not exist. Did you mean one of the following? 
[testdata2.A, testdata2.B, testdata2.c, testdata2.d]; line 1 pos 7
 
 
 -- !query
@@ -63,7 +63,7 @@ SELECT `(a|b)?+.+` FROM testData2 WHERE a = 2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a|b)?+.+`' given input columns: [testdata2.A, testdata2.B, 
testdata2.c, testdata2.d]; line 1 pos 7
+Column '`(a|b)?+.+`' does not exist. Did you mean one of the following? 
[testdata2.A, testdata2.B, testdata2.c, testdata2.d]; line 1 pos 7
 
 
 -- !query
@@ -72,7 +72,7 @@ SELECT SUM(`(a|b)?+.+`) FROM testData2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a|b)?+.+`' given input columns: [testdata2.A, testdata2.B, 
testdata2.c, testdata2.d]; line 1 pos 11
+Column '`(a|b)?+.+`' does not exist. Did you mean one of the following? 
[testdata2.A, testdata2.B, testdata2.c, testdata2.d]; line 1 pos 11
 
 
 -- !query
@@ -81,7 +81,7 @@ SELECT SUM(`(a)`) FROM testData2
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a)`' given input columns: [testdata2.A, testdata2.B, 
testdata2.c, testdata2.d]; line 1 pos 11
+Column '`(a)`' does not exist. Did you mean one of the following? 
[testdata2.A, testdata2.B, testdata2.c, testdata2.d]; line 1 pos 11
 
 
 -- !query
@@ -301,7 +301,7 @@ SELECT SUM(a) FROM testdata3 GROUP BY `(a)`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a)`' given input columns: [testdata3.a, testdata3.b]; line 1 
pos 38
+Column '`(a)`' does not exist. Did you mean one of the following? 
[testdata3.a, testdata3.b]; line 1 pos 38
 
 
 -- !query
@@ -310,4 +310,4 @@ SELECT SUM(a) FROM testdata3 GROUP BY `(a)?+.+`
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve '`(a)?+.+`' given input columns: [testdata3.a, testdata3.b]; 
line 1 pos 38
+Column '`(a)?+.+`' does not exist. Did you mean one of the following? 
[testdata3.a, testdata3.b]; line 1 pos 38
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
index 8734511..8a61d17 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out
@@ -137,4 +137,4 @@ ON     EXISTS (SELECT 1 FROM t2 WHERE t2a > t1a)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1a' given input columns: [t2.t2a, t2.t2b, t2.t2c]; line 4 pos 
44
+Column 't1a' does not exist. Did you mean one of the following? [t2.t2a, 
t2.t2b, t2.t2c]; line 4 pos 44
diff --git 
a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out 
b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
index c80f421..9c95b31 100644
--- a/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/table-aliases.sql.out
@@ -60,7 +60,7 @@ SELECT a AS col1, b AS col2 FROM testData AS t(c, d)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a' given input columns: [t.c, t.d]; line 1 pos 7
+Column 'a' does not exist. Did you mean one of the following? [t.c, t.d]; line 
1 pos 7
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
index a60331b..446a648 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-aggregates_part1.sql.out
@@ -381,4 +381,4 @@ from tenk1 o
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'o.unique1' given input columns: [i.even, i.fivethous, i.four, 
i.hundred, i.odd, i.string4, i.stringu1, i.stringu2, i.ten, i.tenthous, 
i.thousand, i.twenty, i.two, i.twothousand, i.unique1, i.unique2]; line 2 pos 67
+Column 'o.unique1' does not exist. Did you mean one of the following? 
[i.unique1, i.unique2, i.hundred, i.even, i.four, i.stringu1, i.ten, i.odd, 
i.string4, i.stringu2, i.tenthous, i.twenty, i.two, i.thousand, i.fivethous, 
i.twothousand]; line 2 pos 67
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
index 29a6e84..6d988bc 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-join.sql.out
@@ -3276,7 +3276,7 @@ select * from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'y.f1' given input columns: [j.f1, j.f1, x.q1, x.q2]; line 2 
pos 72
+Column 'y.f1' does not exist. Did you mean one of the following? [j.f1, j.f1, 
x.q1, x.q2]; line 2 pos 72
 
 
 -- !query
@@ -3295,7 +3295,7 @@ select udf(t1.uunique1) from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't1.uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 11
+Column 't1.uunique1' does not exist. Did you mean one of the following? 
[t1.unique1, t2.unique1, t1.unique2, t2.unique2, t1.hundred, t2.hundred, 
t1.stringu1, t1.even, t1.four, t1.string4, t2.stringu1, t1.stringu2, t1.ten, 
t1.tenthous, t2.even, t2.four, t1.odd, t2.string4, t2.stringu2, t2.ten, 
t2.tenthous, t1.thousand, t1.twenty, t1.two, t1.fivethous, t2.odd, t2.thousand, 
t2.twenty, t2.two, t2.fivethous, t1.twothousand, t2.twothousand]; line 1 pos 11
 
 
 -- !query
@@ -3305,7 +3305,7 @@ select udf(udf(t2.uunique1)) from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 't2.uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 15
+Column 't2.uunique1' does not exist. Did you mean one of the following? 
[t2.unique1, t1.unique1, t2.unique2, t1.unique2, t2.hundred, t1.hundred, 
t2.stringu1, t2.even, t2.four, t2.string4, t1.stringu1, t2.stringu2, t2.ten, 
t2.tenthous, t1.even, t1.four, t2.odd, t1.string4, t1.stringu2, t1.ten, 
t1.tenthous, t2.thousand, t2.twenty, t2.two, t2.fivethous, t1.odd, t1.thousand, 
t1.twenty, t1.two, t1.fivethous, t2.twothousand, t1.twothousand]; line 1 pos 15
 
 
 -- !query
@@ -3315,7 +3315,7 @@ select udf(uunique1) from
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'uunique1' given input columns: [t1.even, t2.even, 
t1.fivethous, t2.fivethous, t1.four, t2.four, t1.hundred, t2.hundred, t1.odd, 
t2.odd, t1.string4, t2.string4, t1.stringu1, t2.stringu1, t1.stringu2, 
t2.stringu2, t1.ten, t2.ten, t1.tenthous, t2.tenthous, t1.thousand, 
t2.thousand, t1.twenty, t2.twenty, t1.two, t2.two, t1.twothousand, 
t2.twothousand, t1.unique1, t2.unique1, t1.unique2, t2.unique2]; line 1 pos 11
+Column 'uunique1' does not exist. Did you mean one of the following? 
[t1.unique1, t2.unique1, t1.unique2, t2.unique2, t1.even, t2.even, t1.four, 
t2.four, t1.ten, t2.ten, t1.hundred, t2.hundred, t1.odd, t2.odd, t1.two, 
t2.two, t1.stringu1, t2.stringu1, t1.twenty, t2.twenty, t1.string4, t2.string4, 
t1.stringu2, t2.stringu2, t1.tenthous, t2.tenthous, t1.thousand, t2.thousand, 
t1.fivethous, t2.fivethous, t1.twothousand, t2.twothousand]; line 1 pos 11
 
 
 -- !query
@@ -3515,7 +3515,7 @@ select udf(udf(f1,g)) from int4_tbl a, (select 
udf(udf(f1)) as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'f1' given input columns: []; line 1 pos 55
+Column 'f1' does not exist. Did you mean one of the following? []; line 1 pos 
55
 
 
 -- !query
@@ -3524,7 +3524,7 @@ select udf(f1,g) from int4_tbl a, (select a.f1 as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a.f1' given input columns: []; line 1 pos 42
+Column 'a.f1' does not exist. Did you mean one of the following? []; line 1 
pos 42
 
 
 -- !query
@@ -3533,7 +3533,7 @@ select udf(udf(f1,g)) from int4_tbl a cross join (select 
udf(f1) as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'f1' given input columns: []; line 1 pos 61
+Column 'f1' does not exist. Did you mean one of the following? []; line 1 pos 
61
 
 
 -- !query
@@ -3542,7 +3542,7 @@ select udf(f1,g) from int4_tbl a cross join (select 
udf(udf(a.f1)) as g) ss
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a.f1' given input columns: []; line 1 pos 60
+Column 'a.f1' does not exist. Did you mean one of the following? []; line 1 
pos 60
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
index 45d2e16f..3d6f349 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_having.sql.out
@@ -152,7 +152,7 @@ SELECT 1 AS one FROM test_having HAVING udf(a) > 1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'a' given input columns: [one]; line 1 pos 44
+Column 'a' does not exist. Did you mean one of the following? [one]; line 1 
pos 44
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
index ee1f673..c499782 100755
--- 
a/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/udf/postgreSQL/udf-select_implicit.sql.out
@@ -125,7 +125,7 @@ SELECT udf(count(*)) FROM test_missing_target GROUP BY 
udf(a) ORDER BY udf(b)
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'b' given input columns: [udf(count(1))]; line 1 pos 75
+Column 'b' does not exist. Did you mean one of the following? [udf(count(1))]; 
line 1 pos 75
 
 
 -- !query
@@ -330,7 +330,7 @@ SELECT udf(count(udf(a))) FROM test_missing_target GROUP BY 
udf(a) ORDER BY udf(
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'b' given input columns: [udf(count(udf(a)))]; line 1 pos 80
+Column 'b' does not exist. Did you mean one of the following? 
[udf(count(udf(a)))]; line 1 pos 80
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
index 07489dc..5db0f4d 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-by.sql.out
@@ -202,7 +202,7 @@ SELECT a AS k, udf(COUNT(udf(b))) FROM testData GROUP BY k
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'k' given input columns: [testdata.a, testdata.b]; line 1 pos 57
+Column 'k' does not exist. Did you mean one of the following? [testdata.a, 
testdata.b]; line 1 pos 57
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
index dc5cc29..2f47925 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-pivot.sql.out
@@ -232,7 +232,7 @@ PIVOT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 'year' given input columns: 
[__auto_generated_subquery_name.course, 
__auto_generated_subquery_name.earnings]; line 4 pos 0
+Column 'year' does not exist. Did you mean one of the following? 
[__auto_generated_subquery_name.course, 
__auto_generated_subquery_name.earnings]; line 4 pos 0
 
 
 -- !query
@@ -326,7 +326,7 @@ PIVOT (
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-cannot resolve 's' given input columns: [coursesales.course, 
coursesales.earnings, coursesales.year]; line 4 pos 15
+Column 's' does not exist. Did you mean one of the following? 
[coursesales.year, coursesales.course, coursesales.earnings]; line 4 pos 15
 
 
 -- !query
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index 38b9a75..a7d394b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -2328,7 +2328,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex3 = intercept[AnalysisException] {
       df.selectExpr("transform(a, x -> x)")
     }
-    assert(ex3.getMessage.contains("cannot resolve 'a'"))
+    assert(ex3.getErrorClass == "MISSING_COLUMN")
+    assert(ex3.messageParameters.head == "a")
   }
 
   test("map_filter") {
@@ -2399,7 +2400,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex4 = intercept[AnalysisException] {
       df.selectExpr("map_filter(a, (k, v) -> k > v)")
     }
-    assert(ex4.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4.getErrorClass == "MISSING_COLUMN")
+    assert(ex4.messageParameters.head == "a")
   }
 
   test("filter function - array for primitive type not containing null") {
@@ -2558,7 +2560,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex4 = intercept[AnalysisException] {
       df.selectExpr("filter(a, x -> x)")
     }
-    assert(ex4.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4.getErrorClass == "MISSING_COLUMN")
+    assert(ex4.messageParameters.head == "a")
   }
 
   test("exists function - array for primitive type not containing null") {
@@ -2690,7 +2693,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex4 = intercept[AnalysisException] {
       df.selectExpr("exists(a, x -> x)")
     }
-    assert(ex4.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4.getErrorClass == "MISSING_COLUMN")
+    assert(ex4.messageParameters.head == "a")
   }
 
   test("forall function - array for primitive type not containing null") {
@@ -2836,12 +2840,14 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex4 = intercept[AnalysisException] {
       df.selectExpr("forall(a, x -> x)")
     }
-    assert(ex4.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4.getErrorClass == "MISSING_COLUMN")
+    assert(ex4.messageParameters.head == "a")
 
     val ex4a = intercept[AnalysisException] {
       df.select(forall(col("a"), x => x))
     }
-    assert(ex4a.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4a.getErrorClass == "MISSING_COLUMN")
+    assert(ex4a.messageParameters.head == "a")
   }
 
   test("aggregate function - array for primitive type not containing null") {
@@ -3018,7 +3024,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex5 = intercept[AnalysisException] {
       df.selectExpr("aggregate(a, 0, (acc, x) -> x)")
     }
-    assert(ex5.getMessage.contains("cannot resolve 'a'"))
+    assert(ex5.getErrorClass == "MISSING_COLUMN")
+    assert(ex5.messageParameters.head == "a")
   }
 
   test("map_zip_with function - map of primitive types") {
@@ -3571,7 +3578,8 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val ex4 = intercept[AnalysisException] {
       df.selectExpr("zip_with(a1, a, (acc, x) -> x)")
     }
-    assert(ex4.getMessage.contains("cannot resolve 'a'"))
+    assert(ex4.getErrorClass == "MISSING_COLUMN")
+    assert(ex4.messageParameters.head == "a")
   }
 
   private def assertValuesDoNotChangeAfterCoalesceOrUnion(v: Column): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 54fb90a..374c867 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2623,7 +2623,8 @@ class DataFrameSuite extends QueryTest
     val err = intercept[AnalysisException] {
       df.groupBy($"d", $"b").as[GroupByKey, Row]
     }
-    assert(err.getMessage.contains("cannot resolve 'd'"))
+    assert(err.getErrorClass == "MISSING_COLUMN")
+    assert(err.messageParameters.head == "d")
   }
 
   test("emptyDataFrame should be foldable") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 666bf73..1491c5a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -399,7 +399,8 @@ class DataFrameWindowFunctionsSuite extends QueryTest
     val df = Seq((1, "1")).toDF("key", "value")
     val e = intercept[AnalysisException](
       df.select($"key", count("invalid").over()))
-    assert(e.message.contains("cannot resolve 'invalid' given input columns: 
[key, value]"))
+    assert(e.getErrorClass == "MISSING_COLUMN")
+    assert(e.messageParameters.sameElements(Array("invalid", "value, key")))
   }
 
   test("numerical aggregate functions on string column") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 1e2353b..a46ef5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -320,23 +320,25 @@ class DatasetSuite extends QueryTest
     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "false") {
       var e = intercept[AnalysisException] {
         ds.select(expr("`(_1)?+.+`").as[Int])
-      }.getMessage
-      assert(e.contains("cannot resolve '`(_1)?+.+`'"))
+      }
+      assert(e.getErrorClass == "MISSING_COLUMN")
+      assert(e.messageParameters.head == "`(_1)?+.+`")
 
       e = intercept[AnalysisException] {
         ds.select(expr("`(_1|_2)`").as[Int])
-      }.getMessage
-      assert(e.contains("cannot resolve '`(_1|_2)`'"))
+      }
+      assert(e.getErrorClass == "MISSING_COLUMN")
+      assert(e.messageParameters.head == "`(_1|_2)`")
 
       e = intercept[AnalysisException] {
         ds.select(ds("`(_1)?+.+`"))
-      }.getMessage
-      assert(e.contains("Cannot resolve column name \"`(_1)?+.+`\""))
+      }
+      assert(e.getMessage.contains("Cannot resolve column name 
\"`(_1)?+.+`\""))
 
       e = intercept[AnalysisException] {
         ds.select(ds("`(_1|_2)`"))
-      }.getMessage
-      assert(e.contains("Cannot resolve column name \"`(_1|_2)`\""))
+      }
+      assert(e.getMessage.contains("Cannot resolve column name \"`(_1|_2)`\""))
     }
 
     withSQLConf(SQLConf.SUPPORT_QUOTED_REGEX_COLUMN_NAME.key -> "true") {
@@ -915,7 +917,8 @@ class DatasetSuite extends QueryTest
     val e = intercept[AnalysisException] {
       ds.as[ClassData2]
     }
-    assert(e.getMessage.contains("cannot resolve 'c' given input columns: [a, 
b]"), e.getMessage)
+    assert(e.getErrorClass == "MISSING_COLUMN")
+    assert(e.messageParameters.sameElements(Array("c", "a, b")))
   }
 
   test("runtime nullability check") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 7b2c0bb..3d5b911 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1098,7 +1098,8 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
             |order by struct.a, struct.b
             |""".stripMargin)
     }
-    assert(error.message contains "cannot resolve 'struct.a' given input 
columns: [a, b]")
+    assert(error.getErrorClass == "MISSING_COLUMN")
+    assert(error.messageParameters.sameElements(Array("struct.a", "a, b")))
 
   }
 
@@ -2700,8 +2701,8 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
       checkAnswer(sql("SELECT i from (SELECT i FROM v)"), Row(1))
 
       val e = intercept[AnalysisException](sql("SELECT v.i from (SELECT i FROM 
v)"))
-      assert(e.message ==
-        "cannot resolve 'v.i' given input columns: 
[__auto_generated_subquery_name.i]")
+      assert(e.getErrorClass == "MISSING_COLUMN")
+      assert(e.messageParameters.sameElements(Array("v.i", 
"__auto_generated_subquery_name.i")))
 
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i 
FROM v)"), Row(1))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 6713a82..9e7ce55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -896,7 +896,8 @@ class SubquerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     withTempView("t") {
       Seq(1 -> "a").toDF("i", "j").createOrReplaceTempView("t")
       val e = intercept[AnalysisException](sql("SELECT (SELECT count(*) FROM t 
WHERE a = 1)"))
-      assert(e.message.contains("cannot resolve 'a' given input columns: [t.i, 
t.j]"))
+      assert(e.getErrorClass == "MISSING_COLUMN")
+      assert(e.messageParameters.sameElements(Array("a", "t.i, t.j")))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 9d32d6f..d100cad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -730,7 +730,8 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       .select(lit(50).as("a"))
       .select(struct("a").as("col"))
     val error = intercept[AnalysisException](df.select(myUdf(Column("col"))))
-    assert(error.getMessage.contains("cannot resolve 'b' given input columns: 
[a]"))
+    assert(error.getErrorClass == "MISSING_COLUMN")
+    assert(error.messageParameters.sameElements(Array("b", "a")))
   }
 
   test("wrong order of input fields for case class") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 2784354..a2e147c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -173,9 +173,10 @@ class DataSourceV2SQLSuite
         Row("data_type", "string"),
         Row("comment", "hello")))
 
-      assertAnalysisError(
+      assertAnalysisErrorClass(
         s"DESCRIBE $t invalid_col",
-        "cannot resolve 'invalid_col' given input columns: [testcat.tbl.data, 
testcat.tbl.id]")
+        "MISSING_COLUMN",
+        Array("invalid_col", "testcat.tbl.id, testcat.tbl.data"))
     }
   }
 
@@ -962,7 +963,8 @@ class DataSourceV2SQLSuite
       val ex = intercept[AnalysisException] {
         sql(s"SELECT ns1.ns2.ns3.tbl.id from $t")
       }
-      assert(ex.getMessage.contains("cannot resolve 'ns1.ns2.ns3.tbl.id"))
+      assert(ex.getErrorClass == "MISSING_COLUMN")
+      assert(ex.messageParameters.head == "ns1.ns2.ns3.tbl.id")
     }
   }
 
@@ -1805,12 +1807,20 @@ class DataSourceV2SQLSuite
         "Table or view not found")
 
       // UPDATE non-existing column
-      assertAnalysisError(
+      assertAnalysisErrorClass(
         s"UPDATE $t SET dummy='abc'",
-        "cannot resolve")
-      assertAnalysisError(
+        "MISSING_COLUMN",
+        Array(
+          "dummy",
+          "testcat.ns1.ns2.tbl.p, testcat.ns1.ns2.tbl.id, " +
+            "testcat.ns1.ns2.tbl.age, testcat.ns1.ns2.tbl.name"))
+      assertAnalysisErrorClass(
         s"UPDATE $t SET name='abc' WHERE dummy=1",
-        "cannot resolve")
+        "MISSING_COLUMN",
+        Array(
+          "dummy",
+          "testcat.ns1.ns2.tbl.p, testcat.ns1.ns2.tbl.id, " +
+            "testcat.ns1.ns2.tbl.age, testcat.ns1.ns2.tbl.name"))
 
       // UPDATE is not implemented yet.
       val e = intercept[UnsupportedOperationException] {
@@ -2939,11 +2949,24 @@ class DataSourceV2SQLSuite
     assert(e.message.contains(s"$sqlCommand is not supported for v2 tables"))
   }
 
-  private def assertAnalysisError(sqlStatement: String, expectedError: 
String): Unit = {
-    val errMsg = intercept[AnalysisException] {
+  private def assertAnalysisError(
+      sqlStatement: String,
+      expectedError: String): Unit = {
+    val ex = intercept[AnalysisException] {
       sql(sqlStatement)
-    }.getMessage
-    assert(errMsg.contains(expectedError))
+    }
+    assert(ex.getMessage.contains(expectedError))
+  }
+
+  private def assertAnalysisErrorClass(
+      sqlStatement: String,
+      expectedErrorClass: String,
+      expectedErrorMessageParameters: Array[String]): Unit = {
+    val ex = intercept[AnalysisException] {
+      sql(sqlStatement)
+    }
+    assert(ex.getErrorClass == expectedErrorClass)
+    assert(ex.messageParameters.sameElements(expectedErrorMessageParameters))
   }
 
   private def getShowCreateDDL(showCreateTableSql: String): Array[String] = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index b581287..cc46522 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -869,9 +869,9 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
           withSQLConf(CASE_SENSITIVE.key -> "true") {
             val e = intercept[AnalysisException] {
               sql("SELECT * FROM v1")
-            }.getMessage
-            assert(e.contains("cannot resolve 'C1' given input columns: " +
-              "[spark_catalog.default.t.c1]"))
+            }
+            assert(e.getErrorClass == "MISSING_COLUMN")
+            assert(e.messageParameters.sameElements(Array("C1", 
"spark_catalog.default.t.c1")))
           }
           withSQLConf(ORDER_BY_ORDINAL.key -> "false") {
             checkAnswer(sql("SELECT * FROM v2"), Seq(Row(3), Row(2), Row(1)))
@@ -888,9 +888,9 @@ abstract class SQLViewSuite extends QueryTest with 
SQLTestUtils {
           withSQLConf(GROUP_BY_ALIASES.key -> "false") {
             val e = intercept[AnalysisException] {
               sql("SELECT * FROM v4")
-            }.getMessage
-            assert(e.contains("cannot resolve 'a' given input columns: " +
-              "[spark_catalog.default.t.c1]"))
+            }
+            assert(e.getErrorClass == "MISSING_COLUMN")
+            assert(e.messageParameters.sameElements(Array("a", 
"spark_catalog.default.t.c1")))
           }
           withSQLConf(ANSI_ENABLED.key -> "true") {
             val e = intercept[ArithmeticException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 1651eb6..5b265e8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -2409,10 +2409,11 @@ abstract class CSVSuite
               .option("header", true)
               .csv(path.getCanonicalPath)
             checkAnswer(readback, Seq(Row(2, 3), Row(0, 1)))
-            val errorMsg = intercept[AnalysisException] {
+            val ex = intercept[AnalysisException] {
               readback.filter($"AAA" === 2 && $"bbb" === 3).collect()
-            }.getMessage
-            assert(errorMsg.contains("cannot resolve 'AAA'"))
+            }
+            assert(ex.getErrorClass == "MISSING_COLUMN")
+            assert(ex.messageParameters.head == "AAA")
           }
         }
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index e4d1104..d5e0a3f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2811,10 +2811,11 @@ abstract class JsonSuite
             val readback = spark.read.schema("aaa integer, BBB integer")
               .json(path.getCanonicalPath)
             checkAnswer(readback, Seq(Row(null, null), Row(0, 1)))
-            val errorMsg = intercept[AnalysisException] {
+            val ex = intercept[AnalysisException] {
               readback.filter($"AAA" === 0 && $"bbb" === 1).collect()
-            }.getMessage
-            assert(errorMsg.contains("cannot resolve 'AAA'"))
+            }
+            assert(ex.getErrorClass == "MISSING_COLUMN")
+            assert(ex.messageParameters.head == "AAA")
             // Schema inferring
             val readback2 = spark.read.json(path.getCanonicalPath)
             checkAnswer(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index d3c2103..73fdf28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -919,7 +919,7 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
 
   test("SPARK-33294: Add query resolved check before analyze InsertIntoDir") {
     withTempPath { path =>
-      val msg = intercept[AnalysisException] {
+      val ex = intercept[AnalysisException] {
         sql(
           s"""
             |INSERT OVERWRITE DIRECTORY '${path.getAbsolutePath}' USING PARQUET
@@ -929,8 +929,9 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
             |  )
             |)
           """.stripMargin)
-      }.getMessage
-      assert(msg.contains("cannot resolve 'c3' given input columns"))
+      }
+      assert(ex.getErrorClass == "MISSING_COLUMN")
+      assert(ex.messageParameters.head == "c3")
     }
   }
 
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index ae7ca38..e058e6a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -112,7 +112,7 @@ class HiveParquetSuite extends QueryTest with ParquetTest 
with TestHiveSingleton
 
   test("SPARK-33323: Add query resolved check before convert hive relation") {
     withTable("t") {
-      val msg = intercept[AnalysisException] {
+      val ex = intercept[AnalysisException] {
         sql(
           s"""
              |CREATE TABLE t STORED AS PARQUET AS
@@ -122,8 +122,9 @@ class HiveParquetSuite extends QueryTest with ParquetTest 
with TestHiveSingleton
              |  )
              |)
           """.stripMargin)
-      }.getMessage
-      assert(msg.contains("cannot resolve 'c3' given input columns"))
+      }
+      assert(ex.getErrorClass == "MISSING_COLUMN")
+      assert(ex.messageParameters.head == "c3")
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to