[jira] [Assigned] (BEAM-3225) Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
[ https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eugene Kirpichov reassigned BEAM-3225:

    Assignee: Aljoscha Krettek  (was: Eugene Kirpichov)

> Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
>
>                 Key: BEAM-3225
>                 URL: https://issues.apache.org/jira/browse/BEAM-3225
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, runner-flink
>    Affects Versions: 2.0.0, 2.1.0
>            Reporter: Pawel Bartoszek
>            Assignee: Aljoscha Krettek
>            Priority: Critical
>
> *Context*
> I ran my [test code|https://gist.github.com/pbartoszek/fe6b1d7501333a2a4b385a1424f6bd80]
> against different triggers and runners. My original problem was that, when
> writing to a file sink, files were not always produced deterministically
> (see [BEAM-3151|https://issues.apache.org/jira/browse/BEAM-3151]). When I
> started looking at the WriteFiles class, I noticed that the file sink
> implementation includes multiple GroupByKey transformations. Knowing that, I
> wrote test code that uses multiple GroupByKey transformations. It led me to
> conclude that GroupByKey's support for After(Synchronised)ProcessingTime
> triggers is possibly buggy(?), and that this also affects the file sink. When
> I ran the same job on the Dataflow runner, I got the expected output.
>
> *About test code*
> The job uses Count.perElement to count how many A and B elements it receives
> within 30-second windows. It then applies a GroupByKey that should fire every
> time the count increases.
> Below is the expected standard output:
> {code:java}
> Let's assume all events are received in the same 30-second window.
> A -> After count KV{A, 1} -> Final group by KV{A, [1]}
> A -> After count KV{A, 2} -> Final group by KV{A, [1,2]}
> A -> After count KV{A, 3} -> Final group by KV{A, [1,2,3]}
> B -> After count KV{B, 1} -> Final group by KV{B, [1]}
> {code}
> With my trigger configuration, I would expect that, for every new element,
> 'After count' is printed with the new value, followed by 'Final group by'
> with the new counter. 'Final group by' therefore represents the history of
> counters.
>
> *Problem*
> The 'Final group by' trigger does not always fire, even though the trigger
> setup suggests that it should. This behaviour differs across runners and Beam
> versions.
>
> *My observations when using Pubsub*
> Trigger configuration
> {code:java}
> Window.into(FixedWindows.of(standardSeconds(30)))
>     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>     .withAllowedLateness(standardSeconds(5), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>     .accumulatingFiredPanes()
> {code}
>
> Beam 2.0 Flink Runner
> {code:java}
> 2017-11-16T14:51:44.294Z After count KV{A, 1} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.036Z Received Element A [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.143Z After count KV{A, 2} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:51:53.246Z Final group by KV{A, [1, 2]} [2017-11-16T14:51:30.000Z..2017-11-16T14:52:00.000Z)
> 2017-11-16T14:52:03.522Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.629Z After count KV{A, 1} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:03.732Z Final group by KV{A, [1]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.270Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.372Z After count KV{A, 2} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:07.476Z Final group by KV{A, [1, 2]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.394Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.501Z After count KV{A, 3} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:10.602Z Final group by KV{A, [1, 2, 3]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.296Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.402Z After count KV{A, 4} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:13.507Z Final group by KV{A, [1, 2, 3, 4]} [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z)
> 2017-11-16T14:52:14.951Z Received Element A [2017-11-16T14:52:00.000Z..2017-11-16T14:52:30.000Z) <--- Expected to see 'After count' after this
> 2017-11-16T14:52:35.320Z Received Element A [2017-11-16T14:52:30
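The expectation described under *About test code* can be modelled in a few lines of plain Java. Under that expectation, every new element produces one 'After count' pane, followed by one 'Final group by' pane that carries the accumulated history of counts. This is a hypothetical sketch, not Beam code and not the reporter's test. The class `ExpectedPanes` and its methods are invented for illustration. The model deliberately ignores processing-time delays, watermarks and allowed lateness, which is where the runners' behaviour appears to diverge. It assumes 30-second fixed windows with no offset.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Beam-free model of the output the reporter expects: each
 * element yields one 'After count' pane and one 'Final group by' pane.
 * Processing-time delays, watermarks and allowed lateness are NOT modelled.
 */
public class ExpectedPanes {
  // Assumed window size, matching FixedWindows.of(standardSeconds(30)) in the report.
  static final Duration WINDOW_SIZE = Duration.ofSeconds(30);

  /** Start of the 30-second fixed window (zero offset) that contains ts. */
  public static Instant windowStart(Instant ts) {
    long sizeMs = WINDOW_SIZE.toMillis();
    long ms = ts.toEpochMilli();
    return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
  }

  /** Returns the lines an idealised accumulating pipeline would print, in order. */
  public static List<String> simulate(List<String> keys, List<Instant> timestamps) {
    // Stage 1 state (Count.perElement, accumulating): count per key and window.
    Map<String, Long> counts = new LinkedHashMap<>();
    // Stage 2 state (GroupByKey, accumulating): every count seen per key and window.
    Map<String, List<Long>> history = new LinkedHashMap<>();
    List<String> out = new ArrayList<>();
    for (int i = 0; i < keys.size(); i++) {
      String key = keys.get(i);
      String stateKey = key + "@" + windowStart(timestamps.get(i));
      long count = counts.merge(stateKey, 1L, Long::sum);
      out.add("After count KV{" + key + ", " + count + "}");
      List<Long> seen = history.computeIfAbsent(stateKey, k -> new ArrayList<>());
      seen.add(count);
      out.add("Final group by KV{" + key + ", " + seen + "}");
    }
    return out;
  }

  public static void main(String[] args) {
    // Three A elements and one B element, all in [14:52:00, 14:52:30).
    Instant t0 = Instant.parse("2017-11-16T14:52:03.522Z");
    List<String> lines = simulate(
        List.of("A", "A", "A", "B"),
        List.of(t0, t0.plusSeconds(4), t0.plusSeconds(7), t0.plusSeconds(10)));
    lines.forEach(System.out::println);
  }
}
```

With these inputs, `main` prints alternating 'After count' and 'Final group by' lines, ending with `Final group by KV{B, [1]}`. Comparing a real run's log against such a model makes a skipped pane easy to spot.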
[jira] [Assigned] (BEAM-3225) Non deterministic behaviour of AfterProcessingTime trigger with multiple group by transformations
[ https://issues.apache.org/jira/browse/BEAM-3225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles reassigned BEAM-3225:

    Assignee: Eugene Kirpichov  (was: Kenneth Knowles)