This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new b15a8725b25c [SPARK-48871] Fix INVALID_NON_DETERMINISTIC_EXPRESSIONS validation in CheckAnalysis b15a8725b25c is described below commit b15a8725b25c0b3f78efcccfb1f69e8d7fbd9a72 Author: zhipeng.mao <zhipeng....@databricks.com> AuthorDate: Fri Jul 12 14:51:47 2024 +0800 [SPARK-48871] Fix INVALID_NON_DETERMINISTIC_EXPRESSIONS validation in CheckAnalysis What changes were proposed in this pull request? The PR added a trait that logical plans can extend to implement a method to decide whether there can be non-deterministic expressions for the operator, and checks this method in checkAnalysis. Why are the changes needed? I encountered the `INVALID_NON_DETERMINISTIC_EXPRESSIONS` exception when attempting to use a non-deterministic udf in my query. The non-deterministic expression can be safely allowed for my custom LogicalPlan, but it is disabled in the checkAnalysis phase. The CheckAnalysis rule is too strict, so reasonable use cases of non-deterministic expressions are also disabled. Does this PR introduce any user-facing change? No. How was this patch tested? The test case `"SPARK-48871: SupportsNonDeterministicExpression allows non-deterministic expressions"` is added. Was this patch authored or co-authored using generative AI tooling? No. Closes #47304 from zhipengmao-db/zhipengmao-db/SPARK-48871-check-analysis. 
Lead-authored-by: zhipeng.mao <zhipeng....@databricks.com> Co-authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 9cbd5dd4e7477294f7d4289880c7ea0dd67b38d3) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CheckAnalysis.scala | 12 +++++++++++ .../plans/logical/basicLogicalOperators.scala | 10 ++++++++++ .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 23 ++++++++++++++++++++++ 3 files changed, 45 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 485015f2efab..bb399e41d7d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -157,6 +157,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB visited(cteId) = true } + /** + * Checks whether the operator allows non-deterministic expressions. 
+ */ + private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): Boolean = { + plan match { + case p: SupportsNonDeterministicExpression => + p.allowNonDeterministicExpression + case _ => false + } + } + def checkAnalysis(plan: LogicalPlan): Unit = { val inlineCTE = InlineCTE(alwaysInline = true) val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])] @@ -771,6 +782,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB "dataType" -> toSQLType(mapCol.dataType))) case o if o.expressions.exists(!_.deterministic) && + !operatorAllowsNonDeterministicExpressions(o) && !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] && !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] && !o.isInstanceOf[Expand] && diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index ca2c6a850561..f76e698a6400 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1957,6 +1957,16 @@ case class DeduplicateWithinWatermark(keys: Seq[Attribute], child: LogicalPlan) */ trait SupportsSubquery extends LogicalPlan +/** + * Trait that logical plans can extend to check whether it can allow non-deterministic + * expressions and pass the CheckAnalysis rule. + */ +trait SupportsNonDeterministicExpression extends LogicalPlan { + + /** Returns whether it allows non-deterministic expressions. */ + def allowNonDeterministicExpression: Boolean +} + /** * Collect arbitrary (named) metrics from a dataset. 
As soon as the query reaches a completion * point (batch query completes or streaming query epoch completes) an event is emitted on the diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index a7df53db936f..6b5f0fe3876d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -118,6 +118,13 @@ case class TestFunctionWithTypeCheckFailure( case class UnresolvedTestPlan() extends UnresolvedLeafNode +case class SupportsNonDeterministicExpressionTestOperator( + actions: Seq[Expression], + allowNonDeterministicExpression: Boolean) + extends LeafNode with SupportsNonDeterministicExpression { + override def output: Seq[Attribute] = Seq() +} + class AnalysisErrorSuite extends AnalysisTest { import TestRelations._ @@ -1327,4 +1334,20 @@ class AnalysisErrorSuite extends AnalysisTest { ) } } + + test("SPARK-48871: SupportsNonDeterministicExpression allows non-deterministic expressions") { + val nonDeterministicExpressions = Seq(new Rand()) + val tolerantPlan = + SupportsNonDeterministicExpressionTestOperator( + nonDeterministicExpressions, allowNonDeterministicExpression = true) + assertAnalysisSuccess(tolerantPlan) + + val intolerantPlan = + SupportsNonDeterministicExpressionTestOperator( + nonDeterministicExpressions, allowNonDeterministicExpression = false) + assertAnalysisError( + intolerantPlan, + "INVALID_NON_DETERMINISTIC_EXPRESSIONS" :: Nil + ) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org