This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e2930b8dc08 [SPARK-38868][SQL] Don't propagate exceptions from filter predicate when optimizing outer joins e2930b8dc08 is described below commit e2930b8dc087e5a284b451c4cac6c1a2459b456d Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Mon Apr 25 13:49:15 2022 +0800 [SPARK-38868][SQL] Don't propagate exceptions from filter predicate when optimizing outer joins ### What changes were proposed in this pull request? Change `EliminateOuterJoin#canFilterOutNull` to return `false` when a `where` condition throws an exception. ### Why are the changes needed? Consider this query: ``` select * from (select id, id as b from range(0, 10)) l left outer join (select id, id + 1 as c from range(0, 10)) r on l.id = r.id where assert_true(c > 0) is null; ``` The query should succeed, but instead fails with ``` java.lang.RuntimeException: '(c#1L > cast(0 as bigint))' is not true! ``` This happens even though there is no row where `c > 0` is false. The `EliminateOuterJoin` rule checks if it can convert the outer join to an inner join based on the expression in the where clause, which in this case is ``` assert_true(c > 0) is null ``` `EliminateOuterJoin#canFilterOutNull` evaluates that expression with `c` set to `null` to see if the result is `null` or `false`. That rule doesn't expect the evaluation to throw a `RuntimeException`, but in this case it always does. That is, the assertion is failing during optimization, not at run time. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. Closes #36230 from bersprockets/outer_join_eval_assert_issue. 
Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../apache/spark/sql/catalyst/optimizer/joins.scala | 14 ++++++++++++-- .../catalyst/optimizer/OuterJoinEliminationSuite.scala | 18 +++++++++++++++++- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index b21594deb70..e5e91acf865 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec +import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction @@ -151,8 +152,17 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { val emptyRow = new GenericInternalRow(attributes.length) val boundE = BindReferences.bindReference(e, attributes) if (boundE.exists(_.isInstanceOf[Unevaluable])) return false - val v = boundE.eval(emptyRow) - v == null || v == false + + // some expressions, like map(), may throw an exception when dealing with null values. + // therefore, we need to handle exceptions. 
+ try { + val v = boundE.eval(emptyRow) + v == null || v == false + } catch { + case NonFatal(e) => + // cannot filter out null if `where` expression throws an exception with null input + false + } } private def buildNewJoinType(filter: Filter, join: Join): JoinType = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala index 192db596347..2530cfded9e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala @@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull} +import org.apache.spark.sql.catalyst.expressions.{Coalesce, If, IsNotNull, Literal, RaiseError} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StringType +import org.apache.spark.unsafe.types.UTF8String class OuterJoinEliminationSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { @@ -252,4 +254,18 @@ class OuterJoinEliminationSuite extends PlanTest { comparePlans(optimized, originalQuery.analyze) } } + + test("SPARK-38868: exception thrown from filter predicate does not propagate") { + val x = testRelation.subquery(Symbol("x")) + val y = testRelation1.subquery(Symbol("y")) + + val message = Literal(UTF8String.fromString("Bad value"), StringType) + val originalQuery = + x.join(y, LeftOuter, Option("x.a".attr === "y.d".attr)) + 
.where(If("y.d".attr > 0, true, RaiseError(message)).isNull) + + val optimized = Optimize.execute(originalQuery.analyze) + + comparePlans(optimized, originalQuery.analyze) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org