GitHub user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9995#discussion_r46002446
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -675,34 +675,34 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
     }
     
     /**
    - * Push [[Filter]] operators through [[Aggregate]] operators. Parts of the predicate that reference
    - * attributes which are subset of group by attribute set of [[Aggregate]] will be pushed beneath,
    - * and the rest should remain above.
    + * Push [[Filter]] operators through [[Aggregate]] operators, iff the filters reference only
    + * non-aggregate attributes (typically literals or grouping expressions).
      */
     object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
     
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    -    case filter @ Filter(condition,
    -        aggregate @ Aggregate(groupingExpressions, aggregateExpressions, grandChild)) =>
    -
    -      def hasAggregate(expression: Expression): Boolean = expression match {
    -        case agg: AggregateExpression => true
    -        case other => expression.children.exists(hasAggregate)
    -      }
    -      // Create a map of Alias for expressions that does not have AggregateExpression
    -      val aliasMap = AttributeMap(aggregateExpressions.collect {
    -        case a: Alias if !hasAggregate(a.child) => (a.toAttribute, a.child)
    +    case filter @ Filter(condition, aggregate: Aggregate) =>
    +      // Find all the aliased expressions in the aggregate list that don't include any actual
    +      // AggregateExpression, and create a map from the alias to the expression
    +      val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {
    +        case a: Alias if a.child.find(_.isInstanceOf[AggregateExpression]).isEmpty =>
    +          (a.toAttribute, a.child)
           })
     
    -      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { conjunct =>
    -        val replaced = replaceAlias(conjunct, aliasMap)
    -        replaced.references.subsetOf(grandChild.outputSet) && replaced.deterministic
    +      // For each filter, expand the alias and check if the filter can be evaluated using
    +      // attributes produced by the aggregate operator's child operator.
    +      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
    +        val replaced = replaceAlias(cond, aliasMap)
    +        replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
           }
    +
           if (pushDown.nonEmpty) {
             val pushDownPredicate = pushDown.reduce(And)
             val replaced = replaceAlias(pushDownPredicate, aliasMap)
    -        val withPushdown = aggregate.copy(child = Filter(replaced, grandChild))
    -        stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
    +        val newAggregate = aggregate.copy(child = Filter(replaced, aggregate.child))
    +        // If there is no more filter to stay up, just eliminate the filter.
    +        // Otherwise, create Filter(pushDownPredicate) -> Aggregate -> Filter(stayUp).
    --- End diff --
    
    ok i changed the direction of the arrows and also moved the operators 
around.

