xuyangzhong commented on code in PR #24162: URL: https://github.com/apache/flink/pull/24162#discussion_r1467143192
########## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ########## @@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])]) +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b]) +]]> + </Resource> + </TestCase> + <TestCase name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + a, + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel 
plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + a, + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL 
SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + a, + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, 
TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + a, + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], 
d=[$3], $f4=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + a, + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM TABLE( + SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ 
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) ++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + 
weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM ( + SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, a + FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) + WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000' +) +GROUP BY window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)], EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)]) ++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6]) + +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0]) + +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)]) + +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- WindowAggregate(window=[SESSION(win_start=[window_start], win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- 
Exchange(distribution=[single]) + +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]], fields=[b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + window_start, + window_end, + count(*), + sum(d), + max(d) filter (where b > 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM ( + SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, a + FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) + WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000' +) +GROUP BY window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)], EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)]) ++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6]) + +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0]) + +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)]) + +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], 
b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv]) ++- WindowAggregate(window=[SESSION(win_start=[window_start], win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[single]) + +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)]) + +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min])]) + +- Exchange(distribution=[single]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]], fields=[b, c, d, e, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testSessionWindowTVFWithPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +SELECT + window_start, + window_end, + a, + count(*), + sum(d), + max(d) filter (where b > 1000), + weightedAvg(b, e) AS wAvg, + count(distinct c) AS uv +FROM ( + SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, a + FROM TABLE(SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE)) + WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000' +) +GROUP BY a, window_start, window_end + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(window_start=[$1], window_end=[$2], a=[$0], 
EXPR$3=[$3], EXPR$4=[$4], EXPR$5=[$5], wAvg=[$6], uv=[$7]) ++- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)], EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)]) + +- LogicalProject(a=[$9], window_start=[$0], window_end=[$7], d=[$2], $f4=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6]) + +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0]) + +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)]) + +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0), DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[$6]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv]) ++- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start], win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end]) + +- Exchange(distribution=[hash[a]]) Review Comment: Sure. https://issues.apache.org/jira/browse/FLINK-34238 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org