[ https://issues.apache.org/jira/browse/FLINK-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Benoit Hanotte updated FLINK-15577:
-----------------------------------
    Description: 
The RelNode's digest (AbstractRelNode.getDigest()), along with its RowType, is
used by the Calcite HepPlanner to avoid adding duplicate vertices to the graph.
If an equivalent vertex is already present in the graph, then that vertex is
used in place of the newly generated one:
https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828

This means that *the digest needs to contain all the information necessary to 
identify a vertex and distinguish it from similar - but not equivalent - 
vertices*.
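A heavily simplified sketch of this deduplication, assuming a map keyed by (digest, row type). The classes `VertexKey`, `DigestGraph` and `DigestDedupDemo` are hypothetical and this is not Calcite's actual HepPlanner code; it only shows that registering a second node under an existing key hands back the vertex that is already there.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical key: a vertex is identified only by its digest and row type.
final class VertexKey {
    final String digest;
    final String rowType;

    VertexKey(String digest, String rowType) {
        this.digest = digest;
        this.rowType = rowType;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof VertexKey)) return false;
        VertexKey other = (VertexKey) o;
        return digest.equals(other.digest) && rowType.equals(other.rowType);
    }

    @Override
    public int hashCode() {
        return Objects.hash(digest, rowType);
    }
}

// Hypothetical, simplified graph that reuses vertices with equal keys.
final class DigestGraph {
    private final Map<VertexKey, String> vertices = new HashMap<>();

    // Returns the node already registered under (digest, rowType),
    // or registers and returns the new node if there is none yet.
    String addOrReuse(String digest, String rowType, String node) {
        return vertices.computeIfAbsent(new VertexKey(digest, rowType), k -> node);
    }

    int size() {
        return vertices.size();
    }
}

class DigestDedupDemo {
    public static void main(String[] args) {
        DigestGraph graph = new DigestGraph();
        // Same digest and row type: the second node is silently replaced.
        String first = graph.addOrReuse("WindowAggregate(select=[...])", "RecordType(TIMESTAMP timestamp)", "agg_1h");
        String second = graph.addOrReuse("WindowAggregate(select=[...])", "RecordType(TIMESTAMP timestamp)", "agg_2h");
        System.out.println(first + " " + second); // prints "agg_1h agg_1h"
        System.out.println(graph.size());         // prints "1"
    }
}
```

In this sketch whichever node is registered first wins, so any property left out of the digest makes otherwise distinct nodes interchangeable.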

In the case of `LogicalWindowAggregation` and `FlinkLogicalWindowAggregation`,
the window specs are currently not part of the digest. As a result, two
aggregations with the same signatures and expressions but different windows are
considered equivalent by the planner, so one is substituted for the other and
the resulting Physical Plan is invalid.
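In Calcite, a node's digest is built from the terms the node reports in `explainTerms` (via `RelWriter.item`), so one plausible fix is for both window aggregate nodes to report their window as an extra term. The sketch below does not use Calcite or Flink: `WindowAggSketch`, `DigestComparison` and the digest format are hypothetical and only illustrate why the missing term causes a collision.

```java
import java.util.List;

// Hypothetical stand-in for a window aggregate node; not Flink's class.
final class WindowAggSketch {
    final String window;        // window spec, e.g. a SlidingGroupWindow string
    final List<String> select;  // projected window properties / aggregates

    WindowAggSketch(String window, List<String> select) {
        this.window = window;
        this.select = select;
    }

    // Digest WITHOUT the window spec: mirrors the behaviour described above.
    String buggyDigest() {
        return "WindowAggregate(select=" + select + ")";
    }

    // Digest WITH the window spec: nodes that differ only in their window
    // now get different digests and would no longer be merged.
    String fixedDigest() {
        return "WindowAggregate(window=[" + window + "], select=" + select + ")";
    }
}

class DigestComparison {
    public static void main(String[] args) {
        List<String> select = List.of("start('w$) AS w$start", "end('w$) AS w$end");
        WindowAggSketch oneHour = new WindowAggSketch(
            "SlidingGroupWindow('w$, 'timestamp, 3600000.millis, 3600000.millis)", select);
        WindowAggSketch twoHours = new WindowAggSketch(
            "SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)", select);

        System.out.println(oneHour.buggyDigest().equals(twoHours.buggyDigest())); // prints "true"
        System.out.println(oneHour.fixedDigest().equals(twoHours.fixedDigest())); // prints "false"
    }
}
```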

For instance, the following query produces an invalid plan:

{code}
WITH window_1h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
),
window_2h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
)
(SELECT * FROM window_1h)
UNION ALL
(SELECT * FROM window_2h)
{code}
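To see why the two windows are not interchangeable: in Flink SQL, `HOP(time_attr, slide, size)` takes the hop (slide) interval first and the window size second, so `window_1h` uses 1-hour windows hopping every hour while `window_2h` uses 2-hour windows hopping every hour. The sketch below assumes zero-offset windows whose starts are aligned to multiples of the slide; `HopWindows.assign` is a hypothetical helper, not Flink's runtime code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of hopping (sliding) window assignment, assuming zero offset and
// window starts aligned to multiples of the slide.
class HopWindows {
    static final long HOUR_MS = 3_600_000L;

    // Returns the [start, end) bounds of every window that contains ts.
    static List<long[]> assign(long ts, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slideMs);
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = 90 * 60_000L; // a row at 01:30 (epoch millis)
        // window_1h: HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
        for (long[] w : assign(ts, HOUR_MS, HOUR_MS)) {
            System.out.println("window_1h: [" + w[0] + ", " + w[1] + ")");
        }
        // window_2h: HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
        for (long[] w : assign(ts, HOUR_MS, 2 * HOUR_MS)) {
            System.out.println("window_2h: [" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Under these assumptions the row at 01:30 falls into one `window_1h` window ([3600000, 7200000)) but two `window_2h` windows ([3600000, 10800000) and [0, 7200000)), so merging the two aggregations necessarily changes the query result.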

The planner generates the following invalid plan. *Note that the windows in the
two DataStreamGroupWindowAggregate nodes are identical (a 2-hour size with a
1-hour slide, i.e. the `window_2h` spec), although they should differ*:

{code}
DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
{code}
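For comparison, the window spec each CTE should produce can be worked out from its HOP arguments. This assumes the `SlidingGroupWindow(alias, timeField, size, slide)` argument order implied by the plan output above; `ExpectedPlanWindows.slidingGroupWindow` is a hypothetical helper that only formats the expected string.

```java
// Hypothetical helper: formats the window spec a HOP(ts, slide, size) call
// is expected to show in the plan, assuming the (size, slide) order seen above.
class ExpectedPlanWindows {
    static final long HOUR_MS = 3_600_000L;

    static String slidingGroupWindow(long slideMs, long sizeMs) {
        return "SlidingGroupWindow('w$, 'timestamp, " + sizeMs + ".millis, " + slideMs + ".millis)";
    }

    public static void main(String[] args) {
        // window_1h: HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
        System.out.println("window_1h -> " + slidingGroupWindow(HOUR_MS, HOUR_MS));
        // window_2h: HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
        System.out.println("window_2h -> " + slidingGroupWindow(HOUR_MS, 2 * HOUR_MS));
    }
}
```

The expected `window_1h` spec (`3600000.millis, 3600000.millis`) does not appear anywhere in the plan.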


  was:
The RelNode's digest (AbstractRelNode.getDigest()), along its RowType, is used 
by the Calcite HepPlanner to avoid adding duplicate Vertices to the graph. If 
an equivalent vertex was already present in the graph, then that vertex is used 
in place of the new generated one: 
https://github.com/apache/calcite/blob/branch-1.21/core/src/main/java/org/apache/calcite/plan/hep/HepPlanner.java#L828

This means that *the digest needs to contain all the information necessary to 
identify a vertex and distinguish it from similar - but not equivalent - 
vertices*.

In the case of `LogicalWindowAggregation` and `FlinkLogicalWindowAggregation`, 
the window specs are currently not in the digest, meaning that two aggregations 
with the same signatures and expressions but different windows are considered 
equivalent by the planner, which is not correct and will lead to an invalid 
Physical Plan.

For instance, the following query would give an invalid plan:

{code}
WITH window_1h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
),
window_2h AS (
    SELECT HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
    FROM my_table
    GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
)
(SELECT * FROM window_1h)
UNION ALL
(SELECT * FROM window_2h)
{code}

The invalid plan generated by the planner is the following (*Please note the 
windows in the two DataStreamGroupWindowAggregates nodes being the same when 
they should be different*):

{code}
DataStreamUnion(all=[true], union all=[timestamp]): rowcount = 200.0, cumulative cost = {800.0 rows, 802.0 cpu, 0.0 io}, id = 176
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 173
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 172
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
  DataStreamCalc(select=[w$rowtime AS timestamp]): rowcount = 100.0, cumulative cost = {300.0 rows, 301.0 cpu, 0.0 io}, id = 175
    DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 100.0, cumulative cost = {200.0 rows, 201.0 cpu, 0.0 io}, id = 174
      DataStreamScan(id=[1], fields=[timestamp]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 171
{code}



> WindowAggregate RelNodes missing Window specs in digest
> -------------------------------------------------------
>
>                 Key: FLINK-15577
>                 URL: https://issues.apache.org/jira/browse/FLINK-15577
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.9.1
>            Reporter: Benoit Hanotte
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
