[ 
https://issues.apache.org/jira/browse/FLINK-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014856#comment-17014856
 ] 

Benoit Hanotte commented on FLINK-15577:
----------------------------------------

Hi [~ykt836], yes, I'll push a PR today

> WindowAggregate RelNodes missing Window specs in digest
> -------------------------------------------------------
>
>                 Key: FLINK-15577
>                 URL: https://issues.apache.org/jira/browse/FLINK-15577
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.9.1
>            Reporter: Benoit Hanotte
>            Priority: Critical
>
> The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType, is
> used by the Calcite HepPlanner to avoid adding duplicate vertices to the
> graph. If an equivalent vertex is already present in the graph, then that
> vertex is used in place of the newly generated one:
> https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828
> This means that *the digest needs to contain all the information necessary to 
> identify a vertex and distinguish it from similar - but not equivalent - 
> vertices*.
> In the case of `LogicalWindowAggregation` and 
> `FlinkLogicalWindowAggregation`, the window specs are currently not in the 
> digest, meaning that two aggregations with the same signatures and 
> expressions but different windows are considered equivalent by the planner, 
> which is not correct and will lead to an invalid Physical Plan.
> For instance, the following query would give an invalid plan:
> {code}
> WITH window_1h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
> ),
> window_2h AS (
>     SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
>     FROM my_table
>     GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
> )
> (SELECT * FROM window_1h)
> UNION ALL
> (SELECT * FROM window_2h)
> {code}
> The planner generates the following invalid plan (*note that the two
> DataStreamGroupWindowAggregate nodes have the same window when they should
> differ*):
> {code}
> DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
>   DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
>     DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
>       DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
> {code}
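
The digest-based deduplication described in the quoted issue can be illustrated with a toy model. This is only a sketch under stated assumptions: `WindowAgg`, `Graph`, `digestWithoutWindow` and `digestWithWindow` are hypothetical names invented for illustration, not Calcite or Flink classes, and the window strings are made up. It shows how a vertex map keyed by digest would merge two aggregates whose only difference is the window when the window is left out of the digest.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DigestDedupSketch {

    /** Toy stand-in for a window aggregate node (hypothetical, not a Calcite/Flink class). */
    static final class WindowAgg {
        final String input;
        final String select;
        final String window;

        WindowAgg(String input, String select, String window) {
            this.input = input;
            this.select = select;
            this.window = window;
        }

        /** Digest that omits the window spec, mirroring the reported problem. */
        String digestWithoutWindow() {
            return "WindowAgg(input=" + input + ", select=" + select + ")";
        }

        /** Digest that also carries the window spec. */
        String digestWithWindow() {
            return "WindowAgg(input=" + input + ", window=" + window
                    + ", select=" + select + ")";
        }
    }

    /** Toy graph that reuses an existing vertex when its digest is already registered. */
    static final class Graph {
        private final Map<String, WindowAgg> verticesByDigest = new HashMap<>();
        private final Function<WindowAgg, String> digestFn;

        Graph(Function<WindowAgg, String> digestFn) {
            this.digestFn = digestFn;
        }

        /** Returns the vertex actually used: the existing one on a digest hit, else the new one. */
        WindowAgg addVertex(WindowAgg node) {
            WindowAgg existing = verticesByDigest.putIfAbsent(digestFn.apply(node), node);
            return existing != null ? existing : node;
        }
    }

    public static void main(String[] args) {
        WindowAgg twoHours = new WindowAgg("my_table", "w$rowtime", "size=2h, slide=1h");
        WindowAgg oneHour = new WindowAgg("my_table", "w$rowtime", "size=1h, slide=1h");

        // Window missing from the digest: the second aggregate collapses onto the first.
        Graph buggy = new Graph(WindowAgg::digestWithoutWindow);
        buggy.addVertex(twoHours);
        System.out.println("without window in digest: " + buggy.addVertex(oneHour).window);

        // Window present in the digest: both aggregates stay distinct.
        Graph fixed = new Graph(WindowAgg::digestWithWindow);
        fixed.addVertex(twoHours);
        System.out.println("with window in digest:    " + fixed.addVertex(oneHour).window);
    }
}
```

In this model, the first graph hands back the two-hour aggregate when the one-hour aggregate is added, which is the same kind of substitution visible in the plan above. The second graph keeps both vertices because their digests differ.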



