This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new 399b2ca [SPARK-33733][SQL][2.4] PullOutNondeterministic should check and collect deterministic field 399b2ca is described below commit 399b2cafe3fcec17d40c4ccfd863848aa43d7da4 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Thu Dec 17 08:43:43 2020 +0000 [SPARK-33733][SQL][2.4] PullOutNondeterministic should check and collect deterministic field backport [#30703](https://github.com/apache/spark/pull/30703) for branch-2.4. ### What changes were proposed in this pull request? The deterministic field is wider than `NonDeterministic`, so we should keep the same range between pull out and check analysis. ### Why are the changes needed? For example ``` select * from values(1), (4) as t(c1) order by java_method('java.lang.Math', 'abs', c1) ``` We will get an exception since the `java_method` deterministic field is false but it is not a `NonDeterministic` ``` Exception in thread "main" org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found: java_method('java.lang.Math', 'abs', t.`c1`) ASC NULLS FIRST in operator Sort [java_method(java.lang.Math, abs, c1#1) ASC NULLS FIRST], true ;; ``` ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Add test. Closes #30772 from ulysses-you/SPARK-33733-branch-2.4. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++++- .../expressions/CallMethodViaReflection.scala | 6 +++--- .../sql/catalyst/analysis/AnalysisSuite.scala | 22 ++++++++++++++++++++++ 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index dc40a6a..5ad2e7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2260,7 +2260,10 @@ class Analyzer( private def getNondeterToAttr(exprs: Seq[Expression]): Map[Expression, NamedExpression] = { exprs.filterNot(_.deterministic).flatMap { expr => - val leafNondeterministic = expr.collect { case n: Nondeterministic => n } + val leafNondeterministic = expr.collect { + case n: Nondeterministic => n + case udf: UserDefinedExpression if !udf.deterministic => udf + } leafNondeterministic.distinct.map { e => val ne = e match { case n: NamedExpression => n diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala index 65bb9a8..0966fcb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala @@ -53,7 +53,7 @@ import org.apache.spark.util.Utils a5cf6c42-0c85-418f-af6c-3e4e5b1328f2 """) case class CallMethodViaReflection(children: Seq[Expression]) - extends Expression with CodegenFallback { + extends Nondeterministic with CodegenFallback { override def prettyName: String = "reflect" @@ -76,11 +76,11 @@ case 
class CallMethodViaReflection(children: Seq[Expression]) } } - override lazy val deterministic: Boolean = false override def nullable: Boolean = true override val dataType: DataType = StringType + override protected def initializeInternal(partitionIndex: Int): Unit = {} - override def eval(input: InternalRow): Any = { + override protected def evalInternal(input: InternalRow): Any = { var i = 0 while (i < argExprs.length) { buffer(i) = argExprs(i).eval(input).asInstanceOf[Object] diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 8eaba32..4c4ba11 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -657,4 +657,26 @@ class AnalysisSuite extends AnalysisTest with Matchers { Seq("Intersect can only be performed on tables with the compatible column types. " + "timestamp <> double at the second column of the second table")) } + + test("SPARK-33733: PullOutNondeterministic should check and collect deterministic field") { + val reflect = + CallMethodViaReflection(Seq("java.lang.Math", "abs", testRelation.output.head)) + val udf = ScalaUDF( + (s: String) => s, + StringType, + Literal.create(null, StringType) :: Nil, + true :: Nil, + udfDeterministic = false) + + Seq(reflect, udf).foreach { e: Expression => + val plan = Sort(Seq(e.asc), false, testRelation) + val projected = Alias(e, "_nondeterministic")() + val expect = + Project(testRelation.output, + Sort(Seq(projected.toAttribute.asc), false, + Project(testRelation.output :+ projected, + testRelation))) + checkAnalysis(plan, expect) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org