This is very much an issue that comes up with splittable DoFn.

With an annotation-driven DoFn API it is possible to detect these
boundaries, at least partially. A DoFn that uses a ProcessContext is
"requesting the capability" to output without specifying the timestamp, and
is thus subject to the automated hold output WM <= input WM. Once we provide
unbundled methods for just doing output with or without timestamps, we can
tell when the capability is not used.
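A toy model of this idea, purely for illustration: it is not Beam's API, and `OutputCapability`, `outputWatermarkBound` and `fnWatermarkEstimate` are hypothetical names. It assumes the runner knows which output capabilities a DoFn declared. If the DoFn may emit with an implicit (inherited) timestamp, the hold output WM <= input WM applies; if it only emits with explicit timestamps, its own watermark estimate is allowed to stand.

```java
import java.util.EnumSet;
import java.util.Set;

// Toy model only; NOT Beam's API. All names here are hypothetical.
final class CapabilityHoldSketch {

    // Hypothetical capabilities a DoFn could declare through unbundled output methods.
    enum OutputCapability {
        // Output that inherits the input element's timestamp (the ProcessContext style).
        IMPLICIT_TIMESTAMP,
        // Output whose timestamp the DoFn chooses explicitly.
        EXPLICIT_TIMESTAMP
    }

    // Returns the latest output watermark the runner would permit.
    // If the DoFn may emit with an implicit timestamp, the automated hold
    // "output WM <= input WM" applies; otherwise only the DoFn's estimate counts.
    static long outputWatermarkBound(Set<OutputCapability> requested,
                                     long inputWatermark,
                                     long fnWatermarkEstimate) {
        if (requested.contains(OutputCapability.IMPLICIT_TIMESTAMP)) {
            return Math.min(inputWatermark, fnWatermarkEstimate);
        }
        return fnWatermarkEstimate;
    }

    public static void main(String[] args) {
        long inputWm = 10L;
        long estimate = 50L;
        // Uses the implicit-timestamp capability: held back to the input watermark.
        System.out.println(outputWatermarkBound(
                EnumSet.of(OutputCapability.IMPLICIT_TIMESTAMP), inputWm, estimate)); // 10
        // Only explicit timestamps: free to run ahead of the input watermark.
        System.out.println(outputWatermarkBound(
                EnumSet.of(OutputCapability.EXPLICIT_TIMESTAMP), inputWm, estimate)); // 50
    }
}
```

The point of keying the bound on declared capabilities is that the runner never has to inspect what the DoFn actually emits; the declaration alone decides whether the hold applies.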

On Thu, Apr 6, 2017 at 2:24 PM, Matthew Jadczak <[email protected]> wrote:

> Thomas, thanks for that. I had suspected that that may be the case, but
> wanted some confirmation.
>
> It seems to me that, theoretically, it makes sense to think of “partitions”
> in the watermark space: sections of the pipeline conform to the
> “output WM <= input WM” invariant, and the breaks between them happen
> precisely where timestamps themselves are being materialised or adjusted.
>
> This makes sense because at those boundaries the object being timestamped
> changes. In the log files example, in the first section of the pipeline the
> file itself is timestamped, but once that file is transformed into a list of
> input files, the timestamps (and hence the watermark) of those input files
> are in a different domain, so at that interface it makes sense to break the
> watermark/timestamp invariants.
>
> Thanks,
> --
> Matt
>
> On 6 April 2017 at 19:50:40, Thomas Groh ([email protected]) wrote:
>
> Hey Matt;
>
> Generally this is an unsolved problem. We track a related issue in
> https://issues.apache.org/jira/browse/BEAM-644, which would permit the
> watermark to be shifted backwards in time. That would let a source that does
> not support timestamps emit elements timestamped with "when I read the
> element" and emit a watermark that is roughly real-time, and have the
> downstream timestamp assignment hold the watermark as far back as it thinks
> is reasonable.
>
> However, I don't believe it's required that the output watermark is always
> no later than the input watermark. The input watermark to a transform
> represents a promise from the system that "I will never provide this
> transform with an input that is timestamped before the watermark"; the
> output watermark of a transform represents a promise from the transform to
> the system that "I will never produce output that is timestamped before
> this timestamp" (modulo late data on both ends, of course). The restriction
> that watermarks advance monotonically applies on both ends, but a
> PTransform that knows more about the timestamps than an upstream source
> could legitimately advance its output watermark past its input watermark.
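A minimal sketch of the two promises described above, assuming watermarks are plain `long` timestamps. `MonotonicWatermark` is a hypothetical class, not part of Beam. Each end of a transform holds a watermark that can only move forward; the input end stays at the minimum because the upstream source knows nothing, while the output end advances because the transform knows every element it emits is timestamped at or after 1000.

```java
// Toy model of one end of a transform; MonotonicWatermark is hypothetical, not Beam.
final class MonotonicWatermark {
    private long current = Long.MIN_VALUE;

    // Watermarks only move forward: a request to move backwards is ignored
    // (a real system might instead reject it).
    void advanceTo(long candidate) {
        current = Math.max(current, candidate);
    }

    long get() {
        return current;
    }

    // The promise is "nothing timestamped before the watermark", so an
    // element strictly before it is late.
    boolean isLate(long timestamp) {
        return timestamp < current;
    }

    public static void main(String[] args) {
        // Input promise from the system: an upstream source with no timestamp
        // information never advances it past the minimum.
        MonotonicWatermark input = new MonotonicWatermark();
        // Output promise from the transform: it knows everything it emits is
        // timestamped at or after 1000, so it can promise exactly that.
        MonotonicWatermark output = new MonotonicWatermark();
        output.advanceTo(1000L);
        System.out.println(output.get() > input.get()); // true
        output.advanceTo(500L); // ignored: watermarks are monotonic
        System.out.println(output.get()); // 1000
        System.out.println(output.isLate(999L)); // true
    }
}
```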
>
> On Wed, Apr 5, 2017 at 11:00 AM, Matthew Jadczak <[email protected]> wrote:
>
> > Hi,
> >
> > This is a question which goes back to the theoretical model. Normally, as
> > defined in the Beam lateness semantics [1], the source is in charge of
> > emitting appropriate timestamps and setting its own watermark. This is
> > generally configurable by users, who can provide their own timestamp and
> > watermark transformation functions, as implemented in e.g. KafkaIO and
> > PubsubIO. Since the Read transform from that source is a root transform,
> > there is no input watermark to worry about in that case, and the output
> > watermark is under the control of the source.
> >
> > However, what is the actual and desired behaviour when we wish to
> > materialise timestamp / watermark information in the middle of a pipeline?
> > For example, we may have a source which does not support timestamps and
> > sets them all (along with the watermark) to `Long.MIN_VALUE`, or perhaps we
> > need to do some complex processing to determine the correct timestamp.
> > Further, the elements could be out of order with respect to the generated
> > timestamps.
> >
> > I tried analysing the code to work out what the behaviour in the
> > DirectRunner is when using WithTimestamps to assign the timestamps in this
> > case, but I wasn’t able to figure it out. Is the watermark advanced to the
> > latest of the timestamps assigned so far? If so, is out-of-order data
> > simply emitted late? (And does the allowed skew need to be in place there
> > to allow that?)
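To make the out-of-order question concrete, here is a sketch of one possible policy: after each timestamp is assigned, advance the watermark to the latest timestamp seen minus a fixed lag. This is an assumption for illustration, NOT confirmed DirectRunner behaviour, and `MaxSeenWatermarkSketch`, `watermarkLag` and `lateElements` are hypothetical names. The lag here is a watermark lag, which is a different notion from Beam's allowed timestamp skew (`DoFn#getAllowedTimestampSkew`), which bounds how far an output timestamp may be shifted backward relative to the input element's timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical policy, NOT confirmed DirectRunner behaviour: after each
// timestamp is assigned, advance the watermark to (latest timestamp seen - lag).
final class MaxSeenWatermarkSketch {
    private final long watermarkLag;
    private long maxSeen = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    MaxSeenWatermarkSketch(long watermarkLag) {
        if (watermarkLag < 0) {
            throw new IllegalArgumentException("watermarkLag must be >= 0");
        }
        this.watermarkLag = watermarkLag;
    }

    // Records one assigned timestamp; returns true if the element is late,
    // i.e. timestamped before the watermark as it stood when the element arrived.
    boolean assign(long timestamp) {
        boolean late = timestamp < watermark;
        maxSeen = Math.max(maxSeen, timestamp);
        // Saturate at Long.MIN_VALUE instead of overflowing.
        long candidate = maxSeen < Long.MIN_VALUE + watermarkLag
                ? Long.MIN_VALUE
                : maxSeen - watermarkLag;
        watermark = Math.max(watermark, candidate); // never moves backwards
        return late;
    }

    // Runs the policy over timestamps in arrival order and collects the late ones.
    static List<Long> lateElements(long[] timestamps, long watermarkLag) {
        MaxSeenWatermarkSketch policy = new MaxSeenWatermarkSketch(watermarkLag);
        List<Long> late = new ArrayList<>();
        for (long t : timestamps) {
            if (policy.assign(t)) {
                late.add(t);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {10, 20, 15, 5, 30};
        System.out.println(lateElements(arrivals, 0)); // [15, 5]
        System.out.println(lateElements(arrivals, 6)); // [5]
    }
}
```

Under this policy, yes, out-of-order data behind the latest timestamp is late, and a larger lag trades watermark progress for fewer late elements.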
> >
> > The input watermark in this case is always the minimum time. Advancing the
> > watermark in any way past that would break the assumption that output WM <=
> > input WM (as written in [1]). However, not advancing the watermark would
> > mean we never fire triggers which depend on it, etc.
> >
> > Am I misunderstanding something? Do the invariants in [1] only hold if we
> > are not actively messing with timestamps/watermarks by using WithTimestamps
> > or similar?
> >
> > I had a look at [2], but this merely seems to be about adjusting the
> > watermark after a “real” watermark is actually established (for example
> > reading log files, etc.)
> >
> > Any clarification would be appreciated.
> >
> > Thanks,
> > Matt
> >
> > [1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
> > [2] https://issues.apache.org/jira/browse/BEAM-644
>
