Repository: spark
Updated Branches:
  refs/heads/master c5f9b89dd -> 1a9857476


[SPARK-21979][SQL] Improve QueryPlanConstraints framework

## What changes were proposed in this pull request?

Improve QueryPlanConstraints framework, make it robust and simple.
In https://github.com/apache/spark/pull/15319, constraints for expressions like 
`a = f(b, c)` were resolved.
However, for expressions like
```scala
a = f(b, c) && c = g(a, b)
```
The current QueryPlanConstraints framework will produce non-converging 
constraints.
Essentially, the problem is caused by having both the name and the child of an 
alias in the same constraint set. We infer constraints and push them down 
as predicates in filters; later, these predicates are propagated as 
constraints again, and so on.
Using only the alias names resolves these problems. The size of the 
constraint set is reduced without losing any information. We can always get the 
inferred constraints on the children of aliases when pushing down filters.

Also, the EqualNullSafe constraint between the name and the child that is added 
when propagating an alias is meaningless:
```scala
allConstraints += EqualNullSafe(e, a.toAttribute)
```
It just produces redundant constraints.

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltn...@gmail.com>

Closes #19201 from gengliangwang/QueryPlanConstraints.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a985747
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a985747
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a985747

Branch: refs/heads/master
Commit: 1a9857476698a094dbcbdf261c90e5a684231669
Parents: c5f9b89
Author: Wang Gengliang <ltn...@gmail.com>
Authored: Tue Sep 12 13:02:29 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Sep 12 13:02:29 2017 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/LogicalPlan.scala    |   1 -
 .../plans/logical/QueryPlanConstraints.scala    | 103 ++++++-------------
 .../InferFiltersFromConstraintsSuite.scala      |  46 +++++++--
 .../plans/ConstraintPropagationSuite.scala      |   2 -
 4 files changed, 65 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1a985747/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index d893b39..68aae72 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -297,7 +297,6 @@ abstract class UnaryNode extends LogicalPlan {
           case expr: Expression if expr.semanticEquals(e) =>
             a.toAttribute
         })
-        allConstraints += EqualNullSafe(e, a.toAttribute)
       case _ => // Don't change.
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a985747/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 8bffbd0..b0f611f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -106,91 +106,48 @@ trait QueryPlanConstraints { self: LogicalPlan =>
    * Infers an additional set of constraints from a given set of equality 
constraints.
    * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), 
this returns an
    * additional constraint of the form `b = 5`.
-   *
-   * [SPARK-17733] We explicitly prevent producing recursive constraints of 
the form `a = f(a, b)`
-   * as they are often useless and can lead to a non-converging set of 
constraints.
    */
   private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
-    val constraintClasses = generateEquivalentConstraintClasses(constraints)
-
+    val aliasedConstraints = 
eliminateAliasedExpressionInConstraints(constraints)
     var inferredConstraints = Set.empty[Expression]
-    constraints.foreach {
+    aliasedConstraints.foreach {
       case eq @ EqualTo(l: Attribute, r: Attribute) =>
-        val candidateConstraints = constraints - eq
-        inferredConstraints ++= candidateConstraints.map(_ transform {
-          case a: Attribute if a.semanticEquals(l) &&
-            !isRecursiveDeduction(r, constraintClasses) => r
-        })
-        inferredConstraints ++= candidateConstraints.map(_ transform {
-          case a: Attribute if a.semanticEquals(r) &&
-            !isRecursiveDeduction(l, constraintClasses) => l
-        })
+        val candidateConstraints = aliasedConstraints - eq
+        inferredConstraints ++= replaceConstraints(candidateConstraints, l, r)
+        inferredConstraints ++= replaceConstraints(candidateConstraints, r, l)
       case _ => // No inference
     }
     inferredConstraints -- constraints
   }
 
   /**
-   * Generate a sequence of expression sets from constraints, where each set 
stores an equivalence
-   * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will 
generate the following
-   * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search 
all expressions equal
-   * to an selected attribute.
+   * Replace the aliased expression in [[Alias]] with the alias name if both 
exist in constraints.
+   * Thus non-converging inference can be prevented.
+   * E.g. `Alias(b, f(a)), a = b` infers `f(a) = f(f(a))` without eliminating 
aliased expressions.
+   * Also, the size of constraints is reduced without losing any information.
+   * When the inferred filters are pushed down the operators that generate the 
alias,
+   * the alias names used in filters are replaced by the aliased expressions.
    */
