[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization
[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796949#comment-17796949 ]

Jim Hughes commented on FLINK-33756:

Hi [~jeyhunkarimov], I am not actively working on it; I'll assign it to you for now.

I think there are two (likely related) things going on: first, some watermark is being miscomputed, and second, the hashing in the exchange step allows things to happen in either order.

When I first looked into this, I also ran across `TimeWindow.getWindowStartWithOffset`. I noticed that this method is called with an offset of 0L in `TimeWindowUtil.getNextTriggerWatermark`. I cannot be 100% sure that's the problem, but that's the next place I'd check if I were to continue looking!

> Missing record with CUMULATE/HOP windows using an optimization
> ---------------------------------------------------------------
>
> Key: FLINK-33756
> URL: https://issues.apache.org/jira/browse/FLINK-33756
> Project: Flink
> Issue Type: Bug
> Reporter: Jim Hughes
> Priority: Major
>
> I have seen an optimization cause a window to fail to emit a record.
> With the optimization `TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED` set to true,
> the configuration AggregatePhaseStrategy.TWO_PHASE set, and a HOP or
> CUMULATE window with an offset, a record can be sent which causes one of the
> multiple active windows to fail to emit a record.
> The linked code
> (https://github.com/jnh5y/flink/commit/ec90aa501d86f95559f8b22b0610e9fb786f05d4)
> modifies the `WindowAggregateJsonITCase` to demonstrate the case.
>
> The test `testDistinctSplitDisabled` shows the expected behavior. The test
> `testDistinctSplitEnabled` tests the above configurations and shows that one
> record is missing from the output.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
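The following is a minimal standalone sketch of the arithmetic in question. It assumes the usual "shift by the offset, then round down to a multiple of the window size" rule that `TimeWindow.getWindowStartWithOffset` is meant to implement. `WindowStartSketch` and `windowStartWithOffset` are hypothetical names, not Flink code.

The sketch uses the plan's 5 s step and 1 s offset, and the timestamp 1602288041000 ms (2020-10-10T00:00:41 UTC, quoted in the analysis). It shows that passing 0L instead of the real offset moves the computed slice start by one second. That is the kind of misalignment suspected above, although the sketch does not show whether the 0L in `TimeWindowUtil.getNextTriggerWatermark` actually matters.

```java
/**
 * Hypothetical stand-alone sketch (not Flink's TimeWindow): start of the window of
 * length windowSize, shifted by offset, that contains timestamp.
 */
public class WindowStartSketch {

    public static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        long remainder = (timestamp - offset) % windowSize;
        // Java's % keeps the sign of the dividend; normalise so the start is never after timestamp
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long ts = 1602288041000L; // 2020-10-10T00:00:41 UTC, the timestamp quoted in the analysis
        long step = 5_000L;       // CUMULATE step of 5 s from the plan
        // With the plan's 1 s offset, slice boundaries fall on :01, :06, ..., so ts is a boundary
        System.out.println("offset 1s: " + windowStartWithOffset(ts, 1_000L, step));
        // Ignoring the offset aligns slices to multiples of 5 s, shifting the start 1 s earlier
        System.out.println("offset 0:  " + windowStartWithOffset(ts, 0L, step));
    }
}
```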
[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization
[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796939#comment-17796939 ]

Jeyhun Karimov commented on FLINK-33756:

[~jhughes] Yes, it is just printing/logging in various places in the codebase. If you are not working on this issue (or if it is not something urgent), you can assign it to me, and I will try to come up with a deterministic solution to avoid the flakiness.
[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization
[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796533#comment-17796533 ]

Jim Hughes commented on FLINK-33756:

Hi [~jeyhunkarimov], nice analysis! I did see that there were two pairs of Local-Global window aggregates when I briefly looked initially; I totally agree that has to be part of the issue.

Out of curiosity, how did you see the values coming out of the various windows? Was it println debugging or something else?

I like your explanation about the order of `processWatermark` and `processElement`; that explains the apparent flakiness. It looks like the different orderings come from the exchange/hashing that happens between the windows. Perhaps thinking about how timestamps interact with the exchange operator will help us sort this out (along with your note that we are "losing" the original timestamp in some sense).
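This sketch shows why the split plan contains two pairs of Local-Global window aggregates. With distinct-aggregate splitting enabled, the distinct input is bucketed (the plan's `MOD(HASH_CODE(int), 1024)`). The first aggregate then counts distinct values per (name, bucket), and the second sums those partial counts per name.

The sketch models only the COUNT(DISTINCT) path, in plain Java, with no windows and no Local/Global phases. `SplitDistinctSketch` is hypothetical. Its bucket function is an assumption, since Flink's `HASH_CODE` may hash differently. The sum is exact because every value lands in exactly one bucket.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical two-stage COUNT(DISTINCT), in the spirit of the split-distinct rewrite. */
public class SplitDistinctSketch {
    static final int BUCKETS = 1024; // mirrors MOD(..., 1024) in the plan

    // Assumed bucket function; Flink's HASH_CODE may hash differently
    static int bucket(int value) {
        return Math.floorMod(Integer.hashCode(value), BUCKETS);
    }

    /** names[i] and values[i] form one row; returns name -> COUNT(DISTINCT value). */
    public static Map<String, Long> countDistinctSplit(String[] names, int[] values) {
        // Stage 1: group by (name, bucket) and collect the distinct values of each group
        Map<String, Map<Integer, Set<Integer>>> stage1 = new HashMap<>();
        for (int i = 0; i < names.length; i++) {
            stage1.computeIfAbsent(names[i], k -> new HashMap<>())
                  .computeIfAbsent(bucket(values[i]), k -> new HashSet<>())
                  .add(values[i]);
        }
        // Stage 2: group by name and sum the per-bucket distinct counts
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<Integer, Set<Integer>>> e : stage1.entrySet()) {
            long total = 0;
            for (Set<Integer> distinctInBucket : e.getValue().values()) {
                total += distinctInBucket.size();
            }
            result.put(e.getKey(), total);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] names = {"a", "a", "a", "b", "b"};
        int[] values = {1, 1, 2, 3, 3};
        System.out.println(countDistinctSplit(names, values));
    }
}
```

In the real plan, each of the two stages is itself split into a Local and a Global window aggregate with a hash exchange in between. That is where the two pairs come from.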
[jira] [Commented] (FLINK-33756) Missing record with CUMULATE/HOP windows using an optimization
[ https://issues.apache.org/jira/browse/FLINK-33756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796372#comment-17796372 ]

Jeyhun Karimov commented on FLINK-33756:

Hi [~jhughes]

I had a chance to look at the issue. I share my findings below.

When we enable {{OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED}}, the following optimized execution plan is produced:

{code}
Sink(table=[default_catalog.default_database.MySink], fields=[name, $f1, $f2, window_start, window_end])
+- GlobalWindowAggregate(groupBy=[name], window=[CUMULATE(win_end=[$window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX(max$0) AS $f1, $SUM0(sum$1) AS $f2, start('w$) AS window_start, end('w$) AS window_end])
   +- Exchange(distribution=[hash[name]])
      +- LocalWindowAggregate(groupBy=[name], window=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, MAX($f5_0) AS max$0, $SUM0($f6_0) AS sum$1, slice_end('w$) AS $window_end])
         +- Calc(select=[name, window_start, window_end, $f5, $f6, $f3 AS $f5_0, $f4 AS $f6_0])
            +- GlobalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(slice_end=[$slice_end], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(max$0) AS $f3, COUNT(distinct$0 count$1) AS $f4, start('w$) AS window_start, end('w$) AS window_end])
               +- Exchange(distribution=[hash[name, $f5, $f6]])
                  +- LocalWindowAggregate(groupBy=[name, $f5, $f6], window=[CUMULATE(time_col=[rowtime], max_size=[15 s], step=[5 s], offset=[1 s])], select=[name, $f5, $f6, MAX(double) FILTER $g_1 AS max$0, COUNT(distinct$0 int) FILTER $g_2 AS count$1, DISTINCT(int) AS distinct$0, slice_end('w$) AS $slice_end])
                     +- Calc(select=[name, double, int, $f5, $f6, ($e = 1) AS $g_1, ($e = 2) AS $g_2, rowtime])
                        +- Expand(projects=[{name, double, int, $f5, null AS $f6, 1 AS $e, rowtime}, {name, double, int, null AS $f5, $f6, 2 AS $e, rowtime}])
                           +- Calc(select=[name, double, int, MOD(HASH_CODE(double), 1024) AS $f5, MOD(HASH_CODE(int), 1024) AS $f6, Reinterpret(TO_TIMESTAMP(ts)) AS rowtime])
                              +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[int, double, name, ts], metadata=[], watermark=[-(TO_TIMESTAMP(ts), 1000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[int, double, name, ts])
{code}

As we can see, there are two window operators, both with the {{Local-Global optimization}}. (As a reminder, the missing record is "+I[b, 3.0, 1, 2020-10-10T00:00:31, 2020-10-10T00:00:41]".)

As we can see from the schema of the second {{LocalWindowAggregate}}, it uses {{window_start}} and {{window_end}} to calculate the {{CUMULATE}} windows. At this point (the second {{LocalWindowAggregate}}), our "missing" record looks like "+I(b,2020-10-10T00:00:31,2020-10-10T00:00:41,0,null,3.0,0)". So by this point we have already lost the original event time of the record.

As a result, the flaky behaviour comes from the calling order between {{SlicingWindowOperator::processWatermark}} -> {{AbstractWindowAggProcessor::advanceProgress}} and {{SlicingWindowOperator::processElement}}:
- If {{processWatermark}} is called before {{processElement}}, {{currentProgress}} is updated to {{1602288041000}}. When {{processElement}} is called afterwards, it considers the window already fired and drops the element.
- If {{processElement}} is called before {{processWatermark}}, the record is processed as expected.

Is this expected? WDYT?
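The two orderings described above can be reproduced with a toy model. This is a hypothetical, heavily simplified stand-in for the slicing window operator, not Flink's `SlicingWindowOperator`. It assumes that a window ending at `windowEnd` counts as fired once the progress reaches `windowEnd - 1`, and that an element whose window has already fired is dropped.

Feeding in the missing record's window end (1602288041000) in both orders shows that the record is emitted only when the element arrives before the watermark.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical toy model of the processWatermark/processElement race (not Flink code). */
public class WatermarkOrderingSketch {
    private static final class Pending {
        final String key;
        final long windowEnd;

        Pending(String key, long windowEnd) {
            this.key = key;
            this.windowEnd = windowEnd;
        }
    }

    private long currentProgress = Long.MIN_VALUE; // highest watermark seen so far
    private final List<Pending> pending = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    // Assumed firing rule: a window ending at windowEnd fires once progress >= windowEnd - 1
    static boolean isWindowFired(long windowEnd, long progress) {
        return progress >= windowEnd - 1;
    }

    public void processElement(String key, long windowEnd) {
        if (isWindowFired(windowEnd, currentProgress)) {
            return; // window already fired: element treated as late and silently dropped
        }
        pending.add(new Pending(key, windowEnd));
    }

    public void processWatermark(long watermark) {
        currentProgress = Math.max(currentProgress, watermark);
        // Emit and forget every pending window whose trigger time has been reached
        Iterator<Pending> it = pending.iterator();
        while (it.hasNext()) {
            Pending p = it.next();
            if (isWindowFired(p.windowEnd, currentProgress)) {
                emitted.add(p.key + "@" + p.windowEnd);
                it.remove();
            }
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        long windowEnd = 1602288041000L; // 2020-10-10T00:00:41, end of the missing window
        WatermarkOrderingSketch elementFirst = new WatermarkOrderingSketch();
        elementFirst.processElement("b", windowEnd);
        elementFirst.processWatermark(windowEnd);

        WatermarkOrderingSketch watermarkFirst = new WatermarkOrderingSketch();
        watermarkFirst.processWatermark(windowEnd);
        watermarkFirst.processElement("b", windowEnd);
        watermarkFirst.processWatermark(windowEnd);

        System.out.println("element first:   " + elementFirst.emitted());
        System.out.println("watermark first: " + watermarkFirst.emitted());
    }
}
```

The toy only illustrates the race. Any real fix would need the outcome to be independent of the arrival order across the exchange.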