[ 
https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401800#comment-16401800
 ] 

Dawid Wysakowicz commented on BEAM-3225:
----------------------------------------

Hi, I've tried to reproduce the _"allowed lateness configuration dictates that 
only non empty panes should be trigger!!!"_ case but could not do so by any 
means. I also could not find any bug or issue in the Flink runner that could 
be the reason for such behaviour. [~pawelbartoszek], are you able to provide a 
reliable way to reproduce the problem?

> Non deterministic behaviour of AfterProcessingTime trigger with multiple 
> group by transformations
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3225
>                 URL: https://issues.apache.org/jira/browse/BEAM-3225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Aljoscha Krettek
>            Priority: Critical
>
> *Context*
> I ran my [test 
> code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80] 
> against different triggers and runners. My original problem was that, when 
> writing to a file sink, files weren't always produced in a deterministic way; 
> please refer to [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]. 
> When I started looking at the WriteFiles class, I noticed that the file sink 
> implementation includes multiple GroupByKey transformations. Knowing that, I 
> wrote test code that uses multiple GroupByKey transformations, and concluded 
> that the somewhat buggy(?) support for After(Synchronised)ProcessingTime 
> triggers in GroupByKey also affects the file sink. When I ran my job using 
> the Dataflow runner, I got the expected output.
> *About test code*
> The job counts how many A and B elements it receives within 30-second 
> windows using Count.perElement. I then use GroupByKey to fire every time the 
> count increases.
> Below is the expected standard output, assuming all events are received in 
> the same 30-second window:
> {code:java}
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> {code}
> With my trigger configuration I would expect that, for every new element, 
> 'After count' is printed with the new value, followed by 'Final group by' with 
> the new counter. 'Final group by' thus represents the history of the counters.
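The expected output described above can be modelled in plain Java, without Beam: keep a running count per key and append every new count to that key's history. This is only a sketch of the output the reporter expects, not of how any runner actually fires triggers; the names `ExpectedOutputSketch` and `simulate` are hypothetical and do not come from the linked test code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ExpectedOutputSketch {
    /** Returns one line per input element, mimicking the expected output format. */
    static List<String> simulate(List<String> elements) {
        // Per-key history of counts; stands in for the accumulating 'Final group by' pane.
        Map<String, List<Long>> history = new LinkedHashMap<>();
        List<String> lines = new ArrayList<>();
        for (String key : elements) {
            List<Long> counts = history.computeIfAbsent(key, k -> new ArrayList<>());
            // Each element raises the key's count by one (stands in for Count.perElement).
            long count = counts.size() + 1L;
            counts.add(count);
            lines.add(key + " -> After count KV{" + key + ", " + count + "}"
                + " -> Final group by KV{" + key + ", " + counts + "}");
        }
        return lines;
    }

    public static void main(String[] args) {
        // All four events are assumed to fall into the same 30-second window.
        simulate(List.of("A", "A", "A", "B")).forEach(System.out::println);
    }
}
```

Any run in the logs where a 'Final group by' line is missing for an element deviates from this model.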
>  
> *Problem*
> The 'Final group by' trigger doesn't always go off, although the trigger 
> setup suggests that it should. This behaviour differs between runners and 
> Beam versions. 
>  
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes()
> {code}
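To help read the window intervals in the logs that follow, here is a minimal plain-Java sketch of how an epoch-aligned 30-second fixed window can be derived from an element's timestamp. It assumes a zero window offset, and it assumes the logged time is close to the element's event time. The class and method names are hypothetical; this is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

public class FixedWindowSketch {
    /** Start (inclusive) of the epoch-aligned fixed window containing the timestamp. */
    static Instant windowStart(Instant timestamp, Duration size) {
        long ts = timestamp.toEpochMilli();
        // Round down to the nearest multiple of the window size.
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, size.toMillis()));
    }

    /** End (exclusive) of that window. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(30);
        // Timestamp copied from the first Flink runner log line below.
        Instant ts = Instant.parse("2017-11-16T14:51:44.294Z");
        System.out.println("[" + windowStart(ts, size) + ".." + windowEnd(ts, size) + ")");
    }
}
```

For the timestamp above, this yields the window from 14:51:30 to 14:52:00, matching the interval printed next to that log line.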
>  
>  Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} 
> [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:14.951Z Received Element A 
> [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T14:52:35.320Z Received Element A 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.426Z After count KV{A, 1} 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> 2017-11-16T14:52:35.532Z Final group by KV{A, [1]} 
> [2017-11-16T14:52:30.000Z..2017-11-16T14:53:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T14:49:34.135Z Received Element A 
> [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z)
> 2017-11-16T14:49:34.324Z After count KV{A, 1} 
> [2017-11-16T14:49:00.000Z..2017-11-16T14:49:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:37.526Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:37.535Z After count KV{A, 1} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:44.287Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:44.294Z After count KV{A, 2} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z)
> 2017-11-16T14:49:47.991Z Received Element A 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:49:47.995Z After count KV{A, 3} 
> [2017-11-16T14:49:30.000Z..2017-11-16T14:50:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:50:03.323Z Received Element A 
> [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z)
> 2017-11-16T14:50:03.328Z After count KV{A, 1} 
> [2017-11-16T14:50:00.000Z..2017-11-16T14:50:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T14:51:14.309Z Received Element A 
> [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z)
> 2017-11-16T14:51:14.315Z After count KV{A, 1} 
> [2017-11-16T14:51:00.000Z..2017-11-16T14:51:30.000Z) <--- Expected to see 
> 'Final group by' after this
> {code}
>  Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:13:02.747Z After count KV{A, 1} 
> [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:02.761Z Final group by KV{A, [1]} 
> [2017-11-16T15:12:30.000Z..2017-11-16T15:13:00.000Z)
> 2017-11-16T15:13:09.492Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.501Z After count KV{A, 1} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:09.608Z Final group by KV{A, [1]} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.029Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.134Z After count KV{A, 2} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:13.240Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z)
> 2017-11-16T15:13:15.420Z Received Element A 
> [2017-11-16T15:13:00.000Z..2017-11-16T15:13:30.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T15:13:38.285Z Received Element A 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.379Z After count KV{A, 1} 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> 2017-11-16T15:13:38.481Z Final group by KV{A, [1]} 
> [2017-11-16T15:13:30.000Z..2017-11-16T15:14:00.000Z)
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T15:17:38.485Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.595Z After count KV{A, 1} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:38.608Z Final group by KV{A, [1]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.977Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.985Z After count KV{A, 2} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:44.988Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:51.126Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z) <--- Expected to see 
> 'After count' after this
> 2017-11-16T15:17:57.154Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z After count KV{A, 3} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.154Z Received Element A 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.158Z After count KV{A, 4} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.161Z After count KV{A, 5} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> 2017-11-16T15:17:57.163Z Final group by KV{A, [1, 2, 3, 4, 5]} 
> [2017-11-16T15:17:30.000Z..2017-11-16T15:18:00.000Z)
> {code}
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes()
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:54:14.017Z Received Element A 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.187Z After count KV{A, 1} 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:14.205Z Final group by KV{A, [1]} 
> [2017-11-16T14:54:00.000Z..2017-11-16T14:54:30.000Z)
> 2017-11-16T14:54:38.499Z Received Element A 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.604Z After count KV{A, 1} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:38.709Z Final group by KV{A, [1]} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.665Z Received Element A 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z)
> 2017-11-16T14:54:42.770Z After count KV{A, 2} 
> [2017-11-16T14:54:30.000Z..2017-11-16T14:55:00.000Z) <--- Expected to see 
> 'Final group by' after this (although I can already see the output for the 
> next window, the 'Final group by' trigger is lost for this 30 s window, 
> possibly forever?)
> 2017-11-16T14:55:06.131Z Received Element A 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.237Z After count KV{A, 1} 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> 2017-11-16T14:55:06.342Z Final group by KV{A, [1]} 
> [2017-11-16T14:55:00.000Z..2017-11-16T14:55:30.000Z)
> {code}
> Beam 2.1 Flink Runner
> {code:java}
> 2017-11-16T15:11:09.666Z Received Element A 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.838Z After count KV{A, 1} 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:09.853Z Final group by KV{A, [1]} 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z)
> 2017-11-16T15:11:14.208Z Received Element A 
> [2017-11-16T15:11:00.000Z..2017-11-16T15:11:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T15:11:33.216Z Received Element A 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.322Z After count KV{A, 1} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:33.327Z Final group by KV{A, [1]} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.740Z Received Element A 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.843Z After count KV{A, 2} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> 2017-11-16T15:11:54.947Z Final group by KV{A, [1, 2]} 
> [2017-11-16T15:11:30.000Z..2017-11-16T15:12:00.000Z)
> {code}
> *My observations when using Kinesis Stream*
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes()
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T10:56:33.241Z A
> 2017-11-16T10:56:33.460Z After count KV{A, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:33.475Z Final group by KV{A, [1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.916Z B
> 2017-11-16T10:56:37.950Z After count KV{B, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:56:37.956Z Final group by KV{B, [1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{A, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.388Z After count KV{B, 1} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{B, [1, 1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> 2017-11-16T10:57:05.392Z Final group by KV{A, [1, 1]} 
> [2017-11-16T10:56:30.000Z..2017-11-16T10:57:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T10:55:11.851Z A
> 2017-11-16T10:55:11.854Z After count KV{A, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z) --
> 2017-11-16T10:55:18.329Z B
> 2017-11-16T10:55:18.333Z After count KV{B, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.195Z After count KV{A, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.196Z After count KV{B, 1} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{A, [1, 1]} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> 2017-11-16T10:55:35.199Z Final group by KV{B, [1, 1]} 
> [2017-11-16T10:55:00.000Z..2017-11-16T10:55:30.000Z)
> {code}
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T11:00:04.820Z A
> 2017-11-16T11:00:04.838Z After count KV{A, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:04.943Z Final group by KV{A, [1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:10.105Z B
> 2017-11-16T11:00:10.138Z After count KV{B, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:30.188Z Final group by KV{B, [1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.190Z After count KV{A, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.191Z After count KV{B, 1} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.295Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <--- Why is 'Final group 
> by' triggered only after the allowed lateness, at 11:00:35?
> 2017-11-16T11:00:35.297Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> 2017-11-16T11:00:35.298Z Final group by KV{A, []} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z) <-- allowed lateness 
> configuration dictates that only non empty panes should be trigger!!!
> 2017-11-16T11:00:35.298Z Final group by KV{B, []} 
> [2017-11-16T11:00:00.000Z..2017-11-16T11:00:30.000Z)
> {code}
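The 11:00:35 timestamp questioned in the log above is exactly the end of the window plus the configured allowed lateness of 5 seconds. A minimal sketch of that arithmetic follows, assuming epoch-aligned 30-second windows and assuming the logged time approximates the element's event time. The names are hypothetical, and this does not show that the runner fired for this reason.

```java
import java.time.Duration;
import java.time.Instant;

public class WindowExpirySketch {
    /** End (exclusive) of the epoch-aligned fixed window containing the timestamp. */
    static Instant windowEnd(Instant timestamp, Duration size) {
        long ts = timestamp.toEpochMilli();
        long start = ts - Math.floorMod(ts, size.toMillis());
        return Instant.ofEpochMilli(start).plus(size);
    }

    /** Window end plus allowed lateness, i.e. the last point at which the window is kept. */
    static Instant expiry(Instant timestamp, Duration size, Duration allowedLateness) {
        return windowEnd(timestamp, size).plus(allowedLateness);
    }

    public static void main(String[] args) {
        // Timestamp of element A copied from the Beam 2.0 Flink runner log above.
        Instant element = Instant.parse("2017-11-16T11:00:04.820Z");
        System.out.println(expiry(element, Duration.ofSeconds(30), Duration.ofSeconds(5)));
    }
}
```

The window ends at 11:00:30, so the computed expiry is 11:00:35, the same second in which the late 'Final group by' lines and the empty panes appear.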
> Trigger configuration
> {code:java}
> Window.<String>into(FixedWindows.of(standardSeconds(30)))
>     .triggering(AfterWatermark.pastEndOfWindow()
>         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes()
> {code}
> Beam 2.1 Direct Runner
> {code:java}
> 2017-11-16T11:14:36.754Z A
> 2017-11-16T11:14:36.912Z After count KV{A, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:36.925Z Final group by KV{A, [1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.431Z B
> 2017-11-16T11:14:45.437Z After count KV{B, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:14:45.440Z Final group by KV{B, [1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.034Z After count KV{A, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.035Z After count KV{B, 1} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.039Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> 2017-11-16T11:15:00.040Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:14:30.000Z..2017-11-16T11:15:00.000Z)
> {code}
> Beam 2.0 Direct Runner
> {code:java}
> 2017-11-16T11:05:12.562Z A
> 2017-11-16T11:05:12.565Z After count KV{A, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:15.326Z B
> 2017-11-16T11:05:15.330Z After count KV{B, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.456Z After count KV{A, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.457Z After count KV{B, 1} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> 2017-11-16T11:05:30.459Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:05:00.000Z..2017-11-16T11:05:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 1
> {code:java}
> 2017-11-16T11:06:32.634Z A
> 2017-11-16T11:06:32.797Z After count KV{A, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:32.812Z Final group by KV{A, [1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.449Z B
> 2017-11-16T11:06:36.550Z After count KV{B, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:06:36.654Z Final group by KV{B, [1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.153Z After count KV{A, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.155Z After count KV{B, 1} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.258Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.260Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.262Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> 2017-11-16T11:07:00.263Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:06:30.000Z..2017-11-16T11:07:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 2
> {code:java}
> 2017-11-16T11:09:41.538Z A
> 2017-11-16T11:09:41.618Z After count KV{A, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:41.687Z Final group by KV{A, [1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:09:45.671Z B
> 2017-11-16T11:09:45.681Z After count KV{B, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:10:00.102Z After count KV{A, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.103Z After count KV{B, 1} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.201Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.202Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> 2017-11-16T11:10:00.203Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:09:30.000Z..2017-11-16T11:10:00.000Z)
> {code}
> Beam 2.0 Flink Runner - run 3
> {code:java}
> 2017-11-16T11:08:09.474Z A
> 2017-11-16T11:08:09.558Z After count KV{A, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:09.646Z Final group by KV{A, [1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:14.541Z B
> 2017-11-16T11:08:30.255Z After count KV{A, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:30.256Z After count KV{B, 1} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.307Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.309Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> 2017-11-16T11:08:30.310Z Final group by KV{B, [1]} 
> [2017-11-16T11:08:00.000Z..2017-11-16T11:08:30.000Z)
> {code}
> Beam 2.0 Flink Runner - run 4
> {code:java}
> 2017-11-16T11:07:44.814Z A
> 2017-11-16T11:07:44.841Z After count KV{A, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:07:48.883Z B
> 2017-11-16T11:07:48.901Z After count KV{B, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.099Z After count KV{A, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.101Z After count KV{B, 1} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z) <--- Expected to see 
> 'Final group by' after this
> 2017-11-16T11:08:00.204Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.205Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.206Z Final group by KV{A, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> 2017-11-16T11:08:00.207Z Final group by KV{B, [1, 1]} 
> [2017-11-16T11:07:30.000Z..2017-11-16T11:08:00.000Z)
> {code}
>  
>  *Workaround*
> If you check the test code, you will see that I redefined the first trigger 
> just before the second GroupByKey transformation. With that change I started 
> getting the expected result on my local machine using both the direct runner 
> and the Flink runner. However, when I deployed the job to a Flink cluster, 
> the 'Final group by' trigger still sometimes didn't go off.
>  
> *My intuition*
>  I guess that there is a bug in how Beam handles 
> After(Synchronised)ProcessingTime triggers. The AfterWatermark trigger 
> always works as expected. It's very interesting that AfterProcessingTime 
> triggers go off at different times in Beam 2.0 and 2.1.
>  I am a bit worried that this bug might still be present in Beam 2.2, 
> although occurring less frequently.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
