[ https://issues.apache.org/jira/browse/FLINK-34238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811180#comment-17811180 ]
Benchao Li commented on FLINK-34238:
------------------------------------

Makes sense to me.

Another general way is to let {{FlinkExpandConversionRule}} remove the shuffle via the distribution trait, which is currently used only in batch mode. (I know this takes a lot of effort, since many streaming rules need to be adapted. We've done this work internally; hopefully someone can contribute it back.)

> In streaming mode, redundant exchange nodes can be optimally deleted in some cases
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-34238
>                 URL: https://issues.apache.org/jira/browse/FLINK-34238
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: xuyang
>            Priority: Minor
>
> Take the following plan as an example:
> {code:java}
> Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
> +- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start], win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
>    +- Exchange(distribution=[hash[a]])
>       +- Calc(select=[a, window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
>          +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5 min], partition keys=[a])])
>             +- Exchange(distribution=[hash[a]])
>                +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
>                   +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, e, rowtime])
> {code}
> If the nodes `WindowTableFunction`, `Calc` and `WindowAggregate` can eventually be chained, the `Exchange` between `Calc` and `WindowAggregate` can be removed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
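
To make the idea in this thread concrete, here is a minimal sketch of removing an Exchange whose input is already hash-distributed on the same keys. It is a toy model in Python, not Flink planner code: the `Node` class, the `preserves_distribution` flag, and all function names are hypothetical, and the sketch simply assumes that `WindowTableFunction` and `Calc` keep the hash distribution on `a` (the condition the issue describes as the nodes being chained).

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class Node:
    """Toy plan node (hypothetical; not a Flink class)."""
    name: str
    inputs: List["Node"] = field(default_factory=list)
    # Hash keys requested by an Exchange; None for every other node.
    hash_keys: Optional[Tuple[str, ...]] = None
    # Assumption: True when the node forwards rows without repartitioning
    # and leaves the key columns unchanged.
    preserves_distribution: bool = False


def provided_keys(node: Node) -> Optional[Tuple[str, ...]]:
    """Return the hash keys the node's output is known to be distributed by."""
    if node.name == "Exchange":
        return node.hash_keys
    if node.preserves_distribution and len(node.inputs) == 1:
        return provided_keys(node.inputs[0])
    return None


def remove_redundant_exchanges(node: Node) -> Node:
    """Drop every Exchange whose input is already hashed on the same keys."""
    node.inputs = [remove_redundant_exchanges(child) for child in node.inputs]
    if (
        node.name == "Exchange"
        and node.hash_keys is not None
        and provided_keys(node.inputs[0]) == node.hash_keys
    ):
        return node.inputs[0]
    return node


def names(node: Node) -> List[str]:
    """Flatten a single-input plan into a top-down list of node names."""
    result = [node.name]
    while node.inputs:
        node = node.inputs[0]
        result.append(node.name)
    return result


def build_example_plan() -> Node:
    """Rebuild the shape of the plan quoted in the issue (names only)."""
    scan = Node("TableSourceScan")
    watermark = Node("WatermarkAssigner", [scan], preserves_distribution=True)
    lower = Node("Exchange", [watermark], hash_keys=("a",))
    window_tvf = Node("WindowTableFunction", [lower], preserves_distribution=True)
    calc = Node("Calc", [window_tvf], preserves_distribution=True)
    upper = Node("Exchange", [calc], hash_keys=("a",))
    aggregate = Node("WindowAggregate", [upper])
    return Node("Calc", [aggregate])


if __name__ == "__main__":
    print(names(remove_redundant_exchanges(build_example_plan())))
```

On this toy plan only the upper Exchange is dropped. The lower one stays because nothing below it establishes a hash distribution on `a`. A real implementation would also have to account for parallelism and chaining, which this sketch leaves out.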