Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8371#discussion_r37814386
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -559,21 +529,79 @@ class Analyzer(
       }
     
       /**
    -   * This rule finds expressions in HAVING clause filters that depend on
    -   * unresolved attributes.  It pushes these expressions down to the 
underlying
    -   * aggregates and then projects them away above the filter.
    +   * This rule finds aggregate expressions that are not in an aggregate 
operator.  For example,
    +   * those in a HAVING clause or ORDER BY clause.  These expressions are 
pushed down to the
    +   * underlying aggregate operator and then projected away after the 
original operator.
        */
    -  object UnresolvedHavingClauseAttributes extends Rule[LogicalPlan] {
    +  object ResolveAggregateFunctions extends Rule[LogicalPlan] {
         def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    -      case filter @ Filter(havingCondition, aggregate @ Aggregate(_, 
originalAggExprs, _))
    -          if aggregate.resolved && containsAggregate(havingCondition) =>
    -
    -        val evaluatedCondition = Alias(havingCondition, 
"havingCondition")()
    -        val aggExprsWithHaving = evaluatedCondition +: originalAggExprs
    +      case filter @ Filter(havingCondition,
    +             aggregate @ Aggregate(grouping, originalAggExprs, child))
    +          if aggregate.resolved && !filter.resolved =>
    +
    +        // Try resolving the condition of the filter as though it is in 
the aggregate clause
    +        val aggregatedCondition =
    +          Aggregate(grouping, Alias(havingCondition, "havingCondition")() 
:: Nil, child)
    +        val resolvedOperator = execute(aggregatedCondition)
    +        def resolvedAggregateFilter =
    +          resolvedOperator
    +            .asInstanceOf[Aggregate]
    +            .aggregateExpressions.head
    +
    +        // If resolution was successful and we see the filter has an 
aggregate in it, add it to
    +        // the original aggregate operator.
    +        if (resolvedOperator.resolved && 
containsAggregate(resolvedAggregateFilter)) {
    +          val aggExprsWithHaving = resolvedAggregateFilter +: 
originalAggExprs
    +
    +          Project(aggregate.output,
    +            Filter(resolvedAggregateFilter.toAttribute,
    +              aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
    +        } else {
    +          filter
    +        }
     
    -        Project(aggregate.output,
    -          Filter(evaluatedCondition.toAttribute,
    -            aggregate.copy(aggregateExpressions = aggExprsWithHaving)))
    +      case sort @ Sort(sortOrder, global,
    +             aggregate @ Aggregate(grouping, originalAggExprs, child))
    +        if aggregate.resolved && !sort.resolved =>
    +
    +        // Try resolving the ordering as though it is in the aggregate 
clause.
    +        try {
    +          val aliasedOrder = sortOrder.map(o => Alias(o.child, 
"aggOrder")())
    --- End diff --
    
    oh, seems we need to avoid adding `Alias` if the a ordering expression is a 
NamedExpression.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to