Good bug catch! Thanks!

I would add that your test reader is not at all guaranteed to work in Beam.
It is only correct if the reader is never restarted from checkpoint.
Otherwise, when it is restarted from checkpoint it will reset
`firstStarted` and the `current` counter.

To be properly correct, you should be remembering all important mutable
fields in the checkpoint mark (thus you cannot use CounterMark) and
restoring them when the checkpoint is not null.

You may get very strange behavior with the current implementation,
depending on the runner.,

On Mon, Nov 7, 2016 at 2:08 PM, Thomas Groh <[email protected]>
wrote:

> You're right in that the Watermark Hold was not being updated in the
> presence of empty updates -
> https://github.com/apache/incubator-beam/pull/1300 fixes that issue. You
> should have seen the same outputs (as your windows are properties of event
> time) but the trigger firing would be delayed.
>
> On Mon, Nov 7, 2016 at 9:20 AM, Demin Alexey <[email protected]> wrote:
>
> > Hi
> >
> > I took CountingInput as example and implement my custom UnboundStream
> with
> > empty time intervals:
> > https://gist.github.com/xhumanoid/813b6a27a8717c4ccb9068c87bf88564
> >
> > behavior very simple:
> > 1) first 8 second emit 1 record/second
> > 2) sleep 30 second
> > 3) continue emit 1 record/second
> >
> > as result I have:
> >
> > 2016-11-07T17:13:20.931Z  KV{3, 1}
> > 2016-11-07T17:13:20.931Z  KV{1, 1}
> > 2016-11-07T17:13:20.931Z  KV{4, 1}
> > 2016-11-07T17:13:20.931Z  KV{2, 1}
> >
> > 2016-11-07T17:13:54.902Z  KV{6, 1}
> > 2016-11-07T17:13:54.902Z  KV{5, 1}
> > 2016-11-07T17:13:54.902Z  KV{7, 1}
> > 2016-11-07T17:13:54.903Z  KV{8, 1}
> >
> > 2016-11-07T17:13:55.898Z  KV{9, 1}
> >
> > 2016-11-07T17:14:00.900Z  KV{11, 1}
> > 2016-11-07T17:14:00.900Z  KV{10, 1}
> > 2016-11-07T17:14:00.900Z  KV{12, 1}
> > 2016-11-07T17:14:00.901Z  KV{13, 1}
> > 2016-11-07T17:14:00.901Z  KV{14, 1}
> >
> > 2016-11-07T17:14:05.900Z  KV{17, 1}
> > 2016-11-07T17:14:05.900Z  KV{18, 1}
> > 2016-11-07T17:14:05.900Z  KV{19, 1}
> > 2016-11-07T17:14:05.900Z  KV{15, 1}
> > 2016-11-07T17:14:05.901Z  KV{16, 1}
> >
> > As you can see: second part of data keep processing until element 9 was
> not
> > emited.
> >
> >
> > 2016-11-07 20:09 GMT+04:00 Raghu Angadi <[email protected]>:
> >
> > > On Mon, Nov 7, 2016 at 12:41 AM, Demin Alexey <[email protected]>
> > wrote:
> > >
> > > > How workaround I tried set withWatermarkFn2 in Instant.now()
> > > >
> > >
> > > This work around should have unblocked DirectRunner as well.
> > >
> > > I haven't looked at UnboundedReadEvaluatorFactory:142.
> > >
> >
>

Reply via email to