Good bug catch! Thanks! I would add that your test reader is not at all guaranteed to work in Beam. It is only correct if the reader is never restarted from checkpoint. Otherwise, when it is restarted from checkpoint it will reset `firstStarted` and the `current` counter.
To be properly correct, you should be remembering all important mutable fields in the checkpoint mark (thus you cannot use CounterMark) and restoring them when the checkpoint is not null. You may get very strange behavior with the current implementation, depending on the runner., On Mon, Nov 7, 2016 at 2:08 PM, Thomas Groh <[email protected]> wrote: > You're right in that the Watermark Hold was not being updated in the > presence of empty updates - > https://github.com/apache/incubator-beam/pull/1300 fixes that issue. You > should have seen the same outputs (as your windows are properties of event > time) but the trigger firing would be delayed. > > On Mon, Nov 7, 2016 at 9:20 AM, Demin Alexey <[email protected]> wrote: > > > Hi > > > > I took CountingInput as example and implement my custom UnboundStream > with > > empty time intervals: > > https://gist.github.com/xhumanoid/813b6a27a8717c4ccb9068c87bf88564 > > > > behavior very simple: > > 1) first 8 second emit 1 record/second > > 2) sleep 30 second > > 3) continue emit 1 record/second > > > > as result I have: > > > > 2016-11-07T17:13:20.931Z KV{3, 1} > > 2016-11-07T17:13:20.931Z KV{1, 1} > > 2016-11-07T17:13:20.931Z KV{4, 1} > > 2016-11-07T17:13:20.931Z KV{2, 1} > > > > 2016-11-07T17:13:54.902Z KV{6, 1} > > 2016-11-07T17:13:54.902Z KV{5, 1} > > 2016-11-07T17:13:54.902Z KV{7, 1} > > 2016-11-07T17:13:54.903Z KV{8, 1} > > > > 2016-11-07T17:13:55.898Z KV{9, 1} > > > > 2016-11-07T17:14:00.900Z KV{11, 1} > > 2016-11-07T17:14:00.900Z KV{10, 1} > > 2016-11-07T17:14:00.900Z KV{12, 1} > > 2016-11-07T17:14:00.901Z KV{13, 1} > > 2016-11-07T17:14:00.901Z KV{14, 1} > > > > 2016-11-07T17:14:05.900Z KV{17, 1} > > 2016-11-07T17:14:05.900Z KV{18, 1} > > 2016-11-07T17:14:05.900Z KV{19, 1} > > 2016-11-07T17:14:05.900Z KV{15, 1} > > 2016-11-07T17:14:05.901Z KV{16, 1} > > > > As you can see: second part of data keep processing until element 9 was > not > > emited. > > > > > > 2016-11-07 20:09 GMT+04:00 Raghu Angadi <[email protected]>: > > > > > On Mon, Nov 7, 2016 at 12:41 AM, Demin Alexey <[email protected]> > > wrote: > > > > > > > How workaround I tried set withWatermarkFn2 in Instant.now() > > > > > > > > > > This work around should have unblocked DirectRunner as well. > > > > > > I haven't looked at UnboundedReadEvaluatorFactory:142. > > > > > >
