[ 
https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349669#comment-16349669
 ] 

Eugene Kirpichov commented on BEAM-3225:
----------------------------------------

Thanks for the thorough investigation!

Another comment here: triggers *are* non-deterministic, in the sense that they 
only *unblock* output; they don't cause it to be produced immediately. 
Runners generally try to produce it quickly, but with e.g. 
AfterPane.elementCountAtLeast(1) a runner is definitely allowed to process 
several elements and then emit a single pane containing all of them, rather 
than firing one pane per element (hence "element count at least 1" rather 
than "exactly 1"). With something like AfterProcessingTime it is vaguer 
still, as clocks are approximate: by the time the trigger fires, more elements 
could have arrived, so the runner can emit all of them.
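
To make the "at least" semantics concrete, here is a toy model in plain Java. 
It is not Beam code: ToyPane, ToyRunnerDemo and the pollEvery parameter are 
made-up names, and the pane is emptied after each firing (discarding mode) to 
keep the sketch short. It only illustrates that the same trigger can yield 
different pane boundaries depending on when the runner checks it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Toy model, not Beam: a trigger marks a pane as ready; the runner decides when to emit. */
final class ToyPane {
    private final int threshold; // plays the role of elementCountAtLeast(threshold)
    private final List<String> buffered = new ArrayList<>();

    ToyPane(int threshold) {
        this.threshold = threshold;
    }

    void add(String element) {
        buffered.add(element);
    }

    /** Trigger condition: output is allowed once enough elements are buffered. */
    boolean ready() {
        return buffered.size() >= threshold;
    }

    /** Emits everything buffered so far if the trigger allows it, then clears the buffer. */
    List<String> emitIfReady() {
        if (!ready()) {
            return Collections.emptyList();
        }
        List<String> pane = new ArrayList<>(buffered);
        buffered.clear();
        return pane;
    }
}

final class ToyRunnerDemo {
    /** Hypothetical runner that checks the trigger only every pollEvery elements (and at the end). */
    static List<List<String>> run(List<String> input, int pollEvery) {
        ToyPane pane = new ToyPane(1);
        List<List<String>> panes = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            pane.add(input.get(i));
            boolean lastElement = i == input.size() - 1;
            if ((i + 1) % pollEvery == 0 || lastElement) {
                List<String> out = pane.emitIfReady();
                if (!out.isEmpty()) {
                    panes.add(out);
                }
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("A", "A", "A");
        System.out.println(run(input, 1)); // eager runner: [[A], [A], [A]]
        System.out.println(run(input, 3)); // lazy runner:  [[A, A, A]]
    }
}
```

Both outputs are legal for a threshold of 1; a pipeline that needs one output 
per element cannot rely on the trigger alone.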

To do what you want (full exact history of count changes) you'll need to write 
a stateful DoFn with a per-key counter and manually emit everything you want. 
That is what Beam does under the hood anyway (triggers are implemented using 
per-key state and timers), just not with the semantics you want.
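
A rough sketch of the per-key logic such a DoFn would carry, written as plain 
Java so it runs without Beam. CountHistory and its two maps are hypothetical 
stand-ins: in a real stateful DoFn they would be per-key state cells declared 
with @StateId, and state would also be scoped to the window, which this model 
ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a per-key counter that emits the full count history on every element. */
final class CountHistory {
    // Stand-ins for per-key state; windowing is deliberately left out of this sketch.
    private final Map<String, Long> count = new HashMap<>();
    private final Map<String, List<Long>> history = new HashMap<>();

    /** Processes one element for the given key and returns the line to emit for it. */
    String process(String key) {
        long next = count.merge(key, 1L, Long::sum);
        List<Long> seen = history.computeIfAbsent(key, k -> new ArrayList<>());
        seen.add(next);
        return "KV{" + key + ", " + seen + "}";
    }

    public static void main(String[] args) {
        CountHistory fn = new CountHistory();
        for (String element : new String[] {"A", "A", "A", "B"}) {
            // Prints KV{A, [1]}, KV{A, [1, 2]}, KV{A, [1, 2, 3]}, KV{B, [1]}
            System.out.println(fn.process(element));
        }
    }
}
```

Because output is emitted explicitly for every element, the history no longer 
depends on when a runner chooses to fire a pane.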

That explains all of your examples up until "Why Final group by is triggered 
only after allowed lateness at 11:00:35?". As for that one, I'm not sure I 
understand what the issue is. After Count A fires the first time A 
arrives (causing Final group by to fire), and fires once more when the 
window closes, i.e. after the allowed lateness (causing Final group by to fire 
one more time). That seems as expected?
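
For reference, the 11:00:35 timestamp is just the window end plus the allowed 
lateness from the reported configuration. A small worked example follows; 
WindowExpiry is a made-up helper, and the exact instant Beam uses internally 
may differ slightly (an assumption here is that a difference of a millisecond 
or so does not matter for reading the log).

```java
import java.time.Duration;
import java.time.Instant;

/** Worked arithmetic only: when does a window with allowed lateness finally close? */
final class WindowExpiry {
    /** Returns the window end shifted by the allowed lateness. */
    static Instant expiry(Instant windowEnd, Duration allowedLateness) {
        return windowEnd.plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Window [11:00:00, 11:00:30) with withAllowedLateness(standardSeconds(5)).
        Instant windowEnd = Instant.parse("2017-11-16T11:00:30Z");
        System.out.println(expiry(windowEnd, Duration.ofSeconds(5))); // 2017-11-16T11:00:35Z
    }
}
```

That matches the second round of "After count" lines logged at 11:00:35.190 
in the Flink run.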

As for "allowed lateness configuration dictates that only non empty panes 
should be trigger!!!" - yes, this seems like a bug in the Flink runner; 
[~aljoscha] could you take a look?

> Non deterministic behaviour of AfterProcessingTime trigger with multiple 
> group by transformations
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3225
>                 URL: https://issues.apache.org/jira/browse/BEAM-3225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Eugene Kirpichov
>            Priority: Critical
>
> *Context*
> I ran my [test 
> code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] 
> against different triggers and runners. My original problem was that, when 
> writing to a file sink, files weren't always produced in a deterministic way; 
> please refer to 
> [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]. When I started 
> looking at the WriteFiles class I noticed that the file sink implementation 
> includes multiple GroupByKey transformations. Knowing that, I wrote test code 
> that uses multiple GroupByKey transformations, and concluded that GroupByKey's 
> support for After(Synchronised)ProcessingTime triggers is a bit buggy(?), 
> which also affects the file sink. When I ran my job using the 
> Dataflow runner I got the expected output.
> *About test code*
> The job counts how many A and B elements it received within 30-second 
> windows using Count.perElement. Then I use GroupByKey to fire every time 
> the count has increased.
> Below I outline the expected standard output: 
> {code:java}
> Let's assume all events are received in the same 30 seconds window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> With my trigger configuration I would expect that for every new element 
> 'After count' is printed with new value followed by 'Final group by' with new 
> counter. Final group by represents the history of counters then.{code}
>  
> *Problem*
> The 'Final group by' trigger doesn't always go off, although the trigger setup 
> suggests it should. This behaviour differs between runners and Beam 
> versions. 
>  
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
>  
>  Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:14.951Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T14:52:35.320Z Received Element A 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.426Z After count KV{A, 1} 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.532Z Final group by KV{A, [1]} 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T14:49:34.135Z Received Element A 
> [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z)
> 2017-11-16T14:49:34.324Z After count KV{A, 1} 
> [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:37.526Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:37.535Z After count KV{A, 1} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:44.287Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:44.294Z After count KV{A, 2} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:47.991Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:47.995Z After count KV{A, 3} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:50:03.323Z Received Element A 
> [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z)
> 2017-11-16T14:50:03.328Z After count KV{A, 1} 
> [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:51:14.309Z Received Element A 
> [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z)
> 2017-11-16T14:51:14.315Z After count KV{A, 1} 
> [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z) <--- Expected to see 
> 'Final group by' after this
> {code}
>  Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:13:02.747Z After count KV{A, 1} 
> [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:02.761Z Final group by KV{A, [1]} 
> [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:09.492Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.501Z After count KV{A, 1} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.608Z Final group by KV{A, [1]} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.029Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.134Z After count KV{A, 2} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.240Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:15.420Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T15:13:38.285Z Received Element A 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.379Z After count KV{A, 1} 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.481Z Final group by KV{A, [1]} 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T15:17:38.485Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.595Z After count KV{A, 1} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.608Z Final group by KV{A, [1]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.977Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.985Z After count KV{A, 2} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.988Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:51.126Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T15:17:57.154Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z After count KV{A, 3} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.158Z After count KV{A, 4} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.161Z After count KV{A, 5} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.163Z Final group by KV{A, [1, 2, 3, 4, 5]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:54:14.017Z Received Element A 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.187Z After count KV{A, 1} 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.205Z Final group by KV{A, [1]} 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:38.499Z Received Element A 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.604Z After count KV{A, 1} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.709Z Final group by KV{A, [1]} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.665Z Received Element A 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.770Z After count KV{A, 2} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z) <--- Expected to see 
> 'Final group by' after this (although I can already see the output for the 
> next minute, the 'Final group by' firing is lost for this 30 s window, 
> possibly forever?)
> 2017-11-16T14:55:06.131Z Received Element A 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.237Z After count KV{A, 1} 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.342Z Final group by KV{A, [1]} 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> {code}
> Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:11:09.666Z Received Element A 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.838Z After count KV{A, 1} 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.853Z Final group by KV{A, [1]} 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:14.208Z Received Element A 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T15:11:33.216Z Received Element A 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.322Z After count KV{A, 1} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.327Z Final group by KV{A, [1]} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.740Z Received Element A 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.843Z After count KV{A, 2} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.947Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> {code}
> *My observations when using Kinesis Stream*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T10:56:33.241Z A
> 2017-11-16T10:56:33.460Z After count KV{A, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:33.475Z Final group by KV{A, [1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.916Z B
> 2017-11-16T10:56:37.950Z After count KV{B, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.956Z Final group by KV{B, [1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{A, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{B, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{B, [1, 1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{A, [1, 1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T10:55:11.851Z A
> 2017-11-16T10:55:11.854Z After count KV{A, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z) --
> 2017-11-16T10:55:18.329Z B
> 2017-11-16T10:55:18.333Z After count KV{B, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.195Z After count KV{A, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.196Z After count KV{B, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{A, [1, 1]} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{B, [1, 1]} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T11:00:04.820Z A
> 2017-11-16T11:00:04.838Z After count KV{A, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:04.943Z Final group by KV{A, [1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:10.105Z B
> 2017-11-16T11:00:10.138Z After count KV{B, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:30.188Z Final group by KV{B, [1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.190Z After count KV{A, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.191Z After count KV{B, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.295Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- Why Final group by 
> is triggered only after allowed lateness at 11:00:35?
> 2017-11-16T11:00:35.297Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.298Z Final group by KV{A, []} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <-- allowed lateness 
> configuration dictates that only non empty panes should be trigger!!!
> 2017-11-16T11:00:35.298Z Final group by KV{B, []} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes())
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T11:14:36.754Z A
> 2017-11-16T11:14:36.912Z After count KV{A, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:36.925Z Final group by KV{A, [1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.431Z B
> 2017-11-16T11:14:45.437Z After count KV{B, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.440Z Final group by KV{B, [1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.034Z After count KV{A, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.035Z After count KV{B, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.039Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.040Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T11:05:12.562Z A
> 2017-11-16T11:05:12.565Z After count KV{A, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:15.326Z B
> 2017-11-16T11:05:15.330Z After count KV{B, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.456Z After count KV{A, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.457Z After count KV{B, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 1
> {code:java}
> 2017-11-16T11:06:32.634Z A
> 2017-11-16T11:06:32.797Z After count KV{A, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:32.812Z Final group by KV{A, [1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.449Z B
> 2017-11-16T11:06:36.550Z After count KV{B, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.654Z Final group by KV{B, [1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.153Z After count KV{A, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.155Z After count KV{B, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.258Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.260Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.262Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.263Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 2
> {code:java}
> 2017-11-16T11:09:41.538Z A
> 2017-11-16T11:09:41.618Z After count KV{A, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:41.687Z Final group by KV{A, [1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:45.671Z B
> 2017-11-16T11:09:45.681Z After count KV{B, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:10:00.102Z After count KV{A, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.103Z After count KV{B, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.201Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.202Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.203Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 3
> {code:java}
> 2017-11-16T11:08:09.474Z A
> 2017-11-16T11:08:09.558Z After count KV{A, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:09.646Z Final group by KV{A, [1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:14.541Z B
> 2017-11-16T11:08:30.255Z After count KV{A, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:30.256Z After count KV{B, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.307Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.309Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.310Z Final group by KV{B, [1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 4
> {code:java}
> 2017-11-16T11:07:44.814Z A
> 2017-11-16T11:07:44.841Z After count KV{A, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:07:48.883Z B
> 2017-11-16T11:07:48.901Z After count KV{B, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.099Z After count KV{A, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.101Z After count KV{B, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.204Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.205Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.206Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.207Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> {code}
>  
>  *Workaround*
> If you check the test code, I redefined the first trigger just before the 
> second GroupByKey transformation, and I started getting the expected result 
> on my local machine using both the direct runner and the Flink runner. 
> However, when I deployed the job to a Flink cluster, the Final group by 
> trigger sometimes didn't go off.
>  
> *My intuition*
>  I guess that there is some bug in the handling of 
> After(Synchronised)ProcessingTime triggers in Beam. The AfterWatermark trigger 
> always works as expected. It's very interesting that AfterProcessingTime 
> triggers go off at different times when comparing Beam 2.0 and 2.1.
>  I am a bit worried that this bug might still be in Beam 2.2, although not 
> occurring as frequently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
