Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/19193#discussion_r156058341 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -1920,7 +1927,34 @@ class Analyzer( case p: LogicalPlan if !p.childrenResolved => p - // Aggregate without Having clause. + // Extract window expressions from aggregate functions. There might be an aggregate whose + // aggregate function contains a window expression as a child, which we need to extract. + // e.g., df.groupBy().agg(max(rank().over(window)) + case a @ Aggregate(groupingExprs, aggregateExprs, child) + if containsAggregateFunctionWithWindowExpression(aggregateExprs) && + a.expressions.forall(_.resolved) => + + val windowExprAliases = new ArrayBuffer[NamedExpression]() + val newAggregateExprs = aggregateExprs.map { expr => + expr.transform { + case aggExpr @ AggregateExpression(func, _, _, _) if hasWindowFunction(func.children) => + val newFuncChildren = func.children.map { funcExpr => + funcExpr.transform { + case we: WindowExpression => + // Replace window expressions with aliases to them + val windowExprAlias = Alias(we, s"_we${windowExprAliases.length}")() + windowExprAliases += windowExprAlias + windowExprAlias.toAttribute + } + } + val newFunc = func.withNewChildren(newFuncChildren).asInstanceOf[AggregateFunction] + aggExpr.copy(aggregateFunction = newFunc) + }.asInstanceOf[NamedExpression] + } + val window = addWindow(windowExprAliases, child) + // TODO do we also need a projection here? + Aggregate(groupingExprs, newAggregateExprs, window) --- End diff -- No you don't need a Project.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org