This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e2930b8dc08 [SPARK-38868][SQL] Don't propagate exceptions from filter predicate when optimizing outer joins
e2930b8dc08 is described below

commit e2930b8dc087e5a284b451c4cac6c1a2459b456d
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Apr 25 13:49:15 2022 +0800

    [SPARK-38868][SQL] Don't propagate exceptions from filter predicate when optimizing outer joins
    
    ### What changes were proposed in this pull request?
    
    Change `EliminateOuterJoin#canFilterOutNull` to return `false` when a `where` condition throws an exception.
    
    ### Why are the changes needed?
    
    Consider this query:
    ```
    select *
    from (select id, id as b from range(0, 10)) l
    left outer join (select id, id + 1 as c from range(0, 10)) r
    on l.id = r.id
    where assert_true(c > 0) is null;
    ```
    The query should succeed, but instead fails with
    ```
    java.lang.RuntimeException: '(c#1L > cast(0 as bigint))' is not true!
    ```
    This happens even though there is no row where `c > 0` is false.
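
    For context, `assert_true(cond)` behaves like `if(cond, null, raise_error(...))`: the error branch fires whenever the condition is not strictly true, including when it evaluates to `null`. As a minimal sketch (mirroring the shape of the new test added below; the attribute name `c` and the message are illustrative), an equivalent Catalyst predicate looks like:
    ```
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.expressions.{If, Literal, RaiseError}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.unsafe.types.UTF8String

    // Same shape as `assert_true(c > 0) is null`: if `c` is bound to null,
    // then `c > 0` evaluates to null, the If condition is not true, and the
    // RaiseError branch throws at eval time.
    val message = Literal(UTF8String.fromString("c was not positive"), StringType)
    val predicate = If("c".attr > 0, Literal(true), RaiseError(message)).isNull
    ```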
    
    The `EliminateOuterJoin` rule checks whether it can convert the outer join to an inner join based on the expression in the where clause, which in this case is
    ```
    assert_true(c > 0) is null
    ```
    `EliminateOuterJoin#canFilterOutNull` evaluates that expression with `c` set to `null` to see if the result is `null` or `false`. The rule doesn't expect the evaluation to throw a `RuntimeException`, but in this case it always does.
    
    That is, the assertion fails during optimization, not at run time.
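
    As a minimal sketch of that evaluation step (condensed from `canFilterOutNull` as it appears in the patch below; the helper name is hypothetical):
    ```
    import org.apache.spark.sql.catalyst.expressions._

    // Bind the filter predicate to one join side's output attributes, then
    // evaluate it against a row whose fields are all null. Before this patch,
    // an expression that throws on null input (like assert_true) escaped this
    // eval and aborted optimization.
    def evalAgainstAllNulls(e: Expression, attributes: Seq[Attribute]): Any = {
      val emptyRow = new GenericInternalRow(attributes.length) // every field is null
      val boundE = BindReferences.bindReference(e, attributes)
      boundE.eval(emptyRow) // pre-fix: any exception propagates to the caller
    }
    ```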
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #36230 from bersprockets/outer_join_eval_assert_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/optimizer/joins.scala    | 14 ++++++++++++--
 .../catalyst/optimizer/OuterJoinEliminationSuite.scala | 18 +++++++++++++++++-
 2 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index b21594deb70..e5e91acf865 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.annotation.tailrec
+import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction
@@ -151,8 +152,17 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     val emptyRow = new GenericInternalRow(attributes.length)
     val boundE = BindReferences.bindReference(e, attributes)
     if (boundE.exists(_.isInstanceOf[Unevaluable])) return false
-    val v = boundE.eval(emptyRow)
-    v == null || v == false
+
+    // some expressions, like map(), may throw an exception when dealing with null values.
+    // therefore, we need to handle exceptions.
+    try {
+      val v = boundE.eval(emptyRow)
+      v == null || v == false
+    } catch {
+      case NonFatal(e) =>
+        // cannot filter out null if `where` expression throws an exception with null input
+        false
+    }
   }
 
   private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 192db596347..2530cfded9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, If, IsNotNull, Literal, RaiseError}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
 
 class OuterJoinEliminationSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
@@ -252,4 +254,18 @@ class OuterJoinEliminationSuite extends PlanTest {
       comparePlans(optimized, originalQuery.analyze)
     }
   }
+
+  test("SPARK-38868: exception thrown from filter predicate does not 
propagate") {
+    val x = testRelation.subquery(Symbol("x"))
+    val y = testRelation1.subquery(Symbol("y"))
+
+    val message = Literal(UTF8String.fromString("Bad value"), StringType)
+    val originalQuery =
+      x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr))
+        .where(If("y.d".attr > 0, true, RaiseError(message)).isNull)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    comparePlans(optimized, originalQuery.analyze)
+  }
 }
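
With the fix, the motivating query from the description optimizes without the predicate throwing. A quick way to check in `spark-shell` (a sketch, assuming a default session with `spark` in scope):
```
// Previously this threw java.lang.RuntimeException during optimization.
// Now it returns all ten joined rows, since assert_true(c > 0) evaluates
// to NULL (and `is null` to true) for every matched row.
spark.sql("""
  select *
  from (select id, id as b from range(0, 10)) l
  left outer join (select id, id + 1 as c from range(0, 10)) r
  on l.id = r.id
  where assert_true(c > 0) is null
""").show()
```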

