Repository: spark
Updated Branches:
  refs/heads/master e862dc904 -> b6b108826


[SPARK-21103][SQL] QueryPlanConstraints should be part of LogicalPlan

## What changes were proposed in this pull request?
QueryPlanConstraints should be part of LogicalPlan, rather than QueryPlan, 
since the constraint framework is only used for query plan rewriting and not 
for physical planning.

## How was this patch tested?
Should be covered by existing tests, since it is a simple refactoring.

Author: Reynold Xin <r...@databricks.com>

Closes #18310 from rxin/SPARK-21103.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6b10882
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6b10882
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6b10882

Branch: refs/heads/master
Commit: b6b108826a5dd5c889a70180365f9320452557fc
Parents: e862dc9
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Jun 20 11:34:22 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jun 20 11:34:22 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/QueryPlan.scala    |   5 +-
 .../catalyst/plans/QueryPlanConstraints.scala   | 195 ------------------
 .../catalyst/plans/logical/LogicalPlan.scala    |   2 +-
 .../plans/logical/QueryPlanConstraints.scala    | 196 +++++++++++++++++++
 4 files changed, 198 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6b10882/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 9130b14..1f6d05b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -22,10 +22,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructType}
 
-abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
-  extends TreeNode[PlanType]
-  with QueryPlanConstraints[PlanType] {
-
+abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends 
TreeNode[PlanType] {
   self: PlanType =>
 
   def conf: SQLConf = SQLConf.get

http://git-wip-us.apache.org/repos/asf/spark/blob/b6b10882/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
deleted file mode 100644
index b08a009..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans
-
-import org.apache.spark.sql.catalyst.expressions._
-
-
-trait QueryPlanConstraints[PlanType <: QueryPlan[PlanType]] { self: 
QueryPlan[PlanType] =>
-
-  /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by 
this operator. For
-   * example, if this set contains the expression `a = 2` then that expression 
is guaranteed to
-   * evaluate to `true` for all rows produced.
-   */
-  lazy val constraints: ExpressionSet = {
-    if (conf.constraintPropagationEnabled) {
-      ExpressionSet(
-        validConstraints
-          .union(inferAdditionalConstraints(validConstraints))
-          .union(constructIsNotNullConstraints(validConstraints))
-          .filter { c =>
-            c.references.nonEmpty && c.references.subsetOf(outputSet) && 
c.deterministic
-          }
-      )
-    } else {
-      ExpressionSet(Set.empty)
-    }
-  }
-
-  /**
-   * This method can be overridden by any child class of QueryPlan to specify 
a set of constraints
-   * based on the given operator's constraint propagation logic. These 
constraints are then
-   * canonicalized and filtered automatically to contain only those attributes 
that appear in the
-   * [[outputSet]].
-   *
-   * See [[Canonicalize]] for more details.
-   */
-  protected def validConstraints: Set[Expression] = Set.empty
-
-  /**
-   * Infers a set of `isNotNull` constraints from null intolerant expressions 
as well as
-   * non-nullable attributes. For e.g., if an expression is of the form (`a > 
5`), this
-   * returns a constraint of the form `isNotNull(a)`
-   */
-  private def constructIsNotNullConstraints(constraints: Set[Expression]): 
Set[Expression] = {
-    // First, we propagate constraints from the null intolerant expressions.
-    var isNotNullConstraints: Set[Expression] = 
constraints.flatMap(inferIsNotNullConstraints)
-
-    // Second, we infer additional constraints from non-nullable attributes 
that are part of the
-    // operator's output
-    val nonNullableAttributes = output.filterNot(_.nullable)
-    isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
-
-    isNotNullConstraints -- constraints
-  }
-
-  /**
-   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
-   * of constraints.
-   */
-  private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
-    constraint match {
-      // When the root is IsNotNull, we can push IsNotNull through the child 
null intolerant
-      // expressions
-      case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
-      // Constraints always return true for all the inputs. That means, null 
will never be returned.
-      // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull 
through the child
-      // null intolerant expressions.
-      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
-    }
-
-  /**
-   * Recursively explores the expressions which are null intolerant and 
returns all attributes
-   * in these expressions.
-   */
-  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = 
expr match {
-    case a: Attribute => Seq(a)
-    case _: NullIntolerant => 
expr.children.flatMap(scanNullIntolerantAttribute)
-    case _ => Seq.empty[Attribute]
-  }
-
-  // Collect aliases from expressions of the whole tree rooted by the current 
QueryPlan node, so
-  // we may avoid producing recursive constraints.
-  private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
-    expressions.collect {
-      case a: Alias => (a.toAttribute, a.child)
-    } ++ 
children.flatMap(_.asInstanceOf[QueryPlanConstraints[PlanType]].aliasMap))
-
-  /**
-   * Infers an additional set of constraints from a given set of equality 
constraints.
-   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), 
this returns an
-   * additional constraint of the form `b = 5`.
-   *
-   * [SPARK-17733] We explicitly prevent producing recursive constraints of 
the form `a = f(a, b)`
-   * as they are often useless and can lead to a non-converging set of 
constraints.
-   */
-  private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
-    val constraintClasses = generateEquivalentConstraintClasses(constraints)
-
-    var inferredConstraints = Set.empty[Expression]
-    constraints.foreach {
-      case eq @ EqualTo(l: Attribute, r: Attribute) =>
-        val candidateConstraints = constraints - eq
-        inferredConstraints ++= candidateConstraints.map(_ transform {
-          case a: Attribute if a.semanticEquals(l) &&
-            !isRecursiveDeduction(r, constraintClasses) => r
-        })
-        inferredConstraints ++= candidateConstraints.map(_ transform {
-          case a: Attribute if a.semanticEquals(r) &&
-            !isRecursiveDeduction(l, constraintClasses) => l
-        })
-      case _ => // No inference
-    }
-    inferredConstraints -- constraints
-  }
-
-  /**
-   * Generate a sequence of expression sets from constraints, where each set 
stores an equivalence
-   * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will 
generate the following
-   * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search 
all expressions equal
-   * to an selected attribute.
-   */
-  private def generateEquivalentConstraintClasses(
-      constraints: Set[Expression]): Seq[Set[Expression]] = {
-    var constraintClasses = Seq.empty[Set[Expression]]
-    constraints.foreach {
-      case eq @ EqualTo(l: Attribute, r: Attribute) =>
-        // Transform [[Alias]] to its child.
-        val left = aliasMap.getOrElse(l, l)
-        val right = aliasMap.getOrElse(r, r)
-        // Get the expression set for an equivalence constraint class.
-        val leftConstraintClass = getConstraintClass(left, constraintClasses)
-        val rightConstraintClass = getConstraintClass(right, constraintClasses)
-        if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) {
-          // Combine the two sets.
-          constraintClasses = constraintClasses
-            .diff(leftConstraintClass :: rightConstraintClass :: Nil) :+
-            (leftConstraintClass ++ rightConstraintClass)
-        } else if (leftConstraintClass.nonEmpty) { // && 
rightConstraintClass.isEmpty
-          // Update equivalence class of `left` expression.
-          constraintClasses = constraintClasses
-            .diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right)
-        } else if (rightConstraintClass.nonEmpty) { // && 
leftConstraintClass.isEmpty
-          // Update equivalence class of `right` expression.
-          constraintClasses = constraintClasses
-            .diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left)
-        } else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty
-          // Create new equivalence constraint class since neither expression 
presents
-          // in any classes.
-          constraintClasses = constraintClasses :+ Set(left, right)
-        }
-      case _ => // Skip
-    }
-
-    constraintClasses
-  }
-
-  /**
-   * Get all expressions equivalent to the selected expression.
-   */
-  private def getConstraintClass(
-      expr: Expression,
-      constraintClasses: Seq[Set[Expression]]): Set[Expression] =
-    constraintClasses.find(_.contains(expr)).getOrElse(Set.empty[Expression])
-
-  /**
-   * Check whether replace by an [[Attribute]] will cause a recursive 
deduction. Generally it
-   * has the form like: `a -> f(a, b)`, where `a` and `b` are expressions and 
`f` is a function.
-   * Here we first get all expressions equal to `attr` and then check whether 
at least one of them
-   * is a child of the referenced expression.
-   */
-  private def isRecursiveDeduction(
-      attr: Attribute,
-      constraintClasses: Seq[Set[Expression]]): Boolean = {
-    val expr = aliasMap.getOrElse(attr, attr)
-    getConstraintClass(expr, constraintClasses).exists { e =>
-      expr.children.exists(_.semanticEquals(e))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b6b10882/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 2ebb2ff..95b4165 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 
 
-abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] with 
QueryPlanConstraints with Logging {
 
   private var _analyzed: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b6b10882/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
new file mode 100644
index 0000000..8bffbd0
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions._
+
+
+trait QueryPlanConstraints { self: LogicalPlan =>
+
+  /**
+   * An [[ExpressionSet]] that contains invariants about the rows output by 
this operator. For
+   * example, if this set contains the expression `a = 2` then that expression 
is guaranteed to
+   * evaluate to `true` for all rows produced.
+   */
+  lazy val constraints: ExpressionSet = {
+    if (conf.constraintPropagationEnabled) {
+      ExpressionSet(
+        validConstraints
+          .union(inferAdditionalConstraints(validConstraints))
+          .union(constructIsNotNullConstraints(validConstraints))
+          .filter { c =>
+            c.references.nonEmpty && c.references.subsetOf(outputSet) && 
c.deterministic
+          }
+      )
+    } else {
+      ExpressionSet(Set.empty)
+    }
+  }
+
+  /**
+   * This method can be overridden by any child class of QueryPlan to specify 
a set of constraints
+   * based on the given operator's constraint propagation logic. These 
constraints are then
+   * canonicalized and filtered automatically to contain only those attributes 
that appear in the
+   * [[outputSet]].
+   *
+   * See [[Canonicalize]] for more details.
+   */
+  protected def validConstraints: Set[Expression] = Set.empty
+
+  /**
+   * Infers a set of `isNotNull` constraints from null intolerant expressions 
as well as
+   * non-nullable attributes. For e.g., if an expression is of the form (`a > 
5`), this
+   * returns a constraint of the form `isNotNull(a)`
+   */
+  private def constructIsNotNullConstraints(constraints: Set[Expression]): 
Set[Expression] = {
+    // First, we propagate constraints from the null intolerant expressions.
+    var isNotNullConstraints: Set[Expression] = 
constraints.flatMap(inferIsNotNullConstraints)
+
+    // Second, we infer additional constraints from non-nullable attributes 
that are part of the
+    // operator's output
+    val nonNullableAttributes = output.filterNot(_.nullable)
+    isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
+
+    isNotNullConstraints -- constraints
+  }
+
+  /**
+   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
+   * of constraints.
+   */
+  private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
+    constraint match {
+      // When the root is IsNotNull, we can push IsNotNull through the child 
null intolerant
+      // expressions
+      case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+      // Constraints always return true for all the inputs. That means, null 
will never be returned.
+      // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull 
through the child
+      // null intolerant expressions.
+      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+    }
+
+  /**
+   * Recursively explores the expressions which are null intolerant and 
returns all attributes
+   * in these expressions.
+   */
+  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = 
expr match {
+    case a: Attribute => Seq(a)
+    case _: NullIntolerant => 
expr.children.flatMap(scanNullIntolerantAttribute)
+    case _ => Seq.empty[Attribute]
+  }
+
+  // Collect aliases from expressions of the whole tree rooted by the current 
QueryPlan node, so
+  // we may avoid producing recursive constraints.
+  private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
+    expressions.collect {
+      case a: Alias => (a.toAttribute, a.child)
+    } ++ children.flatMap(_.asInstanceOf[QueryPlanConstraints].aliasMap))
+    // Note: the explicit cast is necessary, since Scala compiler fails to 
infer the type.
+
+  /**
+   * Infers an additional set of constraints from a given set of equality 
constraints.
+   * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), 
this returns an
+   * additional constraint of the form `b = 5`.
+   *
+   * [SPARK-17733] We explicitly prevent producing recursive constraints of 
the form `a = f(a, b)`
+   * as they are often useless and can lead to a non-converging set of 
constraints.
+   */
+  private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
+    val constraintClasses = generateEquivalentConstraintClasses(constraints)
+
+    var inferredConstraints = Set.empty[Expression]
+    constraints.foreach {
+      case eq @ EqualTo(l: Attribute, r: Attribute) =>
+        val candidateConstraints = constraints - eq
+        inferredConstraints ++= candidateConstraints.map(_ transform {
+          case a: Attribute if a.semanticEquals(l) &&
+            !isRecursiveDeduction(r, constraintClasses) => r
+        })
+        inferredConstraints ++= candidateConstraints.map(_ transform {
+          case a: Attribute if a.semanticEquals(r) &&
+            !isRecursiveDeduction(l, constraintClasses) => l
+        })
+      case _ => // No inference
+    }
+    inferredConstraints -- constraints
+  }
+
+  /**
+   * Generate a sequence of expression sets from constraints, where each set 
stores an equivalence
+   * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will 
generate the following
+   * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search 
all expressions equal
+   * to an selected attribute.
+   */
+  private def generateEquivalentConstraintClasses(
+      constraints: Set[Expression]): Seq[Set[Expression]] = {
+    var constraintClasses = Seq.empty[Set[Expression]]
+    constraints.foreach {
+      case eq @ EqualTo(l: Attribute, r: Attribute) =>
+        // Transform [[Alias]] to its child.
+        val left = aliasMap.getOrElse(l, l)
+        val right = aliasMap.getOrElse(r, r)
+        // Get the expression set for an equivalence constraint class.
+        val leftConstraintClass = getConstraintClass(left, constraintClasses)
+        val rightConstraintClass = getConstraintClass(right, constraintClasses)
+        if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) {
+          // Combine the two sets.
+          constraintClasses = constraintClasses
+            .diff(leftConstraintClass :: rightConstraintClass :: Nil) :+
+            (leftConstraintClass ++ rightConstraintClass)
+        } else if (leftConstraintClass.nonEmpty) { // && 
rightConstraintClass.isEmpty
+          // Update equivalence class of `left` expression.
+          constraintClasses = constraintClasses
+            .diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right)
+        } else if (rightConstraintClass.nonEmpty) { // && 
leftConstraintClass.isEmpty
+          // Update equivalence class of `right` expression.
+          constraintClasses = constraintClasses
+            .diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left)
+        } else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty
+          // Create new equivalence constraint class since neither expression 
presents
+          // in any classes.
+          constraintClasses = constraintClasses :+ Set(left, right)
+        }
+      case _ => // Skip
+    }
+
+    constraintClasses
+  }
+
+  /**
+   * Get all expressions equivalent to the selected expression.
+   */
+  private def getConstraintClass(
+      expr: Expression,
+      constraintClasses: Seq[Set[Expression]]): Set[Expression] =
+    constraintClasses.find(_.contains(expr)).getOrElse(Set.empty[Expression])
+
+  /**
+   * Check whether replace by an [[Attribute]] will cause a recursive 
deduction. Generally it
+   * has the form like: `a -> f(a, b)`, where `a` and `b` are expressions and 
`f` is a function.
+   * Here we first get all expressions equal to `attr` and then check whether 
at least one of them
+   * is a child of the referenced expression.
+   */
+  private def isRecursiveDeduction(
+      attr: Attribute,
+      constraintClasses: Seq[Set[Expression]]): Boolean = {
+    val expr = aliasMap.getOrElse(attr, attr)
+    getConstraintClass(expr, constraintClasses).exists { e =>
+      expr.children.exists(_.semanticEquals(e))
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to