Github user aokolnychyi commented on the issue:

https://github.com/apache/spark/pull/19193

@hvanhovell here is a summary of tried scenarios:

```
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
val window1 = Window.orderBy('a)
val window2 = Window.orderBy('a.desc)
```

**Scenario 1: An aggregate on top of a window expression (did not work before, looks OK now)**

```
df.groupBy().agg(max(rank().over(window1))).explain(true)
df.groupBy().agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16]
+- Project [a#5, b#6, _we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST]
      +- Project [_1#2 AS a#5, _2#3 AS b#6]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [max(_we0#27) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#16]
+- Project [_we0#27, _we0#27]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#27], [a#5 ASC NULLS FIRST]
      +- LocalRelation [a#5]

+-----------------------------------------------------------------+
|max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+-----------------------------------------------------------------+
|4                                                                |
+-----------------------------------------------------------------+
```

**Scenario 2: An aggregate with grouping expressions on top of a window expression**

TODO: Is the result wrong? What is expected?
```
df.groupBy('a).agg(max(rank().over(window1))).explain(true)
df.groupBy('a).agg(max(rank().over(window1))).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, b#6, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST]
      +- Project [_1#2 AS a#5, _2#3 AS b#6]
         +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Aggregate [a#5], [a#5, max(_we0#75) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#63]
+- Project [a#5, _we0#75, _we0#75]
   +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#75], [a#5 ASC NULLS FIRST]
      +- LocalRelation [a#5]

+---+-----------------------------------------------------------------+
|a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|
+---+-----------------------------------------------------------------+
|1  |1                                                                |
|2  |3                                                                |
|5  |4                                                                |
+---+-----------------------------------------------------------------+
```

**Scenario 3: A normal aggregate, an aggregate on top of a window expression, a window expression on top of an aggregate in one query**

This is resolved in two steps.
```
df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).explain(true)
df.groupBy('a).agg(max(rank().over(window1)), sum('b), sum(sum('b)).over(window2)).show(false)

== Analyzed Logical Plan ==
a: int, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$())): int, sum(b): bigint, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$()): bigint
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
+- Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, _w0#137L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
   +- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 DESC NULLS LAST]
      +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(cast(b#6 as bigint)) AS sum(b)#117L, sum(cast(b#6 as bigint)) AS _w0#137L]
         +- Project [a#5, b#6, _we0#133, _we0#133]
            +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#133], [a#5 ASC NULLS FIRST]
               +- Project [_1#2 AS a#5, _2#3 AS b#6]
                  +- LocalRelation [_1#2, _2#3]

== Optimized Logical Plan ==
Project [a#5, max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(b)#117L, sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L]
+- Window [sum(_w0#137L) windowspecdefinition(a#5 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())#118L], [a#5 DESC NULLS LAST]
   +- Aggregate [a#5], [a#5, max(_we0#133) AS max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))#116, sum(cast(b#6 as bigint)) AS sum(b)#117L, sum(cast(b#6 as bigint)) AS _w0#137L]
      +- Project [a#5, b#6, _we0#133, _we0#133]
         +- Window [rank(a#5) windowspecdefinition(a#5 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#133], [a#5 ASC NULLS FIRST]
            +- LocalRelation [a#5, b#6]

+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|a  |max(RANK() OVER (ORDER BY a ASC NULLS FIRST unspecifiedframe$()))|sum(b)|sum(sum(b)) OVER (ORDER BY a DESC NULLS LAST unspecifiedframe$())|
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
|5  |4                                                                |5     |5                                                                |
|2  |3                                                                |4     |9                                                                |
|1  |1                                                                |5     |14                                                               |
+---+-----------------------------------------------------------------+------+-----------------------------------------------------------------+
```

**Scenario 4: An aggregate on top of a window expression, which is defined on top of another aggregate**

TODO: Not supported now. What is expected? Should we support this, or should an exception be thrown?

```
df.groupBy('a).agg(max(sum(sum('b)).over(window1))).show(false)
```

**Open Questions**

1. Does it make sense to support window expressions inside aggregate functions, or should the window expression be placed into a subquery? For example, ``CheckAnalysis`` prohibits aggregates on top of other aggregates. Maybe this case should be handled similarly.
2. Scenarios 2 and 4
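
On the Scenario 2 question, one way to read the output is to recompute it by hand outside Spark. The sketch below is plain Scala with no Spark dependency; the object and helper names (`Scenario2Check`, `rankByA`, `maxRankPerGroup`) are made up for illustration. It assumes the interpretation "compute `RANK() OVER (ORDER BY a)` over all input rows first, then take `max` per group of `a`", which is what the plans above do. Whether that is the semantics we want is still the open question.

```scala
// Plain Scala, no Spark: recompute Scenario 2 by hand under the assumption
// that the rank is evaluated over the whole input before grouping.
object Scenario2Check {
  // Same rows as `df` in the scenarios: columns (a, b).
  val rows: Seq[(Int, Int)] = Seq((1, 2), (1, 3), (2, 4), (5, 5))

  // RANK() OVER (ORDER BY a): 1 + number of rows with a strictly smaller `a`.
  // Ties share a rank and leave a gap after them.
  def rankByA(data: Seq[(Int, Int)]): Seq[(Int, Int)] =
    data.map { case (a, _) => (a, 1 + data.count(_._1 < a)) }

  // max(rank) per group of `a`, sorted by `a` for stable output.
  def maxRankPerGroup(data: Seq[(Int, Int)]): Seq[(Int, Int)] =
    rankByA(data)
      .groupBy(_._1)
      .toSeq
      .map { case (a, ranked) => (a, ranked.map(_._2).max) }
      .sortBy(_._1)

  def main(args: Array[String]): Unit =
    maxRankPerGroup(rows).foreach(println)
}
```

Under that assumption the per-row ranks are 1, 1, 3, 4, so the groups come out as (1, 1), (2, 3), (5, 4), matching the Scenario 2 table, and the global maximum of 4 matches Scenario 1.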