Qingsheng Ren created FLINK-32247:
-------------------------------------
Summary: Normal group by with time attributes after a window group
by is interpreted as GlobalWindowAggregate
Key: FLINK-32247
URL: https://issues.apache.org/jira/browse/FLINK-32247
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: Qingsheng Ren
Consider the SQL statement below:
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*)
FROM (
  SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS `price_sum`
  FROM TABLE(TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
  GROUP BY `window_start`, `window_end`, `window_time`, `item`
)
GROUP BY `window_start`, `window_end`, `window_time`;
{code}
This should be planned as a regular group aggregation on top of a windowed
aggregation, but the planner interprets the outer aggregation as a window
aggregation (LocalWindowAggregate / GlobalWindowAggregate):
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS rowtime])
                           +- TableSourceScan(table=[[default_catalog, default_database, source1]], fields=[id, item, price, ts])
{code}
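For anyone trying to reproduce the plan, a source table along the following lines should be enough. This is only a sketch: the field names, the TO_TIMESTAMP(ts) computed column and the 5 second watermark delay are read off the plan above, while the column types and the datagen connector are assumptions and not part of the original report.
{code:sql}
-- Hypothetical DDL for source1; column types and connector are assumed.
CREATE TABLE source1 (
  `id` INT,
  `item` STRING,
  `price` DOUBLE,
  `ts` STRING,
  -- Event-time attribute derived from the string column, as in the plan.
  `rowtime` AS TO_TIMESTAMP(`ts`),
  -- Matches watermark=[-(rowtime, 5000:INTERVAL SECOND)] in the plan.
  WATERMARK FOR `rowtime` AS `rowtime` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);

-- Then prefix the SELECT statement from this report with:
-- EXPLAIN PLAN FOR
{code}
Only the optimized plan matters here, so the generated data does not need to be meaningful.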