[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17796372#comment-17796372 ]
Jeyhun Karimov commented on FLINK-33756:
----------------------------------------

Hi [~jhughes]

I had a chance to look at the issue. I share my findings below.

So, when we enable {{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the following optimized execution plan is produced:

{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
      +- LocalWindowAggregate(groupBy=[name], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) AS sum$1, slice_end('w$) AS $window_end])
         +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS $f5_0, $f4 AS $f6_0])
            +- GlobalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS $f4, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[name, $f5, $f6]])
                  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, slice_end('w$) AS $slice_end])
                     +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS $g_1, ($e = 2) AS $g_2, rowtime])
                        +- Expand(projects=[{name, double, int, $f5, null AS $f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, rowtime}])
                           +- Calc(select=[name, double, int, MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[int, double, name, ts], metadata=[], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])
{code}

As we can see, there are two window aggregations, both with the {{Local-Global}} optimization applied.

(As a reminder, the missing record is "+I[b, 3.0, 1, 2020-10-10T00:00:31, 2020-10-10T00:00:41]".)

The schema of the second {{LocalWindowAggregate}} (the one nearer the sink in the plan) shows that it uses {{window_start}} and {{window_end}}, rather than the original {{rowtime}}, to assign {{CUMULATE}} windows. At the second {{LocalWindowAggregate}}, our "missing" record looks like "+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)". So, at this point, we have already lost the original event time of the record.

As a result, the flaky behaviour is caused by the calling order between {{SlicingWindowOperator::processWatermark}}->{{AbstractWindowAggProcessor::advanceProgress}} and {{SlicingWindowOperator::processElement}}:
- If {{processWatermark}} is called before {{processElement}}, then {{currentProgress}} is updated to {{1602288041000}}. When {{processElement}} is called afterwards, it considers the window already fired and drops the element.
- If {{processElement}} is called before {{processWatermark}}, then the record is processed as expected.

Is this expected behaviour? WDYT?

> Missing record with CUMULATE/HOP windows using an optimization
> --------------------------------------------------------------
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
> Issue Type: Bug
> Reporter: Jim Hughes
> Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to true,
> the configuration AggregatePhaseStrategy.TWO_PHASE set, using a HOP or
> CUMULATE window with an offset, a record can be sent which causes one of the
> multiple active windows to fail to emit a record.
> The linked code
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
> modifies the `WindowAggregateJsonITCase` to demonstrate the case.
>
> The test `testDistinctSplitDisabled` shows the expected behavior. The test
> `testDistinctSplitEnabled` tests the above configurations and shows that one
> record is missing from the output.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
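
The ordering dependence described in the comment can be illustrated with a small standalone sketch. This is a simplified model and not Flink's implementation: {{ToyWindowOperator}}, its fields, and the {{windowEnd <= currentProgress}} check are assumptions made for illustration only. The one value taken from the comment is the window end {{1602288041000}} (2020-10-10T00:00:41 UTC), which the record carries in place of its original event time.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOrderingSketch {

    /** Hypothetical stand-in for a slicing window operator; not Flink's actual class. */
    static final class ToyWindowOperator {
        // Highest watermark seen so far (assumed analogue of currentProgress).
        private long currentProgress = Long.MIN_VALUE;
        // Window-end timestamps of the records that were accepted.
        private final List<Long> accepted = new ArrayList<>();

        void processWatermark(long timestamp) {
            currentProgress = Math.max(currentProgress, timestamp);
        }

        /** Returns false when the record's window is treated as already fired. */
        boolean processElement(long windowEnd) {
            // Assumed simplification of the "window already fired" check.
            if (windowEnd <= currentProgress) {
                return false; // record is dropped
            }
            accepted.add(windowEnd);
            return true;
        }

        int acceptedCount() {
            return accepted.size();
        }
    }

    /** Feeds one record and one watermark in the given order; returns accepted records. */
    static int run(boolean watermarkFirst) {
        // The record's only remaining time attribute is its window end.
        long windowEnd = 1602288041000L;
        ToyWindowOperator op = new ToyWindowOperator();
        if (watermarkFirst) {
            op.processWatermark(windowEnd);
            op.processElement(windowEnd);
        } else {
            op.processElement(windowEnd);
            op.processWatermark(windowEnd);
        }
        return op.acceptedCount();
    }

    public static void main(String[] args) {
        System.out.println("watermark first -> accepted records: " + run(true));
        System.out.println("element first   -> accepted records: " + run(false));
    }
}
```

In this model the same record and the same watermark give a different result depending only on arrival order, which matches the flakiness seen in `testDistinctSplitEnabled`.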