Great, thanks a lot for looking into the problem and fixing it. I assume
that your PR will be merged very soon.


On Tue, Jan 14, 2020 at 7:18 PM Benoit Hanotte wrote:

> Hello Till,
> thanks for your reply!
> I have been able to debug the issue and reported it in FLINK-15577.
> It seems the old planner does not add the window specs to the logical
> nodes' digests, leading the HepPlanner to consider the aggregations
> equivalent when they are not, because they use different time windows. I
> explained the issue in more detail in the ticket above, and submitted
> a PR earlier today:
> [FLINK-15577][table-planner] Fix similar aggregations with different
> windows being considered the same by BenoitHanotte · Pull Request #10854 ·
> apache/flink
> What is the purpose of the change: The RelNode's digest is used by the
> Calcite HepPlanner to avoid adding duplicate vertices to the graph. If an
> equivalent vertex was already present in the grap...
> Best,
> Benoit
> ------------------------------
> *From:* Till Rohrmann
> *Sent:* Tuesday, January 14, 2020 7:13 PM
> *To:* Benoit Hanotte
> *Cc:* Jingsong Li
> *Subject:* [BULK]Re: Incorrect Physical Plan when unioning two different
> windows, giving incorrect SQL query results
> Hi Benoit,
> thanks for reporting this issue. Since I'm not too familiar with the SQL
> component I've pulled in Timo and Jingsong who know much better what could
> be wrong than I do.
> Cheers,
> Till
> On Mon, Jan 13, 2020 at 11:48 AM Benoit Hanotte wrote:
> Hello,
> We seem to be facing an issue with Flink where the physical plan after
> planner optimization is not correct.
> I have been able to reproduce the issue in the following "simplified" use
> case (it doesn't seem to happen in trivial cases):
>    1. We open 2 event streams ("clicks" and "displays").
>    2. We compute the click rate (ctr) over 2-hour and 6-hour sliding
>    windows.
>    3. We then union the two to output one row per hour with the max value
>    between the values computed over 2 and 6 hours.
> You can find the SQL query below [1].
> After activating debug logging for Calcite, I can see that the
> original logical plan is valid: the top-level UNION is between two
> LogicalProjects, one for the 2-hour and one for the 6-hour HOP window [2].
> However, in the final physical plan, we can see that both sides of the
> UNION now have 6-hour HOP windows instead of one 2-hour window and one
> 6-hour window [3].
> I pushed a commit to my fork to reproduce the issue; unfortunately,
> simplifying the query seems to make the issue disappear.
> Is there anything obvious I am missing, or do you have any pointers on what
> could trigger this issue? I looked at the different rules applied by the
> planner [4], but, as I am not familiar with them, I haven't yet been able
> to find the root cause.
> Thanks a lot for your help!
> Benoit Hanotte
> ********************************* [1] SQL query *********************************
>     WITH displays AS (
>         SELECT `timestamp`, 1 as nb_displays, 0 as nb_clicks FROM my_catalog.my_db.display
>     ),
>     clicks AS (
>         SELECT `timestamp`, 0 as nb_displays, 1 as nb_clicks FROM my_catalog.my_db.click
>     ),
>     counts_2h AS (
>         SELECT
>             SUM(nb_clicks) / SUM(nb_displays) as ctr,
>             HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) as `timestamp`
>         FROM (
>             (SELECT * FROM displays)
>             UNION ALL
>             (SELECT * FROM clicks)
>         ) t
>         GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
>     ),
>     counts_6h AS (
>         SELECT
>             SUM(nb_clicks) / SUM(nb_displays) as ctr,
>             HOP_ROWTIME(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR) as `timestamp`
>         FROM (
>             (SELECT * FROM displays)
>             UNION ALL
>             (SELECT * FROM clicks)
>         ) t
>         GROUP BY HOP(`timestamp`, INTERVAL '1' HOUR, INTERVAL '6' HOUR)
>     )
>     SELECT
>         TUMBLE_END(`timestamp`, INTERVAL '1' HOUR) as `timestamp`,
>         MAX(ctr)
>     FROM (
>         (SELECT * FROM counts_6h)
>         UNION ALL
>         (SELECT * FROM counts_2h)
>     ) t
>     GROUP BY TUMBLE(`timestamp`, INTERVAL '1' HOUR)
> ********************* [2] Logical plan (before optimization) ***********************
>     LogicalProject(timestamp=[TUMBLE_END($0)], EXPR$1=[$1])
>       LogicalAggregate(group=[{0}], EXPR$1=[MAX($1)])
>         LogicalProject($f0=[TUMBLE($1, 3600000:INTERVAL HOUR)], ctr=[$0])
>           LogicalUnion(all=[true])
>             LogicalProject(ctr=[$0], timestamp=[$1])
>               LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
>                 LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
>                   LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 21600000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
>                     LogicalProject(timestamp=[$0], nb_displays=[0], nb_clicks=[1])
>                       LogicalTableScan(table=[[my_catalog, my_db, click]])
>             LogicalProject(ctr=[$0], timestamp=[$1])
>               LogicalProject(ctr=[/($1, $2)], timestamp=[HOP_ROWTIME($0)])
>                 LogicalAggregate(group=[{0}], agg#0=[SUM($1)], agg#1=[SUM($2)])
>                   LogicalProject($f0=[HOP($0, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)], nb_clicks=[$2], nb_displays=[$1])
>                     LogicalProject(timestamp=[$0], nb_displays=[1], nb_clicks=[0])
>                       LogicalTableScan(table=[[my_catalog, my_db, display]])
> ****************** [3] Resulting physical plan (after optimization) ********************
>     DataStreamCalc(select=[w$end AS timestamp, CAST(EXPR$1) AS EXPR$1]): rowcount = 400.0, cumulative cost = {3200.0 rows, 3600.0 cpu, 4800.0 io}, id = 556
>       DataStreamGroupWindowAggregate(window=[TumblingGroupWindow('w$, 'timestamp, 3600000.millis)], select=[MAX(ctr) AS EXPR$1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 400.0, cumulative cost = {2800.0 rows, 3200.0 cpu, 4800.0 io}, id = 555
>         DataStreamUnion(all=[true], union all=[ctr, timestamp]): rowcount = 400.0, cumulative cost = {2400.0 rows, 2800.0 cpu, 4800.0 io}, id = 554
>           DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 548
>             DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 547
>               DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 546
>                 DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 544
>                   StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
>                 DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 545
>                   StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
>           DataStreamCalc(select=[/(CAST($f0), CAST($f1)) AS ctr, w$rowtime AS timestamp]): rowcount = 200.0, cumulative cost = {1000.0 rows, 1200.0 cpu, 2400.0 io}, id = 553
>             DataStreamGroupWindowAggregate(window=[SlidingGroupWindow('w$, 'timestamp, 7200000.millis, 3600000.millis)], select=[SUM(nb_clicks) AS $f0, SUM(nb_displays) AS $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]): rowcount = 200.0, cumulative cost = {800.0 rows, 800.0 cpu, 2400.0 io}, id = 552
>               DataStreamUnion(all=[true], union all=[timestamp, nb_displays, nb_clicks]): rowcount = 200.0, cumulative cost = {600.0 rows, 600.0 cpu, 2400.0 io}, id = 551
>                 DataStreamCalc(select=[timestamp, 1 AS nb_displays, 0 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 549
>                   StreamTableSourceScan(table=[[my_catalog, my_db, display]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 543
>                 DataStreamCalc(select=[timestamp, 0 AS nb_displays, 1 AS nb_clicks]): rowcount = 100.0, cumulative cost = {200.0 rows, 200.0 cpu, 1200.0 io}, id = 550
>                   StreamTableSourceScan(table=[[my_catalog, my_db, click]], fields=[timestamp], source=[$anon$1(...)]): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 1200.0 io}, id = 542
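
The digest collision described earlier in this thread (a planner that deduplicates nodes by digest, with the window spec missing from that digest) can be sketched roughly as follows. This is a toy Python model, not Flink or Calcite code: `WindowAggregate`, `digest_without_window`, `digest_with_window` and `dedupe` are hypothetical names introduced only for illustration, and the real planner graph is considerably more involved.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WindowAggregate:
    """Toy stand-in for a logical window-aggregate node (hypothetical, not a Flink class)."""
    input_name: str
    aggregates: tuple
    window_ms: int


def digest_without_window(node):
    # Mimics the reported behaviour: the window spec is left out of the digest.
    return f"Agg(input={node.input_name}, aggs={list(node.aggregates)})"


def digest_with_window(node):
    # Assumed shape of a corrected digest: the window size is part of the string.
    return (f"Agg(input={node.input_name}, aggs={list(node.aggregates)}, "
            f"window={node.window_ms}ms)")


def dedupe(nodes, digest_fn):
    """Return, for each node, the first node registered under the same digest."""
    seen = {}
    return [seen.setdefault(digest_fn(n), n) for n in nodes]


aggs = ("SUM(nb_clicks)", "SUM(nb_displays)")
two_hours = WindowAggregate("union", aggs, 7_200_000)
six_hours = WindowAggregate("union", aggs, 21_600_000)

buggy = dedupe([two_hours, six_hours], digest_without_window)
fixed = dedupe([two_hours, six_hours], digest_with_window)

print([n.window_ms for n in buggy])  # both entries carry the same window size
print([n.window_ms for n in fixed])  # the two window sizes stay distinct
```

In this model, whichever aggregate is registered first wins under the window-less digest, so both branches of the union end up with the same window. Including the window in the digest keeps the two nodes apart.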