-  private def generateEquivalentConstraintClasses(
-      constraints: Set[Expression]): Seq[Set[Expression]] = {
-    var constraintClasses = Seq.empty[Set[Expression]]
-    constraints.foreach {
-      case eq @ EqualTo(l: Attribute, r: Attribute) =>
-        // Transform [[Alias]] to its child.
-        val left = aliasMap.getOrElse(l, l)
-        val right = aliasMap.getOrElse(r, r)
-        // Get the expression set for an equivalence constraint class.
-        val leftConstraintClass = getConstraintClass(left, constraintClasses)
-        val rightConstraintClass = getConstraintClass(right, constraintClasses)
-        if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) {
-          // Combine the two sets.
-          constraintClasses = constraintClasses
-            .diff(leftConstraintClass :: rightConstraintClass :: Nil) :+
-            (leftConstraintClass ++ rightConstraintClass)
-        } else if (leftConstraintClass.nonEmpty) { // && 
rightConstraintClass.isEmpty
-          // Update equivalence class of `left` expression.
-          constraintClasses = constraintClasses
-            .diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right)
-        } else if (rightConstraintClass.nonEmpty) { // && 
leftConstraintClass.isEmpty
-          // Update equivalence class of `right` expression.
-          constraintClasses = constraintClasses
-            .diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left)
-        } else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty
-          // Create new equivalence constraint class since neither expression 
presents
-          // in any classes.
-          constraintClasses = constraintClasses :+ Set(left, right)
-        }
-      case _ => // Skip
+  private def eliminateAliasedExpressionInConstraints(constraints: 
Set[Expression])
+    : Set[Expression] = {
+    val attributesInEqualTo = constraints.flatMap {
+      case EqualTo(l: Attribute, r: Attribute) => l :: r :: Nil
+      case _ => Nil
     }
-
-    constraintClasses
-  }
-
-  /**
-   * Get all expressions equivalent to the selected expression.
-   */
-  private def getConstraintClass(
-      expr: Expression,
-      constraintClasses: Seq[Set[Expression]]): Set[Expression] =
-    constraintClasses.find(_.contains(expr)).getOrElse(Set.empty[Expression])
-
-  /**
-   * Check whether replace by an [[Attribute]] will cause a recursive 
deduction. Generally it
-   * has the form like: `a -> f(a, b)`, where `a` and `b` are expressions and 
`f` is a function.
-   * Here we first get all expressions equal to `attr` and then check whether 
at least one of them
-   * is a child of the referenced expression.
-   */
-  private def isRecursiveDeduction(
-      attr: Attribute,
-      constraintClasses: Seq[Set[Expression]]): Boolean = {
-    val expr = aliasMap.getOrElse(attr, attr)
-    getConstraintClass(expr, constraintClasses).exists { e =>
-      expr.children.exists(_.semanticEquals(e))
+    var aliasedConstraints = constraints
+    attributesInEqualTo.foreach { a =>
+      if (aliasMap.contains(a)) {
+        val child = aliasMap.get(a).get
+        aliasedConstraints = replaceConstraints(aliasedConstraints, child, a)
+      }
     }
+    aliasedConstraints
   }
+
+  private def replaceConstraints(
+      constraints: Set[Expression],
+      source: Expression,
+      destination: Attribute): Set[Expression] = constraints.map(_ transform {
+    case e: Expression if e.semanticEquals(source) => destination
+  })
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1a985747/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index d2dd469..5580f86 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -151,9 +151,9 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
       .join(t2, Inner, Some("t.a".attr === "t2.a".attr && "t.d".attr === 
"t2.a".attr))
       .analyze
     val correctAnswer = t1
-      .where(IsNotNull('a) && IsNotNull('b) && 'a <=> 'a && 'b <=> 'b &&'a === 
'b)
+      .where(IsNotNull('a) && IsNotNull('b) &&'a === 'b)
       .select('a, 'b.as('d)).as("t")
-      .join(t2.where(IsNotNull('a) && 'a <=> 'a), Inner,
+      .join(t2.where(IsNotNull('a)), Inner,
         Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr))
       .analyze
     val optimized = Optimize.execute(originalQuery)
@@ -176,17 +176,17 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
           && "t.int_col".attr === "t2.a".attr))
       .analyze
     val correctAnswer = t1
-      .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
-        && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a))
-        && Coalesce(Seq('b, 'b)) <=> 'a && 'a === 'b && 
IsNotNull(Coalesce(Seq('a, 'b)))
-        && 'a === Coalesce(Seq('a, 'b)) && Coalesce(Seq('a, 'b)) === 'b
-        && IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b)))
-        && 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b)))
+      .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))  && 
IsNotNull(Coalesce(Seq('b, 'a)))
+        && IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b))) && 
IsNotNull(Coalesce(Seq('a, 'b)))
+        && 'a === 'b && 'a === Coalesce(Seq('a, 'a)) && 'a === 
Coalesce(Seq('a, 'b))
+        && 'a === Coalesce(Seq('b, 'a)) && 'b === Coalesce(Seq('a, 'b))
+        && 'b === Coalesce(Seq('b, 'a)) && 'b === Coalesce(Seq('b, 'b)))
       .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col))
       .select('int_col, 'd, 'a).as("t")
-      .join(t2
-        .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a)))
-          && 'a <=> Coalesce(Seq('a, 'a)) && 'a === Coalesce(Seq('a, 'a)) && 
'a <=> 'a), Inner,
+      .join(
+        t2.where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a))) &&
+          'a === Coalesce(Seq('a, 'a))),
+        Inner,
         Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr
           && "t.int_col".attr === "t2.a".attr))
       .analyze
