Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18298#discussion_r122008512
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlanConstraints.scala ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +
    +
    +trait QueryPlanConstraints[PlanType <: QueryPlan[PlanType]] { self: 
QueryPlan[PlanType] =>
    +
    +  /**
    +   * An [[ExpressionSet]] that contains invariants about the rows output 
by this operator. For
    +   * example, if this set contains the expression `a = 2` then that 
expression is guaranteed to
    +   * evaluate to `true` for all rows produced.
    +   */
    +  lazy val constraints: ExpressionSet = 
ExpressionSet(getRelevantConstraints(validConstraints))
    +
    +  /**
    +   * Returns [[constraints]] if constraint propagation is enabled. If the
    +   * flag is disabled, simply returns an empty set of constraints.
    +   */
    +  def getConstraints(constraintPropagationEnabled: Boolean): ExpressionSet 
=
    +    if (constraintPropagationEnabled) {
    +      constraints
    +    } else {
    +      ExpressionSet(Set.empty)
    +    }
    +
    +  /**
    +   * This method can be overridden by any child class of QueryPlan to 
specify a set of constraints
    +   * based on the given operator's constraint propagation logic. These 
constraints are then
    +   * canonicalized and filtered automatically to contain only those 
attributes that appear in the
    +   * [[outputSet]].
    +   *
    +   * See [[Canonicalize]] for more details.
    +   */
    +  protected def validConstraints: Set[Expression] = Set.empty
    +
    +  /**
    +   * Extracts the relevant constraints from a given set of constraints 
based on the attributes that
    +   * appear in the [[outputSet]].
    +   */
    +  protected def getRelevantConstraints(constraints: Set[Expression]): 
Set[Expression] = {
    +    constraints
    +      .union(inferAdditionalConstraints(constraints))
    +      .union(constructIsNotNullConstraints(constraints))
    +      .filter(constraint =>
    +        constraint.references.nonEmpty && 
constraint.references.subsetOf(outputSet) &&
    +          constraint.deterministic)
    +  }
    +
    +  /**
    +   * Infers a set of `isNotNull` constraints from null intolerant 
expressions as well as
    +   * non-nullable attributes. For example, if an expression is of the form
    +   * (`a > 5`), this returns a constraint of the form `isNotNull(a)`.
    +   */
    +  private def constructIsNotNullConstraints(constraints: Set[Expression]): 
Set[Expression] = {
    +    // First, we propagate constraints from the null intolerant 
expressions.
    +    var isNotNullConstraints: Set[Expression] = 
constraints.flatMap(inferIsNotNullConstraints)
    +
    +    // Second, we infer additional constraints from non-nullable 
attributes that are part of the
    +    // operator's output
    +    val nonNullableAttributes = output.filterNot(_.nullable)
    +    isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
    +
    +    isNotNullConstraints -- constraints
    +  }
    +
    +  /**
    +   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
    +   * of constraints.
    +   */
    +  private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
    +    constraint match {
    +      // When the root is IsNotNull, we can push IsNotNull through the 
child null intolerant
    +      // expressions
    +      case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
    +      // Constraints always evaluate to true for all inputs, which means null
    +      // will never be returned.
    +      // Thus, we can infer `IsNotNull(constraint)`, and also push 
IsNotNull through the child
    +      // null intolerant expressions.
    +      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
    +    }
    +
    +  /**
    +   * Recursively explores the expressions which are null intolerant and 
returns all attributes
    +   * in these expressions.
    +   */
    +  private def scanNullIntolerantAttribute(expr: Expression): 
Seq[Attribute] = expr match {
    +    case a: Attribute => Seq(a)
    +    case _: NullIntolerant => 
expr.children.flatMap(scanNullIntolerantAttribute)
    +    case _ => Seq.empty[Attribute]
    +  }
    +
    +  // Collect aliases from expressions of the whole tree rooted by the 
current QueryPlan node, so
    +  // we may avoid producing recursive constraints.
    +  private lazy val aliasMap: AttributeMap[Expression] = AttributeMap(
    +    expressions.collect {
    +      case a: Alias => (a.toAttribute, a.child)
    +    } ++ 
children.flatMap(_.asInstanceOf[QueryPlanConstraints[PlanType]].aliasMap))
    --- End diff --
    
    that would create a cyclic hierarchy ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to