This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b15a8725b25c [SPARK-48871] Fix INVALID_NON_DETERMINISTIC_EXPRESSIONS 
validation in…
b15a8725b25c is described below

commit b15a8725b25c0b3f78efcccfb1f69e8d7fbd9a72
Author: zhipeng.mao <zhipeng....@databricks.com>
AuthorDate: Fri Jul 12 14:51:47 2024 +0800

    [SPARK-48871] Fix INVALID_NON_DETERMINISTIC_EXPRESSIONS validation in…
    
    … CheckAnalysis
    
    This PR adds a trait that logical plans can extend to declare, via a 
method, whether the operator allows non-deterministic expressions, and checks 
this method in checkAnalysis.
    
    I encountered the `INVALID_NON_DETERMINISTIC_EXPRESSIONS` exception when 
attempting to use a non-deterministic UDF in my query. The non-deterministic 
expression can be safely allowed for my custom LogicalPlan, but it is rejected 
in the checkAnalysis phase. The CheckAnalysis rule is so strict that it also 
rejects reasonable use cases of non-deterministic expressions.
    
    No
    
    The test case `"SPARK-48871: SupportsNonDeterministicExpression allows 
non-deterministic expressions"` is added.
    
    No
    
    Closes #47304 from zhipengmao-db/zhipengmao-db/SPARK-48871-check-analysis.
    
    Lead-authored-by: zhipeng.mao <zhipeng....@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9cbd5dd4e7477294f7d4289880c7ea0dd67b38d3)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 12 +++++++++++
 .../plans/logical/basicLogicalOperators.scala      | 10 ++++++++++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 23 ++++++++++++++++++++++
 3 files changed, 45 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 485015f2efab..bb399e41d7d0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -157,6 +157,17 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     visited(cteId) = true
   }
 
+  /**
+   * Checks whether the operator allows non-deterministic expressions.
+   */
+  private def operatorAllowsNonDeterministicExpressions(plan: LogicalPlan): 
Boolean = {
+    plan match {
+      case p: SupportsNonDeterministicExpression =>
+        p.allowNonDeterministicExpression
+      case _ => false
+    }
+  }
+
   def checkAnalysis(plan: LogicalPlan): Unit = {
     val inlineCTE = InlineCTE(alwaysInline = true)
     val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, 
mutable.Map[Long, Int])]
@@ -771,6 +782,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
                 "dataType" -> toSQLType(mapCol.dataType)))
 
           case o if o.expressions.exists(!_.deterministic) &&
+            !operatorAllowsNonDeterministicExpressions(o) &&
             !o.isInstanceOf[Project] && !o.isInstanceOf[Filter] &&
             !o.isInstanceOf[Aggregate] && !o.isInstanceOf[Window] &&
             !o.isInstanceOf[Expand] &&
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ca2c6a850561..f76e698a6400 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1957,6 +1957,16 @@ case class DeduplicateWithinWatermark(keys: 
Seq[Attribute], child: LogicalPlan)
  */
 trait SupportsSubquery extends LogicalPlan
 
+/**
+ * Trait that logical plans can extend to check whether it can allow 
non-deterministic
+ * expressions and pass the CheckAnalysis rule.
+ */
+trait SupportsNonDeterministicExpression extends LogicalPlan {
+
+  /** Returns whether it allows non-deterministic expressions. */
+  def allowNonDeterministicExpression: Boolean
+}
+
 /**
  * Collect arbitrary (named) metrics from a dataset. As soon as the query 
reaches a completion
  * point (batch query completes or streaming query epoch completes) an event 
is emitted on the
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index a7df53db936f..6b5f0fe3876d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -118,6 +118,13 @@ case class TestFunctionWithTypeCheckFailure(
 
 case class UnresolvedTestPlan() extends UnresolvedLeafNode
 
+case class SupportsNonDeterministicExpressionTestOperator(
+    actions: Seq[Expression],
+    allowNonDeterministicExpression: Boolean)
+  extends LeafNode with SupportsNonDeterministicExpression {
+  override def output: Seq[Attribute] = Seq()
+}
+
 class AnalysisErrorSuite extends AnalysisTest {
   import TestRelations._
 
@@ -1327,4 +1334,20 @@ class AnalysisErrorSuite extends AnalysisTest {
       )
     }
   }
+
+  test("SPARK-48871: SupportsNonDeterministicExpression allows 
non-deterministic expressions") {
+    val nonDeterministicExpressions = Seq(new Rand())
+    val tolerantPlan =
+      SupportsNonDeterministicExpressionTestOperator(
+        nonDeterministicExpressions, allowNonDeterministicExpression = true)
+    assertAnalysisSuccess(tolerantPlan)
+
+    val intolerantPlan =
+      SupportsNonDeterministicExpressionTestOperator(
+        nonDeterministicExpressions, allowNonDeterministicExpression = false)
+    assertAnalysisError(
+      intolerantPlan,
+      "INVALID_NON_DETERMINISTIC_EXPRESSIONS" :: Nil
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to