spark git commit: [SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and can be applied infinitely

2018-10-11 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.4 e80ab130e -> 1961f8e62


[SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and 
can be applied infinitely

## What changes were proposed in this pull request?

The `HandleNullInputsForUDF` rule can generate a new `If` node every time it 
is applied, so the analyzed plan never stabilizes and, for example, SQL cache 
lookups miss.
This was fixed in SPARK-24891 and was then broken by SPARK-25044.
The unit test in `AnalysisSuite` added in SPARK-24891 should have failed but 
didn't because it wasn't properly updated after the `ScalaUDF` constructor 
signature change. So this PR also updates the test accordingly based on the new 
`ScalaUDF` constructor.
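
The non-stabilizing behaviour can be reproduced outside Spark with a toy 
model. The sketch below is only an illustration: `Expr`, `Attr`, `NullLit`, 
`Udf` and `NullHandlingDemo` are hypothetical stand-ins rather than Catalyst 
classes, and the `skipKnownNotNull` flag is an assumption added so that the 
regressed and patched filters can be compared side by side. The filter itself 
follows the shape of `needsNullCheck` in this patch.

```scala
// Toy stand-ins for Catalyst expressions. These are NOT Spark's classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case object NullLit extends Expr
case class IsNull(child: Expr) extends Expr
case class Or(left: Expr, right: Expr) extends Expr
case class KnownNotNull(child: Expr) extends Expr
case class If(cond: Expr, ifTrue: Expr, ifFalse: Expr) extends Expr
// nullableTypes(i) == false means parameter i is a primitive that cannot hold null,
// so the matching input has to be guarded by a null check.
case class Udf(inputs: List[Expr], nullableTypes: List[Boolean]) extends Expr

object NullHandlingDemo {
  // One pass of the toy rule. skipKnownNotNull = false mimics the regressed
  // behaviour; skipKnownNotNull = true mimics the filter added by this patch.
  def rewrite(e: Expr, skipKnownNotNull: Boolean): Expr = e match {
    // Only `If` and `Udf` are traversed, which is enough for this example.
    case If(c, t, f) =>
      If(rewrite(c, skipKnownNotNull), rewrite(t, skipKnownNotNull), rewrite(f, skipKnownNotNull))
    case Udf(inputs, nullableTypes) =>
      val needsNullCheck = (nullable: Boolean, expr: Expr) =>
        nullable && !(skipKnownNotNull && expr.isInstanceOf[KnownNotNull])
      val inputsNullCheck = nullableTypes.zip(inputs)
        .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
        .map { case (_, expr) => IsNull(expr) }
        .reduceLeftOption[Expr]((e1, e2) => Or(e1, e2))
      inputsNullCheck match {
        case Some(check) =>
          // Inputs guarded by the `If` are marked so a later pass can recognize them.
          val marked = inputs.zip(nullableTypes).map { case (expr, nullableType) =>
            if (nullableType) expr else KnownNotNull(expr)
          }
          If(check, NullLit, Udf(marked, nullableTypes))
        case None => e // nothing left to guard: the rule is a no-op
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val udf = Udf(List(Attr("a"), Attr("a")), List(false, false))
    for (fixed <- List(false, true)) {
      val once = rewrite(udf, fixed)
      val twice = rewrite(once, fixed)
      println(s"skipKnownNotNull=$fixed stable=${once == twice}")
    }
  }
}
```

Without the `KnownNotNull` filter the second pass wraps the already guarded 
UDF in another `If`, so the two results differ. With the filter the second 
pass finds nothing to guard and returns the expression unchanged, which is 
the property the updated test checks by comparing `plan.analyze` with 
`plan.analyze.analyze`.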

## How was this patch tested?

Updated the original unit test. Changing it is justified because the 
original test became invalid after SPARK-25044.

Closes #22701 from maryannxue/spark-25690.

Authored-by: maryannxue 
Signed-off-by: gatorsmile 
(cherry picked from commit 368513048198efcee8c9a35678b608be0cb9ad48)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1961f8e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1961f8e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1961f8e6

Branch: refs/heads/branch-2.4
Commit: 1961f8e62f2c6d546d42d37423bfad2f55e75e6a
Parents: e80ab13
Author: maryannxue 
Authored: Thu Oct 11 20:45:08 2018 -0700
Committer: gatorsmile 
Committed: Thu Oct 11 20:45:24 2018 -0700

--
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 4 +++-
 .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala   | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1961f8e6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fdb68dd..9c0975e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2164,8 +2164,10 @@ class Analyzer(
 
 // TODO: skip null handling for not-nullable primitive inputs after we can completely
 // trust the `nullable` information.
+val needsNullCheck = (nullable: Boolean, expr: Expression) =>
+  nullable && !expr.isInstanceOf[KnownNotNull]
 val inputsNullCheck = nullableTypes.zip(inputs)
-  .filter { case (nullable, _) => !nullable }
+  .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
   .map { case (_, expr) => IsNull(expr) }
   .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
 // Once we add an `If` check above the udf, it is safe to mark those checked inputs

http://git-wip-us.apache.org/repos/asf/spark/blob/1961f8e6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index f9facbb..cf76c92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -351,8 +351,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
 val a = testRelation.output(0)
 val func = (x: Int, y: Int) => x + y
-val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil)
-val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil)
+val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil)
+val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = false :: false :: Nil)
 val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
 comparePlans(plan.analyze, plan.analyze.analyze)
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and can be applied infinitely

2018-10-11 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master c9d7d83ed -> 368513048


[SPARK-25690][SQL] Analyzer rule HandleNullInputsForUDF does not stabilize and 
can be applied infinitely

## What changes were proposed in this pull request?

The `HandleNullInputsForUDF` rule can generate a new `If` node every time it 
is applied, so the analyzed plan never stabilizes and, for example, SQL cache 
lookups miss.
This was fixed in SPARK-24891 and was then broken by SPARK-25044.
The unit test in `AnalysisSuite` added in SPARK-24891 should have failed but 
didn't because it wasn't properly updated after the `ScalaUDF` constructor 
signature change. So this PR also updates the test accordingly based on the new 
`ScalaUDF` constructor.

## How was this patch tested?

Updated the original unit test. Changing it is justified because the 
original test became invalid after SPARK-25044.

Closes #22701 from maryannxue/spark-25690.

Authored-by: maryannxue 
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/36851304
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/36851304
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/36851304

Branch: refs/heads/master
Commit: 368513048198efcee8c9a35678b608be0cb9ad48
Parents: c9d7d83
Author: maryannxue 
Authored: Thu Oct 11 20:45:08 2018 -0700
Committer: gatorsmile 
Committed: Thu Oct 11 20:45:08 2018 -0700

--
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala  | 4 +++-
 .../org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala   | 4 ++--
 2 files changed, 5 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/36851304/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d72e512..7f641ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2150,8 +2150,10 @@ class Analyzer(
 
 // TODO: skip null handling for not-nullable primitive inputs after we can completely
 // trust the `nullable` information.
+val needsNullCheck = (nullable: Boolean, expr: Expression) =>
+  nullable && !expr.isInstanceOf[KnownNotNull]
 val inputsNullCheck = nullableTypes.zip(inputs)
-  .filter { case (nullable, _) => !nullable }
+  .filter { case (nullableType, expr) => needsNullCheck(!nullableType, expr) }
   .map { case (_, expr) => IsNull(expr) }
   .reduceLeftOption[Expression]((e1, e2) => Or(e1, e2))
 // Once we add an `If` check above the udf, it is safe to mark those checked inputs

http://git-wip-us.apache.org/repos/asf/spark/blob/36851304/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index f9facbb..cf76c92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -351,8 +351,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
   test("SPARK-24891 Fix HandleNullInputsForUDF rule") {
 val a = testRelation.output(0)
 val func = (x: Int, y: Int) => x + y
-val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil)
-val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil)
+val udf1 = ScalaUDF(func, IntegerType, a :: a :: Nil, nullableTypes = false :: false :: Nil)
+val udf2 = ScalaUDF(func, IntegerType, a :: udf1 :: Nil, nullableTypes = false :: false :: Nil)
 val plan = Project(Alias(udf2, "")() :: Nil, testRelation)
 comparePlans(plan.analyze, plan.analyze.analyze)
   }