@@ -194,6 +194,30 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("inner join with EqualTo expressions containing part of each other: 
don't generate " +
+    "constraints for recursive functions") {
+    val t1 = testRelation.subquery('t1)
+    val t2 = testRelation.subquery('t2)
+
+    // We should prevent `c = Coalese(a, b)` and `a = Coalese(b, c)` from 
recursively creating
+    // complicated constraints through the constraint inference procedure.
+    val originalQuery = t1
+      .select('a, 'b, 'c, Coalesce(Seq('b, 'c)).as('d), Coalesce(Seq('a, 
'b)).as('e))
+      .where('a === 'd && 'c === 'e)
+      .join(t2, Inner, Some("t1.a".attr === "t2.a".attr && "t1.c".attr === 
"t2.c".attr))
+      .analyze
+    val correctAnswer = t1
+      .where(IsNotNull('a) && IsNotNull('c) && 'a === Coalesce(Seq('b, 'c)) &&
+        'c === Coalesce(Seq('a, 'b)))
+      .select('a, 'b, 'c, Coalesce(Seq('b, 'c)).as('d), Coalesce(Seq('a, 
'b)).as('e))
+      .join(t2.where(IsNotNull('a) && IsNotNull('c)),
+        Inner,
+        Some("t1.a".attr === "t2.a".attr && "t1.c".attr === "t2.c".attr))
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
   test("generate correct filters for alias that don't produce recursive 
constraints") {
     val t1 = testRelation.subquery('t1)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1a985747/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index a37e06d..866ff0d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -134,8 +134,6 @@ class ConstraintPropagationSuite extends SparkFunSuite with 
PlanTest {
     verifyConstraints(aliasedRelation.analyze.constraints,
       ExpressionSet(Seq(resolveColumn(aliasedRelation.analyze, "x") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "x")),
-        resolveColumn(aliasedRelation.analyze, "b") <=> 
resolveColumn(aliasedRelation.analyze, "y"),
-        resolveColumn(aliasedRelation.analyze, "z") <=> 
resolveColumn(aliasedRelation.analyze, "x"),
         resolveColumn(aliasedRelation.analyze, "z") > 10,
         IsNotNull(resolveColumn(aliasedRelation.analyze, "z")))))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to