lincoln lee created FLINK-35816:
-----------------------------------

             Summary: Non-mergeable proctime tvf window aggregate needs to fallback to group aggregate
                 Key: FLINK-35816
                 URL: https://issues.apache.org/jira/browse/FLINK-35816
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.1, 1.20.0
            Reporter: lincoln lee
            Assignee: lincoln lee
             Fix For: 1.20.0

A non-mergeable proctime TVF window aggregate needs to fall back to a group aggregate. For example:

{code}
select c, count(a)
from TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, interval '5' minutes))
where window_start <> '123'
group by window_start, window_end, c, window_time
{code}

The window properties in the above query are materialized before the aggregation, so the window time loses its processing-time attribute. As a result, the planner fails to pull up `StreamPhysicalWindowTableFunction` into `StreamPhysicalWindowAggregate` and cannot generate a valid execution plan. Instead it produces a plan like the following, which uses the attached window strategy; that strategy relies on the upstream watermark but lacks a watermark assigner:

{code}
Calc(select=[c, EXPR$1])
+- WindowAggregate(groupBy=[c], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[5 min], step=[10 s])], select=[c, COUNT(a) AS EXPR$1, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time])
   +- Exchange(distribution=[hash[c]])
      +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
            +- Calc(select=[a, c, proctime])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}

So, semantically, when the window time attribute is materialized after the window table function, the downstream aggregation should use a group aggregation. The expected plan for the above example can be:

{code}
Calc(select=[c, EXPR$1])
+- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, COUNT(a) AS EXPR$1])
   +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
      +- Calc(select=[window_start, window_end, c, window_time, a], where=[<>(window_start, '123')])
         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])])
            +- Calc(select=[a, c, proctime])
               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
                  +- Calc(select=[a, c, PROCTIME() AS proctime, rowtime])
                     +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, c, rowtime], metadata=[]]], fields=[a, c, rowtime])
{code}
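
To inspect the plan for the example query, a setup along the following lines may help. This is only a sketch under assumptions: the issue does not give the definition of MyTable, so the column types of a and c and the datagen connector are hypothetical. The computed proctime column and the one-second watermark delay on rowtime are inferred from the plans above.

{code:sql}
-- Hypothetical table definition; column types and connector are assumptions.
CREATE TABLE MyTable (
  a INT,
  c STRING,
  rowtime TIMESTAMP(3),
  -- processing-time attribute, matching PROCTIME() AS proctime in the plan
  proctime AS PROCTIME(),
  -- matches watermark=[-(rowtime, 1000:INTERVAL SECOND)] in the plan
  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND
) WITH (
  'connector' = 'datagen'
);

-- Print the plan for the query from this issue.
EXPLAIN PLAN FOR
select c, count(a)
from TABLE(CUMULATE(table MyTable, DESCRIPTOR(proctime), interval '10' seconds, interval '5' minutes))
where window_start <> '123'
group by window_start, window_end, c, window_time;
{code}

The node to check in the output is the aggregate above the Exchange: WindowAggregate indicates the problematic plan, GroupAggregate the expected fallback. The scan node will likely look different from the plans above, since those were presumably produced with a different source.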