[ https://issues.apache.org/jira/browse/FLINK-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014856#comment-17014856 ]
Benoit Hanotte commented on FLINK-15577:
----------------------------------------

Hi [~ykt836], yes, I'll push a PR today

> WindowAggregate RelNodes missing Window specs in digest
> -------------------------------------------------------
>
> Key: FLINK-15577
> URL: https://issues.apache.org/jira/browse/FLINK-15577
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Legacy Planner
> Affects Versions: 1.9.1
> Reporter: Benoit Hanotte
> Priority: Critical
>
> The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType,
> is used by the Calcite HepPlanner to avoid adding duplicate vertices to the
> graph. If an equivalent vertex is already present in the graph, that vertex
> is used in place of the newly generated one:
> https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828
> This means that *the digest needs to contain all the information necessary to
> identify a vertex and distinguish it from similar - but not equivalent -
> vertices*.
> In the case of `LogicalWindowAggregation` and
> `FlinkLogicalWindowAggregation`, the window specs are currently not in the
> digest. As a result, two aggregations with the same signatures and
> expressions but different windows are considered equivalent by the planner.
> This is not correct and leads to an invalid Physical Plan.
>
> For instance, the following query produces an invalid plan:
> {code}
> WITH window_1h AS (
>   SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
>   FROM my_table
>   GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
> ),
> window_2h AS (
>   SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
>   FROM my_table
>   GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> )
> (SELECT * FROM window_1h)
> UNION ALL
> (SELECT * FROM window_2h)
> {code}
> The planner generates the following invalid plan. (*Note that the windows in
> the two DataStreamGroupWindowAggregate nodes are identical when they should
> be different.*)
> {code}
> DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
> {code}
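The following minimal Java sketch illustrates the digest problem described in the issue. It assumes a simplified planner that keeps one vertex per digest string and reuses the vertex registered first.

The classes `SlidingWindow` and `WindowAggregate` and the methods `register` and `exampleNodes` are hypothetical stand-ins. They are not Calcite's `HepPlanner` or Flink's window aggregation RelNodes. The RowType is left out because it is identical for both aggregations in the query.

The two nodes mirror `HOP(ts, 1h, 1h)` and `HOP(ts, 1h, 2h)`. HOP takes the slide first and the size second.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of digest-based vertex reuse.
// None of these classes or methods are Calcite or Flink APIs.
public class DigestDedupSketch {

    // A sliding (HOP) window described by its size and slide in milliseconds.
    static final class SlidingWindow {
        final long sizeMillis;
        final long slideMillis;

        SlidingWindow(long sizeMillis, long slideMillis) {
            this.sizeMillis = sizeMillis;
            this.slideMillis = slideMillis;
        }

        @Override
        public String toString() {
            return "SlidingGroupWindow(" + sizeMillis + ".millis, " + slideMillis + ".millis)";
        }
    }

    // Hypothetical stand-in for a window aggregation node: input, projection and window.
    static final class WindowAggregate {
        final String input;
        final String select;
        final SlidingWindow window;

        WindowAggregate(String input, String select, SlidingWindow window) {
            this.input = input;
            this.select = select;
            this.window = window;
        }

        // Digest that leaves out the window spec (the behaviour reported in the issue).
        String digestWithoutWindow() {
            return "WindowAggregate(input=" + input + ", select=" + select + ")";
        }

        // Digest that also encodes the window spec.
        String digestWithWindow() {
            return "WindowAggregate(input=" + input + ", window=" + window
                    + ", select=" + select + ")";
        }
    }

    // Registers nodes in a digest-keyed "graph": if a digest is already present,
    // the new node is not added and the previously registered vertex is kept.
    static Map<String, WindowAggregate> register(List<WindowAggregate> nodes, boolean includeWindow) {
        Map<String, WindowAggregate> graph = new LinkedHashMap<>();
        for (WindowAggregate node : nodes) {
            String digest = includeWindow ? node.digestWithWindow() : node.digestWithoutWindow();
            graph.putIfAbsent(digest, node);
        }
        return graph;
    }

    // The two aggregations from the query: 1h size / 1h slide and 2h size / 1h slide.
    static List<WindowAggregate> exampleNodes() {
        String select = "start('w$), end('w$), rowtime('w$), proctime('w$)";
        return Arrays.asList(
                new WindowAggregate("my_table", select, new SlidingWindow(3_600_000L, 3_600_000L)),
                new WindowAggregate("my_table", select, new SlidingWindow(7_200_000L, 3_600_000L)));
    }

    public static void main(String[] args) {
        List<WindowAggregate> nodes = exampleNodes();
        // Both aggregations share a digest and collapse into a single vertex.
        System.out.println("vertices without window in digest: " + register(nodes, false).size());
        // The window spec keeps the two aggregations apart.
        System.out.println("vertices with window in digest: " + register(nodes, true).size());
    }
}
```

Running `main` prints 1 for the window-less digest and 2 once the window spec is part of the digest. The sketch does not model which of the two nodes the real planner ends up keeping.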
-- This message was sent by Atlassian Jira (v8.3.4#803005)