Repository: spark
Updated Branches:
  refs/heads/branch-2.4 e80ab130e -> 1961f8e62

[SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and can be applied infinitely

## What changes were proposed in this pull request?

The HandleNullInputsForUDF rule can generate new `If` nodes indefinitely, because each application of the rule wraps the UDF in another `If`. The analyzed plan therefore never stabilizes, which causes problems such as SQL cache lookups failing to match. This was fixed in SPARK-24891 and was then broken again by SPARK-25044. The unit test in `AnalysisSuite` added in SPARK-24891 should have failed but did not, because it was not properly updated after the `ScalaUDF` constructor signature change. This PR therefore also updates the test to use the new `ScalaUDF` constructor.

## How was this patch tested?

Updated the original UT. This is justified because the original UT became invalid after SPARK-25044.

Closes #22701 from maryannxue/spark-25690.

Authored-by: maryannxue <maryann...@apache.org>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>
(cherry picked from commit 368513048198efcee8c9a35678b608be0cb9ad48)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1961f8e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1961f8e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1961f8e6

Branch: refs/heads/branch-2.4
Commit: 1961f8e62f2c6d546d42d37423bfad2f55e75e6a
Parents: e80ab13
Author: maryannxue <maryann...@apache.org>
Authored: Thu Oct 11 20:45:08 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Oct 11 20:45:24 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 +++-
 .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala  | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/1961f8e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fdb68dd..9c0975e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2164,8 +2164,10 @@ class Analyzer(
 
           // TODO: skip null handling for not-nullable primitive inputs after we can completely
           // trust the `nullable` information.
+          val needsNullCheck = (nullable: Boolean, expr: Expression) =>
+            nullable && !expr.isInstanceOf[KnownNotNull]
           val inputsNullCheck = nullableTypes.zip(inputs)
-            .filter { case (nullable, _) => !nullable }
+            .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
             .map { case (_, expr) => IsNull(expr) }
             .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
           // Once we add an `If` check above the udf, it is safe to mark those checked inputs

http://git-wip-us.apache.org/repos/asf/spark/blob/1961f8e6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index f9facbb..cf76c92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -351,8 +351,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
     val a = testRelation.output(0)
     val func = (x: Int, y: Int) => x + y
-    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil)
-    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil)
+    val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil)
+    val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = false :: false :: Nil)
     val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
     comparePlans(plan.analyze, plan.analyze.analyze)
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
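
For readers who want to see why the rule failed to stabilize, here is a minimal sketch that does not depend on Spark. The case classes (`Attr`, `Udf`, `If`, and so on), the `NullCheckSketch` object, and the `skipKnownNotNull` flag are hypothetical stand-ins that only mimic the shape of Catalyst expressions; the rewrite is a simplified model of the null-handling rule, not the code in `Analyzer.scala`. With `skipKnownNotNull = false` every pass wraps the UDF in another `If`; with `skipKnownNotNull = true` inputs already marked `KnownNotNull` are skipped, so a second pass changes nothing.

```scala
// Toy expression tree. These classes are hypothetical and are NOT Spark's API.
sealed trait Expr
case class Attr(name: String) extends Expr
case object NullLit extends Expr
case class KnownNotNull(child: Expr) extends Expr
case class IsNull(child: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case class If(cond: Expr, whenTrue: Expr, whenFalse: Expr) extends Expr
case class Udf(inputs: List[Expr], nullableTypes: List[Boolean]) extends Expr

object NullCheckSketch {
  // One bottom-up pass of a simplified null-handling rule.
  // skipKnownNotNull = false models the regressed behaviour;
  // skipKnownNotNull = true models the extra check added by this patch.
  def rewrite(e: Expr, skipKnownNotNull: Boolean): Expr = {
    def go(x: Expr): Expr = rewrite(x, skipKnownNotNull)
    e match {
      case Udf(inputs, nullableTypes) =>
        val children = inputs.map(go)
        // An input needs a null check when its parameter type cannot hold
        // null and, in the fixed variant, it is not already marked.
        val needsCheck = nullableTypes.zip(children).map {
          case (nullableType, in) =>
            !nullableType && !(skipKnownNotNull && in.isInstanceOf[KnownNotNull])
        }
        val checked = children.zip(needsCheck)
        val cond = checked
          .collect { case (in, true) => IsNull(in) }
          .reduceLeftOption[Expr](Or(_, _))
        cond match {
          case Some(c) =>
            // Mark the checked inputs so a later pass can recognise them.
            val marked = checked.map { case (in, check) =>
              if (check) KnownNotNull(in) else in
            }
            If(c, NullLit, Udf(marked, nullableTypes))
          case None =>
            Udf(children, nullableTypes)
        }
      case If(c, t, f)     => If(go(c), go(t), go(f))
      case Or(l, r)        => Or(go(l), go(r))
      case IsNull(c)       => IsNull(go(c))
      case KnownNotNull(c) => KnownNotNull(go(c))
      case leaf            => leaf
    }
  }

  // Same shape as the unit test: a UDF whose second input is another UDF.
  val a: Expr = Attr("a")
  val udf1: Expr = Udf(List(a, a), List(false, false))
  val udf2: Expr = Udf(List(a, udf1), List(false, false))

  // True when applying the rule a second time leaves the tree unchanged.
  def stable(skipKnownNotNull: Boolean): Boolean = {
    val once = rewrite(udf2, skipKnownNotNull)
    rewrite(once, skipKnownNotNull) == once
  }
}
```

In this model `NullCheckSketch.stable(skipKnownNotNull = true)` holds, while `NullCheckSketch.stable(skipKnownNotNull = false)` does not. That is the same idempotence property the updated test asserts with `comparePlans(plan.analyze, plan.analyze.analyze)`.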