This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new bbcc438e5b3 [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445] bbcc438e5b3 is described below commit bbcc438e5b3aef67bf430b6bb6e4f893d8e66d13 Author: Jiaan Geng <belie...@163.com> AuthorDate: Wed Jun 21 21:20:01 2023 +0300 [SPARK-43915][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445] ### What changes were proposed in this pull request? This PR aims to assign names to the error class _LEGACY_ERROR_TEMP_[2438-2445]. ### Why are the changes needed? Improve the error framework. ### Does this PR introduce _any_ user-facing change? 'No'. ### How was this patch tested? Existing test cases updated. Closes #41553 from beliefer/SPARK-43915. Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 47 +++++++++------------- python/pyspark/sql/tests/test_udtf.py | 8 +++- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 23 +++++------ .../sql/catalyst/analysis/AnalysisSuite.scala | 28 ++++++++----- .../analyzer-results/group-analytics.sql.out | 2 +- .../analyzer-results/join-lateral.sql.out | 4 +- .../udf/udf-group-analytics.sql.out | 2 +- .../sql-tests/results/group-analytics.sql.out | 2 +- .../sql-tests/results/join-lateral.sql.out | 4 +- .../results/udf/udf-group-analytics.sql.out | 2 +- .../spark/sql/DataFrameSetOperationsSuite.scala | 44 ++++++++++++++------ .../sql/connector/DataSourceV2FunctionSuite.scala | 13 +++++- .../sql/connector/DeleteFromTableSuiteBase.scala | 15 +------ .../connector/DeltaBasedDeleteFromTableSuite.scala | 20 +++++++++ .../sql/connector/DeltaBasedUpdateTableSuite.scala | 21 ++++++++++ .../connector/GroupBasedDeleteFromTableSuite.scala | 22 +++++++++- .../sql/connector/GroupBasedUpdateTableSuite.scala | 23 ++++++++++- .../spark/sql/connector/UpdateTableSuiteBase.scala | 15 +------ 19 files changed, 195 insertions(+), 104 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 1d2f25b72f3..264d9b7c3a0 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -643,6 +643,11 @@ ], "sqlState" : "23505" }, + "DUPLICATED_METRICS_NAME" : { + "message" : [ + "The metric name is not unique: <metricName>. The same name cannot be used for metrics with different results. However multiple instances of metrics with the same result and name are allowed (e.g. self-joins)." + ] + }, "DUPLICATE_CLAUSES" : { "message" : [ "Found duplicate clauses: <clauseName>. Please, remove one of them." @@ -1237,6 +1242,11 @@ } } }, + "INVALID_NON_DETERMINISTIC_EXPRESSIONS" : { + "message" : [ + "The operator expects a deterministic expression, but the actual expression is <sqlExprs>." + ] + }, "INVALID_NUMERIC_LITERAL_RANGE" : { "message" : [ "Numeric literal <rawStrippedQualifier> is outside the valid range for <typeName> with minimum value of <minValue> and maximum value of <maxValue>. Please adjust the value accordingly." @@ -1512,6 +1522,11 @@ ], "sqlState" : "42604" }, + "INVALID_UDF_IMPLEMENTATION" : { + "message" : [ + "Function <funcName> does not implement ScalarFunction or AggregateFunction." + ] + }, "INVALID_URL" : { "message" : [ "The url is invalid: <url>. If necessary set <ansiConfig> to \"false\" to bypass this error." @@ -2458,6 +2473,11 @@ "<property> is a reserved namespace property, <msg>."
] }, + "SET_OPERATION_ON_MAP_TYPE" : { + "message" : [ + "Cannot have MAP type columns in DataFrame which calls set operations (INTERSECT, EXCEPT, etc.), but the type of column <colName> is <dataType>." + ] + }, "SET_PROPERTIES_AND_DBPROPERTIES" : { "message" : [ "set PROPERTIES and DBPROPERTIES at the same time." @@ -5659,33 +5679,6 @@ "Conflicting attributes: <conflictingAttributes>." ] }, - "_LEGACY_ERROR_TEMP_2438" : { - "message" : [ - "Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.), but the type of column <colName> is <dataType>." - ] - }, - "_LEGACY_ERROR_TEMP_2439" : { - "message" : [ - "nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:", - "<sqlExprs>", - "in operator <operator>." - ] - }, - "_LEGACY_ERROR_TEMP_2443" : { - "message" : [ - "Multiple definitions of observed metrics named '<name>': <plan>." - ] - }, - "_LEGACY_ERROR_TEMP_2444" : { - "message" : [ - "Function '<funcName>' does not implement ScalarFunction or AggregateFunction." - ] - }, - "_LEGACY_ERROR_TEMP_2445" : { - "message" : [ - "grouping() can only be used with GroupingSets/Cube/Rollup." - ] - }, "_LEGACY_ERROR_TEMP_2446" : { "message" : [ "Operation not allowed: <cmd> only works on table with location provided: <tableIdentWithDB>" diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index 628f2696b84..ccf271ceec2 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -350,7 +350,9 @@ class UDTFTestsMixin(ReusedSQLTestCase): random_udtf = udtf(RandomUDTF, returnType="x: int").asNondeterministic() # TODO(SPARK-43966): support non-deterministic UDTFs - with self.assertRaisesRegex(AnalysisException, "nondeterministic expressions"): + with self.assertRaisesRegex( + AnalysisException, "The operator expects a deterministic expression" + ): random_udtf(lit(1)).collect() def test_udtf_with_nondeterministic_input(self): @@ -362,7 +364,9 @@ class UDTFTestsMixin(ReusedSQLTestCase): yield a + 1, # TODO(SPARK-43966): support non-deterministic UDTFs - with self.assertRaisesRegex(AnalysisException, "nondeterministic expressions"): + with self.assertRaisesRegex( + AnalysisException, " The operator expects a deterministic expression" + ): TestUDTF(rand(0) * 100).collect() def test_udtf_no_eval(self): diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3f07f0f5032..8a192a4c132 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2326,8 +2326,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor processV2AggregateFunction(aggFunc, arguments, u) case _ => failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2444", - messageParameters = Map("funcName" -> bound.name())) + errorClass = "INVALID_UDF_IMPLEMENTATION", + messageParameters = Map("funcName" -> toSQLId(bound.name()))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index e47966f1e27..649140e466a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -302,7 +302,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB "\nReplacement is unresolved: " + e.replacement) case g: Grouping => - g.failAnalysis(errorClass = "_LEGACY_ERROR_TEMP_2445", messageParameters = Map.empty) + g.failAnalysis( + errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty) case g: GroupingID => g.failAnalysis( errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty) @@ -721,10 +722,10 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case o if mapColumnInSetOperation(o).isDefined => val mapCol = mapColumnInSetOperation(o).get o.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2438", + errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", messageParameters = Map( - "colName" -> mapCol.name, - "dataType" -> mapCol.dataType.catalogString)) + "colName" -> toSQLId(mapCol.name), + "dataType" -> toSQLType(mapCol.dataType))) case o if o.expressions.exists(!_.deterministic) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && @@ -734,10 +735,9 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB !o.isInstanceOf[LateralJoin] => // The rule above is used to check Aggregate operator. o.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2439", - messageParameters = Map( - "sqlExprs" -> o.expressions.map(_.sql).mkString(","), - "operator" -> operator.simpleString(SQLConf.get.maxToStringFields))) + errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", + messageParameters = Map("sqlExprs" -> o.expressions.map(toSQLExpr(_)).mkString(", ")) + ) case _: UnresolvedHint => throw new IllegalStateException( "Logical hint operator should be removed during analysis.") @@ -868,6 +868,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB private def scrubOutIds(string: String): String = string.replaceAll("#\\d+", "#x") .replaceAll("operator id = \\d+", "operator id = #x") + .replaceAll("rand\\(-?\\d+\\)", "rand(number)") private def planToString(plan: LogicalPlan): String = { if (Utils.isTesting) scrubOutIds(plan.toString) else plan.toString @@ -1056,10 +1057,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB // of a CTE that is used multiple times or a self join. 
if (!simplifiedMetrics.sameResult(simplifiedOther)) { failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2443", - messageParameters = Map( - "name" -> name, - "plan" -> plan.toString)) + errorClass = "DUPLICATED_METRICS_NAME", + messageParameters = Map("metricName" -> name)) } case None => metricsMap.put(name, metrics) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 105753ab3d1..1e844e22bec 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -821,25 +821,35 @@ class AnalysisSuite extends AnalysisTest with Matchers { CollectMetrics("evt1", count :: Nil, testRelation) :: Nil)) // Same children, structurally different metrics - fail - assertAnalysisError(Union( - CollectMetrics("evt1", count :: Nil, testRelation) :: - CollectMetrics("evt1", sum :: Nil, testRelation) :: Nil), - "Multiple definitions of observed metrics" :: "evt1" :: Nil) + assertAnalysisErrorClass( + Union( + CollectMetrics("evt1", count :: Nil, testRelation) :: + CollectMetrics("evt1", sum :: Nil, testRelation) :: Nil), + expectedErrorClass = "DUPLICATED_METRICS_NAME", + expectedMessageParameters = Map("metricName" -> "evt1") + ) // Different children, same metrics - fail val b = $"b".string val tblB = LocalRelation(b) - assertAnalysisError(Union( - CollectMetrics("evt1", count :: Nil, testRelation) :: - CollectMetrics("evt1", count :: Nil, tblB) :: Nil), - "Multiple definitions of observed metrics" :: "evt1" :: Nil) + assertAnalysisErrorClass( + Union( + CollectMetrics("evt1", count :: Nil, testRelation) :: + CollectMetrics("evt1", count :: Nil, tblB) :: Nil), + expectedErrorClass = "DUPLICATED_METRICS_NAME", + expectedMessageParameters = Map("metricName" -> "evt1") + ) // Subquery different tree - fail val subquery = Aggregate(Nil, sum :: Nil, CollectMetrics("evt1", count :: Nil, testRelation)) val query = Project( b :: ScalarSubquery(subquery, Nil).as("sum") :: Nil, CollectMetrics("evt1", count :: Nil, tblB)) - assertAnalysisError(query, "Multiple definitions of observed metrics" :: "evt1" :: Nil) + assertAnalysisErrorClass( + query, + expectedErrorClass = "DUPLICATED_METRICS_NAME", + expectedMessageParameters = Map("metricName" -> "evt1") + ) // Aggregate with filter predicate - fail val sumWithFilter = sum.transform { diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out index 773965d76b9..327caef217a 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/group-analytics.sql.out @@ -332,7 +332,7 @@ SELECT course, year, GROUPING(course) FROM courseSales GROUP BY course, year -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2445", + "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION", "queryContext" : [ { "objectType" : "", "objectName" : "", diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out index 74c25e11bd9..a9bfbc69cf2 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out +++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out @@ -480,7 +480,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES", "sqlState" : "0A000", "messageParameters" : { - "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [(cast((outer(c1#x) + outer(c2#x)) as double) + rand(0)) AS c3#x]\n: +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" + "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [(cast((outer(c1#x) + outer(c2#x)) as double) + rand(number)) AS c3#x]\n: +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" }, "queryContext" : [ { "objectType" : "", @@ -500,7 +500,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES", "sqlState" : "0A000", "messageParameters" : { - "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [rand(0) AS rand(0)#x]\n: +- SubqueryAlias spark_catalog.default.t2\n: +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n: +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n: +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`. [...] + "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [rand(number) AS rand(number)#x]\n: +- SubqueryAlias spark_catalog.default.t2\n: +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n: +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n: +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`. [...] 
}, "queryContext" : [ { "objectType" : "", diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out index d8ed67d3002..8a9c142bb87 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udf/udf-group-analytics.sql.out @@ -205,7 +205,7 @@ SELECT course, udf(year), GROUPING(course) FROM courseSales GROUP BY course, udf -- !query analysis org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2445", + "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION", "queryContext" : [ { "objectType" : "", "objectName" : "", diff --git a/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out index accc141d811..d9e1a25b574 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-analytics.sql.out @@ -466,7 +466,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2445", + "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION", "queryContext" : [ { "objectType" : "", "objectName" : "", diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out index cc7b44ca8e8..ddee595372b 100644 --- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out @@ -360,7 +360,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES", "sqlState" : "0A000", "messageParameters" : { - "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [(cast((outer(c1#x) + outer(c2#x)) as double) + rand(0)) AS c3#x]\n: +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" + "treeNode" : "LateralJoin lateral-subquery#x [c1#x && c2#x], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [(cast((outer(c1#x) + outer(c2#x)) as double) + rand(number)) AS c3#x]\n: +- OneRowRelation\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])\n +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n +- LocalRelation [col1#x, col2#x]\n" }, "queryContext" : [ { "objectType" : "", @@ -382,7 +382,7 @@ org.apache.spark.sql.AnalysisException "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.NON_DETERMINISTIC_LATERAL_SUBQUERIES", "sqlState" : "0A000", "messageParameters" : { - "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [rand(0) AS rand(0)#x]\n: +- SubqueryAlias spark_catalog.default.t2\n: +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n: +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n: +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`.`default`. [...] 
+ "treeNode" : "LateralJoin lateral-subquery#x [], Inner\n: +- SubqueryAlias __auto_generated_subquery_name\n: +- Project [rand(number) AS rand(number)#x]\n: +- SubqueryAlias spark_catalog.default.t2\n: +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n: +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n: +- LocalRelation [col1#x, col2#x]\n+- SubqueryAlias spark_catalog.default.t1\n +- View (`spark_catalog`. [...] }, "queryContext" : [ { "objectType" : "", diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out index 1d1b54fc707..f927bf03663 100644 --- a/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-group-analytics.sql.out @@ -208,7 +208,7 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException { - "errorClass" : "_LEGACY_ERROR_TEMP_2445", + "errorClass" : "UNSUPPORTED_GROUPING_EXPRESSION", "queryContext" : [ { "objectType" : "", "objectName" : "", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala index 61724a39dfa..7e04053957b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala @@ -352,20 +352,40 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession { test("SPARK-19893: cannot run set operations with map type") { val df = spark.range(1).select(map(lit("key"), $"id").as("m")) - val e = intercept[AnalysisException](df.intersect(df)) - assert(e.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - val e2 = intercept[AnalysisException](df.except(df)) - assert(e2.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) - val e3 = intercept[AnalysisException](df.distinct()) - assert(e3.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) + checkError( + exception = intercept[AnalysisException](df.intersect(df)), + errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", + parameters = Map( + "colName" -> "`m`", + "dataType" -> "\"MAP<STRING, BIGINT>\"") + ) + checkError( + exception = intercept[AnalysisException](df.except(df)), + errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", + parameters = Map( + "colName" -> "`m`", + "dataType" -> "\"MAP<STRING, BIGINT>\"") + ) + checkError( + exception = intercept[AnalysisException](df.distinct()), + errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", + parameters = Map( + "colName" -> "`m`", + "dataType" -> "\"MAP<STRING, BIGINT>\"") + ) withTempView("v") { df.createOrReplaceTempView("v") - val e4 = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")) - assert(e4.message.contains( - "Cannot have map type columns in DataFrame which calls set operations")) + checkError( + exception = intercept[AnalysisException](sql("SELECT DISTINCT m FROM v")), + errorClass = "UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE", + parameters = Map( + "colName" -> "`m`", + "dataType" -> "\"MAP<STRING, BIGINT>\""), + context = ExpectedContext( + fragment = "SELECT DISTINCT m FROM v", + start = 0, + stop = 23) + ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala index 9a31948889c..eea2eebf849 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2FunctionSuite.scala @@ -446,8 +446,17 @@ class DataSourceV2FunctionSuite extends DatasourceV2SQLBase { catalog("testcat").asInstanceOf[SupportsNamespaces].createNamespace(Array("ns"), emptyProps) addFunction(Identifier.of(Array("ns"), "strlen"), StrLen(BadBoundFunction)) - assert(intercept[AnalysisException](sql("SELECT testcat.ns.strlen('abc')")) - .getMessage.contains("does not implement ScalarFunction or AggregateFunction")) + checkError( + exception = intercept[AnalysisException]( + sql("SELECT testcat.ns.strlen('abc')")), + errorClass = "INVALID_UDF_IMPLEMENTATION", + parameters = Map( + "funcName" -> "`bad_bound_func`"), + context = ExpectedContext( + fragment = "testcat.ns.strlen('abc')", + start = 7, + stop = 30) + ) } test("aggregate function: lookup int average") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala index 11e22a744f3..94109681b8e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeleteFromTableSuiteBase.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.Row import org.apache.spark.sql.execution.datasources.v2.{DeleteFromTableExec, ReplaceDataExec, WriteDeltaExec} abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { @@ -449,19 +449,6 @@ abstract class DeleteFromTableSuiteBase extends RowLevelOperationSuiteBase { } } - test("delete with nondeterministic conditions") { - createAndInitTable("pk INT NOT NULL, id INT, dep STRING", - """{ "pk": 1, "id": 1, "dep": "hr" } - |{ "pk": 2, "id": 2, "dep": "software" } - |{ "pk": 3, "id": 3, "dep": "hr" } - |""".stripMargin) - - val e = intercept[AnalysisException] { - sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5") - } - assert(e.message.contains("nondeterministic expressions are only allowed")) - } - test("delete without condition executed as delete with filters") { createAndInitTable("pk INT NOT NULL, id INT, dep INT", """{ "pk": 1, "id": 1, "dep": 100 } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala index fd7a04ea926..4da85a5ce05 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedDeleteFromTableSuite.scala @@ -27,6 +27,26 @@ class DeltaBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { props } + test("delete with nondeterministic conditions") { + createAndInitTable("pk INT NOT NULL, id INT, dep STRING", + """{ "pk": 1, "id": 1, "dep": "hr" } + |{ "pk": 2, "id": 2, "dep": "software" } + |{ "pk": 3, "id": 3, "dep": "hr" } + |""".stripMargin) + + checkError( + exception = intercept[AnalysisException]( + sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5")), + errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", + parameters = Map( + "sqlExprs" -> "\"((id <= 1) 
AND (rand() > 0.5))\""), + context = ExpectedContext( + fragment = "DELETE FROM cat.ns1.test_table WHERE id <= 1 AND rand() > 0.5", + start = 0, + stop = 60) + ) + } + test("nullable row ID attrs") { createAndInitTable("pk INT, salary INT, dep STRING", """{ "pk": 1, "salary": 300, "dep": 'hr' } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala index af78b2884f5..73b6ec22a72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DeltaBasedUpdateTableSuite.scala @@ -53,4 +53,25 @@ class DeltaBasedUpdateTableSuite extends UpdateTableSuiteBase { sql(s"SELECT * FROM $tableNameAsString"), Row(10, 1, "hr") :: Row(2, 2, "software") :: Row(3, 3, "hr") :: Nil) } + + test("update with nondeterministic conditions") { + createAndInitTable("pk INT NOT NULL, id INT, dep STRING", + """{ "pk": 1, "id": 1, "dep": "hr" } + |{ "pk": 2, "id": 2, "dep": "software" } + |{ "pk": 3, "id": 3, "dep": "hr" } + |""".stripMargin) + + checkError( + exception = intercept[AnalysisException] { + sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5") + }, + errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", + parameters = Map( + "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\""), + context = ExpectedContext( + fragment = "UPDATE cat.ns1.test_table SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5", + start = 0, + stop = 75) + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala index 6c80d46b0ef..0aeab95f58a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedDeleteFromTableSuite.scala @@ -17,13 +17,33 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.internal.SQLConf class GroupBasedDeleteFromTableSuite extends DeleteFromTableSuiteBase { import testImplicits._ + test("delete with nondeterministic conditions") { + createAndInitTable("pk INT NOT NULL, id INT, dep STRING", + """{ "pk": 1, "id": 1, "dep": "hr" } + |{ "pk": 2, "id": 2, "dep": "software" } + |{ "pk": 3, "id": 3, "dep": "hr" } + |""".stripMargin) + + checkError( + exception = intercept[AnalysisException]( + sql(s"DELETE FROM $tableNameAsString WHERE id <= 1 AND rand() > 0.5")), + errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", + parameters = Map( + "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\", \"((id <= 1) AND (rand() > 0.5))\""), + context = ExpectedContext( + fragment = "DELETE FROM cat.ns1.test_table WHERE id <= 1 AND rand() > 0.5", + start = 0, + stop = 60) + ) + } + test("delete with IN predicate and runtime group filtering") { createAndInitTable("id INT, salary INT, dep STRING", """{ "id": 1, "salary": 300, "dep": 'hr' } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala index e2f73e51de0..3e736421a31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/GroupBasedUpdateTableSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.connector -import org.apache.spark.sql.Row +import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.catalyst.expressions.DynamicPruningExpression import org.apache.spark.sql.execution.{InSubqueryExec, ReusedSubqueryExec} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec @@ -110,4 +110,25 @@ class GroupBasedUpdateTableSuite extends UpdateTableSuiteBase { } } } + + test("update with nondeterministic conditions") { + createAndInitTable("pk INT NOT NULL, id INT, dep STRING", + """{ "pk": 1, "id": 1, "dep": "hr" } + |{ "pk": 2, "id": 2, "dep": "software" } + |{ "pk": 3, "id": 3, "dep": "hr" } + |""".stripMargin) + + checkError( + exception = intercept[AnalysisException] { + sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5") + }, + errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS", + parameters = Map( + "sqlExprs" -> "\"((id <= 1) AND (rand() > 0.5))\", \"((id <= 1) AND (rand() > 0.5))\""), + context = ExpectedContext( + fragment = "UPDATE cat.ns1.test_table SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5", + start = 0, + stop = 75) + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala index 346390efa94..b43101c2e02 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/UpdateTableSuiteBase.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.connector import org.apache.spark.SparkException -import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue} import org.apache.spark.sql.connector.expressions.LiteralValue import org.apache.spark.sql.types.{IntegerType, StringType} @@ -528,19 +528,6 @@ abstract class UpdateTableSuiteBase extends RowLevelOperationSuiteBase { Row(2) :: Nil) } - test("update with nondeterministic conditions") { - createAndInitTable("pk INT NOT NULL, id INT, dep STRING", - """{ "pk": 1, "id": 1, "dep": "hr" } - |{ "pk": 2, "id": 2, "dep": "software" } - |{ "pk": 3, "id": 3, "dep": "hr" } - |""".stripMargin) - - val e = intercept[AnalysisException] { - sql(s"UPDATE $tableNameAsString SET dep = 'invalid' WHERE id <= 1 AND rand() > 0.5") - } - assert(e.message.contains("nondeterministic expressions are only allowed")) - } - test("update with default values") { val idDefault = new ColumnDefaultValue("42", LiteralValue(42, IntegerType)) val columns = Array( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
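For illustration, a minimal, hypothetical Scala sketch (not part of the commit) of how one of the newly named error classes, UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE, surfaces in a user session. The DataFrame mirrors the one used in DataFrameSetOperationsSuite above; the object name, local SparkSession setup, and printed output are assumptions rather than code from this change.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.functions.{lit, map}

// Hypothetical, self-contained example; only the error-class name and the
// message parameters (colName, dataType) come from this commit.
object SetOperationOnMapTypeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("set-operation-on-map-type")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the DataFrame in DataFrameSetOperationsSuite:
    // a single MAP<STRING, BIGINT> column named `m`.
    val df = spark.range(1).select(map(lit("key"), $"id").as("m"))

    try {
      // Set operations on MAP columns are rejected during analysis.
      df.intersect(df).collect()
    } catch {
      case e: AnalysisException =>
        // Previously reported as _LEGACY_ERROR_TEMP_2438; now a named class.
        println(e.getErrorClass)        // UNSUPPORTED_FEATURE.SET_OPERATION_ON_MAP_TYPE
        println(e.getMessageParameters) // expected to include colName=`m`, dataType="MAP<STRING, BIGINT>"
    }

    spark.stop()
  }
}
```

The other error classes introduced or reused here (DUPLICATED_METRICS_NAME, INVALID_NON_DETERMINISTIC_EXPRESSIONS, INVALID_UDF_IMPLEMENTATION, UNSUPPORTED_GROUPING_EXPRESSION) surface the same way: the updated tests above assert on the error-class name and message parameters carried by the thrown AnalysisException.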