This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bbcc438e5b3 [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]
bbcc438e5b3 is described below

commit bbcc438e5b3aef67bf430b6bb6e4f893d8e66d13
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Wed Jun 21 21:20:01 2023 +0300

    [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]
    
    ### What changes were proposed in this pull request?
    This PR aims to assign proper names to the error classes _LEGACY_ERROR_TEMP_[2438-2445].
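    
    For illustration only (not part of this change), a minimal spark-shell-style sketch of how one of the renamed errors now surfaces. It assumes an active SparkSession named `spark` and uses an inline VALUES table in place of the courseSales data from the SQL tests:
    
    ```scala
    import org.apache.spark.sql.AnalysisException
    
    try {
      // GROUPING() without ROLLUP/CUBE/GROUPING SETS is rejected during analysis.
      spark.sql(
        """SELECT course, GROUPING(course)
          |FROM VALUES ('Java', 2012), ('dotNET', 2012) AS courseSales(course, year)
          |GROUP BY course""".stripMargin)
    } catch {
      case e: AnalysisException =>
        // Reported as _LEGACY_ERROR_TEMP_2445 before this change.
        assert(e.getErrorClass == "UNSUPPORTED_GROUPING_EXPRESSION")
    }
    ```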
    
    ### Why are the changes needed?
    Improve the error framework.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    Existing test cases were updated.
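    
    As a condensed sketch of the checkError-based assertions the updated suites now use (assuming a suite that mixes in QueryTest with SharedSparkSession, like DataFrameSetOperationsSuite), the old message-substring checks become:
    
    ```scala
    import org.apache.spark.sql.AnalysisException
    import org.apache.spark.sql.functions.{col, lit, map}
    
    test("set operation on a MAP column reports the named error class") {
      // Single MAP-typed column, so intersect/except/distinct must fail analysis.
      val df = spark.range(1).select(map(lit("key"), col("id")).as("m"))
      checkError(
        exception = intercept[AnalysisException](df.intersect(df)),
        errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
        parameters = Map(
          "colName" -> "`m`",
          "dataType" -> "\"MAP<STRING, BIGINT>\""))
    }
    ```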
    
    Closes #41553 from beliefer/SPARK-43915.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   | 47 +++++++++-------------
 python/pyspark/sql/tests/test_udtf.py              |  8 +++-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  4 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 23 +++++------
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 28 ++++++++-----
 .../analyzer-results/group-analytics.sql.out       |  2 +-
 .../analyzer-results/join-lateral.sql.out          |  4 +-
 .../udf/udf-group-analytics.sql.out                |  2 +-
 .../sql-tests/results/group-analytics.sql.out      |  2 +-
 .../sql-tests/results/join-lateral.sql.out         |  4 +-
 .../results/udf/udf-group-analytics.sql.out        |  2 +-
 .../spark/sql/DataFrameSetOperationsSuite.scala    | 44 ++++++++++++++------
 .../sql/connector/DataSourceV2FunctionSuite.scala  | 13 +++++-
 .../sql/connector/DeleteFromTableSuiteBase.scala   | 15 +------
 .../connector/DeltaBasedDeleteFromTableSuite.scala | 20 +++++++++
 .../sql/connector/DeltaBasedUpdateTableSuite.scala | 21 ++++++++++
 .../connector/GroupBasedDeleteFromTableSuite.scala | 22 +++++++++-
 .../sql/connector/GroupBasedUpdateTableSuite.scala | 23 ++++++++++-
 .../spark/sql/connector/UpdateTableSuiteBase.scala | 15 +------
 19 files changed, 195 insertions(+), 104 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 1d2f25b72f3..264d9b7c3a0 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -643,6 +643,11 @@
     ],
     "sqlState" : "23505"
   },
+  "DUPLICATED_METRICS_NAME" : {
+    "message" : [
+      "The metric name is not unique: <metricName>. The same name cannot be used for metrics with different results. However, multiple instances of metrics with the same result and name are allowed (e.g. self-joins)."
+    ]
+  },
   "DUPLICATE_CLAUSES" : {
     "message" : [
       "Found duplicate clauses: <clauseName>. Please, remove one of them."
@@ -1237,6 +1242,11 @@
       }
     }
   },
+  "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : {
+    "message" : [
+      "The operator expects a deterministic expression, but the actual expression is <sqlExprs>."
+    ]
+  },
   "INVALID_NUMERIC_LITERAL_RANGE" : {
     "message" : [
       "Numeric literal <rawStrippedQualifier> is outside the valid range for 
<typeName> with minimum value of <minValue> and maximum value of <maxValue>. 
Please adjust the value accordingly."
@@ -1512,6 +1522,11 @@
     ],
     "sqlState" : "42604"
   },
+  "INVALID_UDF_IMPLEMENTATION" : {
+    "message" : [
+      "Function <funcName> does not implement ScalarFunction or AggregateFunction."
+    ]
+  },
   "INVALID_URL" : {
     "message" : [
       "The url is invalid: <url>. If necessary set <ansiConfig> to \"false\" 
to bypass this error."
@@ -2458,6 +2473,11 @@
           "<property> is a reserved namespace property, <msg>."
         ]
       },
+      "SET_OPERATION_ON_MAP_TYPE" : {
+        "message" : [
+          "Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>."
+        ]
+      },
       "SET_PROPERTIES_AND_DBPROPERTIES" : {
         "message" : [
           "set PROPERTIES and DBPROPERTIES at the same time."
@@ -5659,33 +5679,6 @@
       "Conflicting attributes: <conflictingAttributes>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2438" : {
-    "message" : [
-      "Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column <colName> is <dataType>."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2439" : {
-    "message" : [
-      "nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:",
-      "<sqlExprs>",
-      "in operator <operator>."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2443" : {
-    "message" : [
-      "Multiple definitions of observed metrics named '<name>': <plan>."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2444" : {
-    "message" : [
-      "Function '<funcName>' does not implement ScalarFunction or AggregateFunction."
-    ]
-  },
-  "_LEGACY_ERROR_TEMP_2445" : {
-    "message" : [
-      "grouping() can only be used with GroupingSets/Cube/Rollup."
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2446" : {
     "message" : [
       "Operation not allowed: <cmd> only works on table with location 
provided: <tableIdentWithDB>"
diff --git a/python/pyspark/sql/tests/test_udtf.py 
b/python/pyspark/sql/tests/test_udtf.py
index 628f2696b84..ccf271ceec2 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -350,7 +350,9 @@ class UDTFTestsMixin(ReusedSQLTestCase):
 
         random_udtf = udtf(RandomUDTF, returnType="x: int").asNondeterministic()
         # TODO(SPARK-43966): support non-deterministic UDTFs
-        with self.assertRaisesRegex(AnalysisException, "nondeterministic expressions"):
+        with self.assertRaisesRegex(
+            AnalysisException, "The operator expects a deterministic expression"
+        ):
             random_udtf(lit(1)).collect()
 
     def test_udtf_with_nondeterministic_input(self):
@@ -362,7 +364,9 @@ class UDTFTestsMixin(ReusedSQLTestCase):
                 yield a + 1,
 
         # TODO(SPARK-43966): support non-deterministic UDTFs
-        with self.assertRaisesRegex(AnalysisException, "nondeterministic expressions"):
+        with self.assertRaisesRegex(
+            AnalysisException, " The operator expects a deterministic expression"
+        ):
             TestUDTF(rand(0) * 100).collect()
 
     def test_udtf_no_eval(self):
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3f07f0f5032..8a192a4c132 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2326,8 +2326,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
           processV2AggregateFunction(aggFunc, arguments, u)
         case _ =>
           failAnalysis(
-            errorClass = "_LEGACY_ERROR_TEMP_2444",
-            messageParameters = Map("funcName" -> bound.name()))
+            errorClass = "INVALID_UDF_IMPLEMENTATION",
+            messageParameters = Map("funcName" -> toSQLId(bound.name())))
       }
     }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index e47966f1e27..649140e466a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -302,7 +302,8 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
               "\nReplacement is unresolved: " + e.replacement)
 
           case g: Grouping =>
-            g.failAnalysis(errorClass = "_LEGACY_ERROR_TEMP_2445", messageParameters = Map.empty)
+            g.failAnalysis(
+              errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty)
           case g: GroupingID =>
             g.failAnalysis(
               errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty)
@@ -721,10 +722,10 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
           case o if mapColumnInSetOperation(o).isDefined =>
             val mapCol = mapColumnInSetOperation(o).get
             o.failAnalysis(
-              errorClass = "_LEGACY_ERROR_TEMP_2438",
+              errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
               messageParameters = Map(
-                "colName" -> mapCol.name,
-                "dataType" -> mapCol.dataType.catalogString))
+                "colName" -> toSQLId(mapCol.name),
+                "dataType" -> toSQLType(mapCol.dataType)))
 
           case o if o.expressions.exists(!_.deterministic) &&
             !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
@@ -734,10 +735,9 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
             !o.isInstanceOf[LateralJoin] =>
             // The rule above is used to check Aggregate operator.
             o.failAnalysis(
-              errorClass = "_LEGACY_ERROR_TEMP_2439",
-              messageParameters = Map(
-                "sqlExprs" -> o.expressions.map(_.sql).mkString(","),
-                "operator" -> operator.simpleString(SQLConf.get.maxToStringFields)))
+              errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
+              messageParameters = Map("sqlExprs" -> o.expressions.map(toSQLExpr(_)).mkString(", "))
+            )
 
           case _: UnresolvedHint => throw new IllegalStateException(
             "Logical hint operator should be removed during analysis.")
@@ -868,6 +868,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   private def scrubOutIds(string: String): String =
     string.replaceAll("#\\d+", "#x")
       .replaceAll("operator id = \\d+", "operator id = #x")
+      .replaceAll("rand\\(-?\\d+\\)", "rand(number)")
 
   private def planToString(plan: LogicalPlan): String = {
     if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString
@@ -1056,10 +1057,8 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
               // of a CTE that is used multiple times or a self join.
               if (!simplifiedMetrics.sameResult(simplifiedOther)) {
                 failAnalysis(
-                  errorClass = "_LEGACY_ERROR_TEMP_2443",
-                  messageParameters = Map(
-                    "name" -> name,
-                    "plan" -> plan.toString))
+                  errorClass = "DUPLICATED_METRICS_NAME",
+                  messageParameters = Map("metricName" -> name))
               }
             case None =>
               metricsMap.put(name, metrics)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 105753ab3d1..1e844e22bec 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -821,25 +821,35 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       CollectMetrics("evt1", count :: Nil, testRelation) :: Nil))
 
     // Same children, structurally different metrics - fail
-    assertAnalysisError(Union(
-      CollectMetrics("evt1", count :: Nil, testRelation) ::
-      CollectMetrics("evt1", sum :: Nil, testRelation) :: Nil),
-      "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+    assertAnalysisErrorClass(
+      Union(
+        CollectMetrics("evt1", count :: Nil, testRelation) ::
+          CollectMetrics("evt1", sum :: Nil, testRelation) :: Nil),
+      expectedErrorClass = "DUPLICATED_METRICS_NAME",
+      expectedMessageParameters = Map("metricName" -> "evt1")
+    )
 
     // Different children, same metrics - fail
     val b = $"b".string
     val tblB = LocalRelation(b)
-    assertAnalysisError(Union(
-      CollectMetrics("evt1", count :: Nil, testRelation) ::
-      CollectMetrics("evt1", count :: Nil, tblB) :: Nil),
-      "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+    assertAnalysisErrorClass(
+      Union(
+        CollectMetrics("evt1", count :: Nil, testRelation) ::
+          CollectMetrics("evt1", count :: Nil, tblB) :: Nil),
+      expectedErrorClass = "DUPLICATED_METRICS_NAME",
+      expectedMessageParameters = Map("metricName" -> "evt1")
+    )
 
     // Subquery different tree - fail
     val subquery = Aggregate(Nil, sum :: Nil, CollectMetrics("evt1", count :: Nil, testRelation))
     val query = Project(
       b :: ScalarSubquery(subquery, Nil).as("sum") :: Nil,
       CollectMetrics("evt1", count :: Nil, tblB))
-    assertAnalysisError(query, "Multiple definitions of observed metrics" :: "evt1" :: Nil)
+    assertAnalysisErrorClass(
+      query,
+      expectedErrorClass = "DUPLICATED_METRICS_NAME",
+      expectedMessageParameters = Map("metricName" -> "evt1")
+    )
 
     // Aggregate with filter predicate - fail
     val sumWithFilter = sum.transform {
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
index 773965d76b9..327caef217a 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out
@@ -332,7 +332,7 @@ SELECT course, year, GROUPING(course) FROM courseSales 
GROUP BY course, year
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2445",
+  "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION",
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
index 74c25e11bd9..a9bfbc69cf2 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
@@ -480,7 +480,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project 
[(cast((outer(c1#x) + outer(c2#x)) as double) + rand(0)) AS c3#x]\n:        +- 
OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n   +- View 
(`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n      +- Project [cast(col1#x as 
int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, 
col2#x]\n"
+    "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project 
[(cast((outer(c1#x) + outer(c2#x)) as double) + rand(number)) AS c3#x]\n:       
 +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n   +- View 
(`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n      +- Project [cast(col1#x as 
int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, 
col2#x]\n"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -500,7 +500,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project [rand(0) AS 
rand(0)#x]\n:        +- SubqueryAlias spark_catalog.default.t2\n:           +- 
View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n:              +- Project 
[cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n:                 
+- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n  
 +- View (`spark_catalog`.`default`. [...]
+    "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project [rand(number) AS 
rand(number)#x]\n:        +- SubqueryAlias spark_catalog.default.t2\n:          
 +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n:              +- 
Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n:           
      +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias 
spark_catalog.default.t1\n   +- View (`spark_catalog`. [...]
   },
   "queryContext" : [ {
     "objectType" : "",
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
index d8ed67d3002..8a9c142bb87 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out
@@ -205,7 +205,7 @@ SELECT course, udf(year), GROUPING(course) FROM courseSales 
GROUP BY course, udf
 -- !query analysis
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2445",
+  "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION",
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
diff --git 
a/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out 
b/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out
index accc141d811..d9e1a25b574 100644
--- a/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out
@@ -466,7 +466,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2445",
+  "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION",
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index cc7b44ca8e8..ddee595372b 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -360,7 +360,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project 
[(cast((outer(c1#x) + outer(c2#x)) as double) + rand(0)) AS c3#x]\n:        +- 
OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n   +- View 
(`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n      +- Project [cast(col1#x as 
int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, 
col2#x]\n"
+    "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project 
[(cast((outer(c1#x) + outer(c2#x)) as double) + rand(number)) AS c3#x]\n:       
 +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n   +- View 
(`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n      +- Project [cast(col1#x as 
int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, 
col2#x]\n"
   },
   "queryContext" : [ {
     "objectType" : "",
@@ -382,7 +382,7 @@ org.apache.spark.sql.AnalysisException
   "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES",
   "sqlState" : "0A000",
   "messageParameters" : {
-    "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project [rand(0) AS 
rand(0)#x]\n:        +- SubqueryAlias spark_catalog.default.t2\n:           +- 
View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n:              +- Project 
[cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n:                 
+- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n  
 +- View (`spark_catalog`.`default`. [...]
+    "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n:  +- 
SubqueryAlias __auto_generated_subquery_name\n:     +- Project [rand(number) AS 
rand(number)#x]\n:        +- SubqueryAlias spark_catalog.default.t2\n:          
 +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n:              +- 
Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n:           
      +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias 
spark_catalog.default.t1\n   +- View (`spark_catalog`. [...]
   },
   "queryContext" : [ {
     "objectType" : "",
diff --git 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
index 1d1b54fc707..f927bf03663 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out
@@ -208,7 +208,7 @@ struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
 {
-  "errorClass" : "_LEGACY_ERROR_TEMP_2445",
+  "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION",
   "queryContext" : [ {
     "objectType" : "",
     "objectName" : "",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 61724a39dfa..7e04053957b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -352,20 +352,40 @@ class DataFrameSetOperationsSuite extends QueryTest with 
SharedSparkSession {
 
   test("SPARK-19893: cannot run set operations with map type") {
     val df = spark.range(1).select(map(lit("key"), $"id").as("m"))
-    val e = intercept[AnalysisException](df.intersect(df))
-    assert(e.message.contains(
-      "Cannot have map type columns in DataFrame which calls set operations"))
-    val e2 = intercept[AnalysisException](df.except(df))
-    assert(e2.message.contains(
-      "Cannot have map type columns in DataFrame which calls set operations"))
-    val e3 = intercept[AnalysisException](df.distinct())
-    assert(e3.message.contains(
-      "Cannot have map type columns in DataFrame which calls set operations"))
+    checkError(
+      exception = intercept[AnalysisException](df.intersect(df)),
+      errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+      parameters = Map(
+        "colName" -> "`m`",
+        "dataType" -> "\"MAP<STRING, BIGINT>\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.except(df)),
+      errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+      parameters = Map(
+        "colName" -> "`m`",
+        "dataType" -> "\"MAP<STRING, BIGINT>\"")
+    )
+    checkError(
+      exception = intercept[AnalysisException](df.distinct()),
+      errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+      parameters = Map(
+        "colName" -> "`m`",
+        "dataType" -> "\"MAP<STRING, BIGINT>\"")
+    )
     withTempView("v") {
       df.createOrReplaceTempView("v")
-      val e4 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v"))
-      assert(e4.message.contains(
-        "Cannot have map type columns in DataFrame which calls set operations"))
+      checkError(
+        exception = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")),
+        errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE",
+        parameters = Map(
+          "colName" -> "`m`",
+          "dataType" -> "\"MAP<STRING, BIGINT>\""),
+        context = ExpectedContext(
+          fragment = "SELECT DISTINCT m FROM v",
+          start = 0,
+          stop = 23)
+      )
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
index 9a31948889c..eea2eebf849 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala
@@ -446,8 +446,17 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase {
     catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps)
     addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(BadBoundFunction))
 
-    assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')"))
-      .getMessage.contains("does not implement ScalarFunction or AggregateFunction"))
+    checkError(
+      exception = intercept[AnalysisException](
+        sql("SELECT testcat.ns.strlen('abc')")),
+      errorClass = "INVALID_UDF_IMPLEMENTATION",
+      parameters = Map(
+        "funcName" -> "`bad_bound_func`"),
+      context = ExpectedContext(
+        fragment = "testcat.ns.strlen('abc')",
+        start = 7,
+        stop = 30)
+    )
   }
 
   test("aggregate function: lookup int average") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
index 11e22a744f3..94109681b8e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.connector
 
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec}
 
 abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase {
@@ -449,19 +449,6 @@ abstract class DeleteFromTableSuiteBase extends 
RowLevelOperationSuiteBase {
     }
   }
 
-  test("delete with nondeterministic conditions") {
-    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
-      """{ "pk": 1, "id": 1, "dep": "hr" }
-        |{ "pk": 2, "id": 2, "dep": "software" }
-        |{ "pk": 3, "id": 3, "dep": "hr" }
-        |""".stripMargin)
-
-    val e = intercept[AnalysisException] {
-      sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5")
-    }
-    assert(e.message.contains("nondeterministic expressions are only allowed"))
-  }
-
   test("delete without condition executed as delete with filters") {
     createAndInitTable("pk INT NOT NULL, id INT, dep INT",
       """{ "pk": 1, "id": 1, "dep": 100 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
index fd7a04ea926..4da85a5ce05 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala
@@ -27,6 +27,26 @@ class DeltaBasedDeleteFromTableSuite extends 
DeleteFromTableSuiteBase {
     props
   }
 
+  test("delete with nondeterministic conditions") {
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
+        |""".stripMargin)
+
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5")),
+      errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
+      parameters = Map(
+        "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\""),
+      context = ExpectedContext(
+        fragment = "DELETE FROM cat.ns1.test_table WHERE id <= 1 AND rand() > 0.5",
+        start = 0,
+        stop = 60)
+    )
+  }
+
   test("nullable row ID attrs") {
     createAndInitTable("pk INT, salary INT, dep STRING",
       """{ "pk": 1, "salary": 300, "dep": 'hr' }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala
index af78b2884f5..73b6ec22a72 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala
@@ -53,4 +53,25 @@ class DeltaBasedUpdateTableSuite extends 
UpdateTableSuiteBase {
       sql(s"SELECT * FROM $tableNameAsString"),
       Row(10, 1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil)
   }
+
+  test("update with nondeterministic conditions") {
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
+        |""".stripMargin)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5")
+      },
+      errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
+      parameters = Map(
+        "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\""),
+      context = ExpectedContext(
+        fragment = "UPDATE cat.ns1.test_table SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5",
+        start = 0,
+        stop = 75)
+    )
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala
index 6c80d46b0ef..0aeab95f58a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala
@@ -17,13 +17,33 @@
 
 package org.apache.spark.sql.connector
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.internal.SQLConf
 
 class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase {
 
   import testImplicits._
 
+  test("delete with nondeterministic conditions") {
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
+        |""".stripMargin)
+
+    checkError(
+      exception = intercept[AnalysisException](
+        sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5")),
+      errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
+      parameters = Map(
+        "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\", \"((id <= 1) AND (rand() > 0.5))\""),
+      context = ExpectedContext(
+        fragment = "DELETE FROM cat.ns1.test_table WHERE id <= 1 AND rand() > 0.5",
+        start = 0,
+        stop = 60)
+    )
+  }
+
   test("delete with IN predicate and runtime group filtering") {
     createAndInitTable("id INT, salary INT, dep STRING",
       """{ "id": 1, "salary": 300, "dep": 'hr' }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala
index e2f73e51de0..3e736421a31 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.connector
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression
 import org.apache.spark.sql.execution.{InSubqueryExec, ReusedSubqueryExec}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -110,4 +110,25 @@ class GroupBasedUpdateTableSuite extends 
UpdateTableSuiteBase {
       }
     }
   }
+
+  test("update with nondeterministic conditions") {
+    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
+      """{ "pk": 1, "id": 1, "dep": "hr" }
+        |{ "pk": 2, "id": 2, "dep": "software" }
+        |{ "pk": 3, "id": 3, "dep": "hr" }
+        |""".stripMargin)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5")
+      },
+      errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
+      parameters = Map(
+        "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\", \"((id <= 1) AND (rand() > 0.5))\""),
+      context = ExpectedContext(
+        fragment = "UPDATE cat.ns1.test_table SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5",
+        start = 0,
+        stop = 75)
+    )
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
index 346390efa94..b43101c2e02 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue}
 import org.apache.spark.sql.connector.expressions.LiteralValue
 import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -528,19 +528,6 @@ abstract class UpdateTableSuiteBase extends 
RowLevelOperationSuiteBase {
       Row(2) :: Nil)
   }
 
-  test("update with nondeterministic conditions") {
-    createAndInitTable("pk INT NOT NULL, id INT, dep STRING",
-      """{ "pk": 1, "id": 1, "dep": "hr" }
-        |{ "pk": 2, "id": 2, "dep": "software" }
-        |{ "pk": 3, "id": 3, "dep": "hr" }
-        |""".stripMargin)
-
-    val e = intercept[AnalysisException] {
-      sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5")
-    }
-    assert(e.message.contains("nondeterministic expressions are only allowed"))
-  }
-
   test("update with default values") {
     val idDefault = new ColumnDefaultValue("42", LiteralValue(42, IntegerType))
     val columns = Array(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

