[ https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401800#comment-16401800 ]
Dawid Wysakowicz commented on BEAM-3225:
----------------------------------------

Hi, I've tried to reproduce the _"allowed lateness configuration dictates that only non empty panes should be triggered!!!"_ behaviour but couldn't by any means. I also could not find any bug or issue in the Flink runner that could cause such behaviour. [~pawelbartoszek] are you able to provide a reliable way to reproduce the problem?

> Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
> -------------------------------------------------------------------------------------------------
>
> Key: BEAM-3225
> URL: https://issues.apache.org/jira/browse/BEAM-3225
> Project: Beam
> Issue Type: Bug
> Components: runner-core, runner-flink
> Affects Versions: 2.0.0, 2.1.0
> Reporter: Pawel Bartoszek
> Assignee: Aljoscha Krettek
> Priority: Critical
>
> *Context*
> I ran my [test code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] against different triggers and runners. My original problem was that when writing to a file sink, files weren't always produced in a deterministic way. Please refer to [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]. When I started looking at the WriteFiles class, I noticed that the file sink implementation includes multiple GroupByKey transformations. With that in mind, I wrote test code that uses multiple GroupByKey transformations, and concluded that GroupByKey's support for After(Synchronised)ProcessingTime triggers is a bit buggy(?), which also affects the file sink. When I ran my job using the Dataflow runner, I got the expected output.
> *About test code*
> The job counts how many A and B elements it received within 30 s windows using Count.perElement. Then it uses GroupByKey to fire every time the count increases.
> Below is the expected standard output:
> {code:java}
> Let's assume all events are received in the same 30-second window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> With my trigger configuration, I would expect that for every new element 'After count' is printed with the new value, followed by 'Final group by' with the new counter. 'Final group by' then represents the history of counters.
> {code}
>
> *Problem*
> The 'Final group by' trigger doesn't always go off, although the trigger setup suggests it should. This behaviour differs between runners and Beam versions.
>
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
>
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:14.951Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T14:52:35.320Z Received Element A [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.426Z After count KV{A, 1} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.532Z Final group by KV{A, [1]} [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T14:49:34.135Z Received Element A [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z)
> 2017-11-16T14:49:34.324Z After count KV{A, 1} [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:37.526Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:37.535Z After count KV{A, 1} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:44.287Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:44.294Z After count KV{A, 2} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:47.991Z Received Element A [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:49:47.995Z After count KV{A, 3} [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:50:03.323Z Received Element A [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z)
> 2017-11-16T14:50:03.328Z After count KV{A, 1} [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T14:51:14.309Z Received Element A [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z)
> 2017-11-16T14:51:14.315Z After count KV{A, 1} [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z) <--- Expected to see 'Final group by' after this
> {code}
> Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:13:02.747Z After count KV{A, 1} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:02.761Z Final group by KV{A, [1]} [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:09.492Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.501Z After count KV{A, 1} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.608Z Final group by KV{A, [1]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.029Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.134Z After count KV{A, 2} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.240Z Final group by KV{A, [1, 2]} [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:15.420Z Received Element A [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T15:13:38.285Z Received Element A [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.379Z After count KV{A, 1} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.481Z Final group by KV{A, [1]} [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T15:17:38.485Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.595Z After count KV{A, 1} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.608Z Final group by KV{A, [1]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.977Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.985Z After count KV{A, 2} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.988Z Final group by KV{A, [1, 2]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:51.126Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z After count KV{A, 3} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z Received Element A [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.158Z After count KV{A, 4} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.161Z After count KV{A, 5} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.163Z Final group by KV{A, [1, 2, 3, 4, 5]} [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:54:14.017Z Received Element A [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.187Z After count KV{A, 1} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.205Z Final group by KV{A, [1]} [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:38.499Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.604Z After count KV{A, 1} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.709Z Final group by KV{A, [1]} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.665Z Received Element A [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.770Z After count KV{A, 2} [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z) <--- Expected to see 'Final group by' after this (although I can already see the output for the next minute, the 'Final group by' trigger is lost for this 30 s window, possibly forever?)
> 2017-11-16T14:55:06.131Z Received Element A [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.237Z After count KV{A, 1} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.342Z Final group by KV{A, [1]} [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> {code}
> Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:11:09.666Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.838Z After count KV{A, 1} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.853Z Final group by KV{A, [1]} [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:14.208Z Received Element A [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T15:11:33.216Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.322Z After count KV{A, 1} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.327Z Final group by KV{A, [1]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.740Z Received Element A [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.843Z After count KV{A, 2} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.947Z Final group by KV{A, [1, 2]} [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> {code}
> *My observations when using Kinesis Stream*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T10:56:33.241Z A
> 2017-11-16T10:56:33.460Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:33.475Z Final group by KV{A, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.916Z B
> 2017-11-16T10:56:37.950Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.956Z Final group by KV{B, [1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{A, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{B, 1} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{B, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{A, [1, 1]} [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T10:55:11.851Z A
> 2017-11-16T10:55:11.854Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z) --
> 2017-11-16T10:55:18.329Z B
> 2017-11-16T10:55:18.333Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.195Z After count KV{A, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.196Z After count KV{B, 1} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{A, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{B, [1, 1]} [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T11:00:04.820Z A
> 2017-11-16T11:00:04.838Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:04.943Z Final group by KV{A, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:10.105Z B
> 2017-11-16T11:00:10.138Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:30.188Z Final group by KV{B, [1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.190Z After count KV{A, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.191Z After count KV{B, 1} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.295Z Final group by KV{A, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- Why is 'Final group by' triggered only after the allowed lateness, at 11:00:35?
> 2017-11-16T11:00:35.297Z Final group by KV{B, [1, 1]} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.298Z Final group by KV{A, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <-- allowed lateness configuration dictates that only non empty panes should be triggered!!!
> 2017-11-16T11:00:35.298Z Final group by KV{B, []} [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T11:14:36.754Z A
> 2017-11-16T11:14:36.912Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:36.925Z Final group by KV{A, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.431Z B
> 2017-11-16T11:14:45.437Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.440Z Final group by KV{B, [1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.034Z After count KV{A, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.035Z After count KV{B, 1} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.039Z Final group by KV{A, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.040Z Final group by KV{B, [1, 1]} [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T11:05:12.562Z A
> 2017-11-16T11:05:12.565Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:15.326Z B
> 2017-11-16T11:05:15.330Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.456Z After count KV{A, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.457Z After count KV{B, 1} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{A, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{B, [1, 1]} [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 1
> {code:java}
> 2017-11-16T11:06:32.634Z A
> 2017-11-16T11:06:32.797Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:32.812Z Final group by KV{A, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.449Z B
> 2017-11-16T11:06:36.550Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.654Z Final group by KV{B, [1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.153Z After count KV{A, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.155Z After count KV{B, 1} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.258Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.260Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.262Z Final group by KV{A, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.263Z Final group by KV{B, [1, 1]} [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 2
> {code:java}
> 2017-11-16T11:09:41.538Z A
> 2017-11-16T11:09:41.618Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:41.687Z Final group by KV{A, [1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:45.671Z B
> 2017-11-16T11:09:45.681Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:10:00.102Z After count KV{A, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.103Z After count KV{B, 1} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.201Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.202Z Final group by KV{A, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.203Z Final group by KV{B, [1, 1]} [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 3
> {code:java}
> 2017-11-16T11:08:09.474Z A
> 2017-11-16T11:08:09.558Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:09.646Z Final group by KV{A, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:14.541Z B
> 2017-11-16T11:08:30.255Z After count KV{A, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:30.256Z After count KV{B, 1} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.307Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.309Z Final group by KV{A, [1, 1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.310Z Final group by KV{B, [1]} [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 4
> {code:java}
> 2017-11-16T11:07:44.814Z A
> 2017-11-16T11:07:44.841Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:07:48.883Z B
> 2017-11-16T11:07:48.901Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.099Z After count KV{A, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.101Z After count KV{B, 1} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 'Final group by' after this
> 2017-11-16T11:08:00.204Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.205Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.206Z Final group by KV{A, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.207Z Final group by KV{B, [1, 1]} [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> {code}
>
> *Workaround*
> If you check the test code, you will see that I redefined the first trigger just before the second GroupByKey transformation, and I started getting the expected result on my local machine with both the direct runner and the Flink runner. However, when I deployed the job to a Flink cluster, the 'Final group by' trigger sometimes didn't go off.
>
> *My intuition*
> I guess that there is some bug in how Beam handles After(Synchronised)ProcessingTime triggers. The AfterWatermark trigger always works as expected. It's very interesting that AfterProcessingTime triggers go off at different times in Beam 2.0 and 2.1. I am a bit worried that this bug might still be present in Beam 2.2, although not occurring that frequently.
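A minimal, stdlib-only Java sketch of the output the report expects, which may help when turning the expectation into a reliable reproduction. It assumes an idealized runner in which both the Count.perElement stage and the final GroupByKey fire exactly one accumulating pane per element. Beam does not promise that: AfterProcessingTime.pastFirstElementInPane() can fold several elements into a single pane (compare the Beam 2.1 Direct Runner log, where KV{A, [1, 2, 3, 4, 5]} appears at once), so this models the reporter's expectation, not any runner. The class name ExpectedPaneModel, its methods, and the nested ClosingBehavior enum are hypothetical. The enum only mirrors the constant names of Beam's Window.ClosingBehavior and encodes its documented rule that FIRE_IF_NON_EMPTY emits the final pane only if new data arrived since the previous firing, which is why the KV{A, []} lines above look suspicious.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the expected output; this is NOT Beam code.
final class ExpectedPaneModel {

  // Local stand-in that mirrors the constant names of Beam's Window.ClosingBehavior.
  enum ClosingBehavior { FIRE_ALWAYS, FIRE_IF_NON_EMPTY }

  private ExpectedPaneModel() {}

  /**
   * Expected log lines for elements that all fall into the same window, assuming
   * every element fires one accumulating pane in both GroupByKey stages.
   */
  static List<String> expectedOutput(List<String> elements) {
    Map<String, Integer> counts = new LinkedHashMap<>();         // running Count.perElement value per key
    Map<String, List<Integer>> history = new LinkedHashMap<>(); // counts accumulated by the final GroupByKey
    List<String> out = new ArrayList<>();
    for (String key : elements) {
      int count = counts.merge(key, 1, Integer::sum);
      List<Integer> seen = history.computeIfAbsent(key, k -> new ArrayList<>());
      seen.add(count);
      out.add("After count KV{" + key + ", " + count + "}");
      // ArrayList.toString() renders as "[1, 2]", matching the KV{A, [1, 2]} format in the logs.
      out.add("Final group by KV{" + key + ", " + seen + "}");
    }
    return out;
  }

  /** Whether the final (window-closing) pane is emitted under the documented ClosingBehavior rule. */
  static boolean emitsFinalPane(ClosingBehavior behavior, int elementsSinceLastFiring) {
    return behavior == ClosingBehavior.FIRE_ALWAYS || elementsSinceLastFiring > 0;
  }
}
```

For the input A, A, A, B the model yields the four 'After count' / 'Final group by' pairs listed under *About test code*. Each log line marked "Expected to see ..." is a point where a runner emitted fewer panes than this idealized model. Beam's TestStream, which lets a test advance processing time and the watermark explicitly, is one way to drive a real pipeline through the same sequence deterministically.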
-- This message was sent by Atlassian JIRA (v7.6.3#76005)