Repository: spark
Updated Branches:
  refs/heads/master 697a5e551 -> b7aac15d5


[SPARK-20700][SQL] InferFiltersFromConstraints stackoverflows for query (v2)

## What changes were proposed in this pull request?

In the previous approach we used `aliasMap` to link an `Attribute` to an 
expression, which may have the form `f(a, b)`. However, we only searched 
`expressions` and `children.expressions` for aliases, which is not enough when 
an `Alias` lies deeper in the logical plan. In that case we cannot generate the 
valid equivalence classes of constraints, and thus we fail to prevent the 
recursive deductions.

We fix this problem by collecting all `Alias`es from the whole logical plan.
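
To make the failure mode concrete, here is a minimal, self-contained Scala 
sketch (toy classes, not Spark's actual `QueryPlan`/`AttributeMap` API) assuming 
a plan shaped like the modified test case below, where the `Alias` for `int_col` 
sits two levels under the `Join`: a map built only from 
`expressions ++ children.flatMap(_.expressions)` misses it, while recursing 
through the children's alias maps finds it.

```scala
sealed trait Expr
case class Attr(name: String) extends Expr
case class Coalesce(children: Seq[Expr]) extends Expr
case class Alias(child: Expr, name: String) extends Expr {
  def toAttr: Attr = Attr(name)
}

case class Plan(expressions: Seq[Expr], children: Seq[Plan]) {
  // Old behaviour: only this node's and its direct children's expressions are searched.
  def shallowAliasMap: Map[Attr, Expr] =
    (expressions ++ children.flatMap(_.expressions)).collect {
      case a: Alias => a.toAttr -> a.child
    }.toMap

  // New behaviour: additionally merge in the children's alias maps, recursively.
  def deepAliasMap: Map[Attr, Expr] =
    expressions.collect { case a: Alias => a.toAttr -> a.child }.toMap ++
      children.flatMap(_.deepAliasMap)
}

object AliasMapDemo extends App {
  // Mirrors: t1.select('a, 'b.as('d), coalesce('a, 'b).as('int_col)).select('int_col, 'd, 'a).join(...)
  val scan  = Plan(Seq(Attr("a"), Attr("b")), Nil)
  val inner = Plan(
    Seq(Attr("a"), Alias(Attr("b"), "d"), Alias(Coalesce(Seq(Attr("a"), Attr("b"))), "int_col")),
    Seq(scan))
  val outer = Plan(Seq(Attr("int_col"), Attr("d"), Attr("a")), Seq(inner))
  val join  = Plan(Seq.empty, Seq(outer))

  // The Alias defining int_col is two levels below the Join, so the shallow map misses it ...
  println(join.shallowAliasMap.get(Attr("int_col"))) // None
  // ... while the recursive map resolves int_col back to coalesce(a, b).
  println(join.deepAliasMap.get(Attr("int_col")))    // Some(Coalesce(List(Attr(a), Attr(b))))
}
```

The actual patch achieves the same effect by building `aliasMap` from 
`expressions.collect { ... } ++ children.flatMap(_.aliasMap)`, so each node's 
map is assembled lazily from the alias maps of its children (see the 
`QueryPlan.scala` hunk below).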

## How was this patch tested?

No additional test case is added, but an existing test case is modified to cover 
this situation.

Author: Xingbo Jiang <xingbo.ji...@databricks.com>

Closes #18020 from jiangxb1987/inferConstrants.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7aac15d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7aac15d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7aac15d

Branch: refs/heads/master
Commit: b7aac15d566b048c20c2491fbf376b727f2eeb68
Parents: 697a5e5
Author: Xingbo Jiang <xingbo.ji...@databricks.com>
Authored: Wed May 17 23:32:31 2017 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed May 17 23:32:31 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  9 +++---
 .../InferFiltersFromConstraintsSuite.scala      | 32 +++++++++++---------
 2 files changed, 22 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b7aac15d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 959fcf7..d3f822b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -81,11 +81,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
     case _ => Seq.empty[Attribute]
   }
 
-  // Collect aliases from expressions, so we may avoid producing recursive constraints.
-  private lazy val aliasMap = AttributeMap(
-    (expressions ++ children.flatMap(_.expressions)).collect {
+  // Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so
+  // we may avoid producing recursive constraints.
+  private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
+    expressions.collect {
       case a: Alias => (a.toAttribute, a.child)
-    })
+    } ++ children.flatMap(_.aliasMap))
 
   /**
   * Infers an additional set of constraints from a given set of equality constraints.

http://git-wip-us.apache.org/repos/asf/spark/blob/b7aac15d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index c8fe374..9a4bcdb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -33,7 +33,8 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
         PushPredicateThroughJoin,
         PushDownPredicate,
         InferFiltersFromConstraints(conf),
-        CombineFilters) :: Nil
+        CombineFilters,
+        BooleanSimplification) :: Nil
   }
 
   object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] {
@@ -172,7 +173,12 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val t1 = testRelation.subquery('t1)
     val t2 = testRelation.subquery('t2)
 
-    val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t")
+    // We should prevent `Coalesce(a, b)` from recursively creating complicated constraints through
+    // the constraint inference procedure.
+    val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
+      // We hide an `Alias` inside the child's child's expressions, to cover the situation reported
+      // in [SPARK-20700].
+      .select('int_col, 'd, 'a).as("t")
       .join(t2, Inner,
         Some("t.a".attr === "t2.a".attr
           && "t.d".attr === "t2.a".attr
@@ -180,22 +186,18 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
       .analyze
     val correctAnswer = t1
       .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
-        && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a
-        && Coalesce(Seq('a, 'a)) <=> 'b && Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a))
-        && 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b))) && 'a === Coalesce(Seq('a, 'b))
-        && Coalesce(Seq('a, 'b)) <=> Coalesce(Seq('b, 'b)) && Coalesce(Seq('a, 'b)) === 'b
+        && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a))
+        && Coalesce(Seq('b, 'b)) <=> 'a && 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b)))
+        && 'a === Coalesce(Seq('a, 'b)) && Coalesce(Seq('a, 'b)) === 'b
         && IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b)))
-        && 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b))
-        && Coalesce(Seq('b, 'b)) <=> Coalesce(Seq('b, 'b)) && 'b <=> 'b)
-      .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t")
+        && 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b)))
+      .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
+      .select('int_col, 'd, 'a).as("t")
       .join(t2
         .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
-          && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a
-          && Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a))), Inner,
-        Some("t.a".attr === "t2.a".attr
-          && "t.d".attr === "t2.a".attr
-          && "t.int_col".attr === "t2.a".attr
-          && Coalesce(Seq("t.d".attr, "t.d".attr)) <=> "t.int_col".attr))
+          && 'a <=> Coalesce(Seq('a, 'a)) && 'a === Coalesce(Seq('a, 'a)) && 'a <=> 'a), Inner,
+        Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr
+          && "t.int_col".attr === "t2.a".attr))
       .analyze
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)

