This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 7dc4e32 [SPARK-33733][SQL] PullOutNondeterministic should check and collect deterministic field 7dc4e32 is described below commit 7dc4e32c114205ebb035512f6b7fd3f26154d1f0 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Mon Dec 14 14:35:24 2020 +0000 [SPARK-33733][SQL] PullOutNondeterministic should check and collect deterministic field ### What changes were proposed in this pull request? The deterministic field is wider than `NonDeterministic`, we should keep the same range between pull out and check analysis. ### Why are the changes needed? For example ``` select * from values(1), (4) as t(c1) order by java_method('java.lang.Math', 'abs', c1) ``` We will get exception since `java_method` deterministic field is false but not a `NonDeterministic` ``` Exception in thread "main" org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found: java_method('java.lang.Math', 'abs', t.`c1`) ASC NULLS FIRST in operator Sort [java_method(java.lang.Math, abs, c1#1) ASC NULLS FIRST], true ;; ``` ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Add test. Closes #30703 from ulysses-you/SPARK-33733. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 839d6899adafd9a0695667656d00220d4665895d) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++++- .../expressions/CallMethodViaReflection.scala | 6 +++--- .../sql/catalyst/analysis/AnalysisSuite.scala | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a688a24..c5c0c68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2947,7 +2947,10 @@ class Analyzer(override val catalogManager: CatalogManager) private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { exprs.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { case n: Nondeterministic => n } + val leafNondeterministic = expr.collect { + case n: Nondeterministic => n + case udf: UserDefinedExpression if !udf.deterministic => udf + } leafNondeterministic.distinct.map { e => val ne = e match { case n: NamedExpression => n diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index 4bd6418..0979a18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -54,7 +54,7 @@ import org.apache.spark.util.Utils """, since = "2.0.0") case class CallMethodViaReflection(children: Seq[Expression]) - extends Expression 
with CodegenFallback { + extends Nondeterministic with CodegenFallback { override def prettyName: String = getTagValue(FunctionRegistry.FUNC_ALIAS).getOrElse("reflect") @@ -77,11 +77,11 @@ case class CallMethodViaReflection(children: Seq[Expression]) } } - override lazy val deterministic: Boolean = false override def nullable: Boolean = true override val dataType: DataType = StringType + override protected def initializeInternal(partitionIndex: Int): Unit = {} - override def eval(input: InternalRow): Any = { + override protected def evalInternal(input: InternalRow): Any = { var i = 0 while (i < argExprs.length) { buffer(i) = argExprs(i).eval(input).asInstanceOf[Object] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index f5bfdc5..468b8c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -984,4 +984,26 @@ class AnalysisSuite extends AnalysisTest with Matchers { s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value.")) } } + + test("SPARK-33733: PullOutNondeterministic should check and collect deterministic field") { + val reflect = + CallMethodViaReflection(Seq("java.lang.Math", "abs", testRelation.output.head)) + val udf = ScalaUDF( + (s: String) => s, + StringType, + Literal.create(null, StringType) :: Nil, + Option(ExpressionEncoder[String]().resolveAndBind()) :: Nil, + udfDeterministic = false) + + Seq(reflect, udf).foreach { e: Expression => + val plan = Sort(Seq(e.asc), false, testRelation) + val projected = Alias(e, "_nondeterministic")() + val expect = + Project(testRelation.output, + Sort(Seq(projected.toAttribute.asc), false, + Project(testRelation.output :+ projected, + testRelation))) + checkAnalysis(plan, expect) + } + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org