Hey Matt,

Generally this is an unsolved problem. We track a related issue in
https://issues.apache.org/jira/browse/BEAM-644, which would permit the
watermark to be shifted backwards in time. That would let a source that does
not support timestamps emit elements timestamped with "when I read the
element" and emit a watermark that is roughly real-time, and then let the
downstream timestamp assignment hold the watermark as far back as it
considers reasonable.
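
To make that concrete, here is a minimal plain-Java sketch of such a downstream timestamp-assignment step. It is not Beam code: the class and method names are hypothetical, and it assumes the upstream watermark tracks read time and that a fixed delay (five minutes in the usage below) is a reasonable guess at how far event times can trail it. The output watermark is held that delay behind the input watermark, still only moves forward, and anything timestamped before it is treated as late.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a timestamp-assignment step (not a Beam API) whose
// output watermark is held a fixed delay behind a roughly real-time input watermark.
class WatermarkHoldingAssigner {
    private final Duration maxDelay;
    private Instant outputWatermark = Instant.MIN;
    private final List<Instant> lateTimestamps = new ArrayList<>();

    WatermarkHoldingAssigner(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Called when the upstream ("when I read the element") watermark advances.
    void onInputWatermark(Instant inputWatermark) {
        // Hold the watermark back by maxDelay, saturating at Instant.MIN.
        Instant held = inputWatermark.isBefore(Instant.MIN.plus(maxDelay))
                ? Instant.MIN
                : inputWatermark.minus(maxDelay);
        // The output watermark must still advance monotonically.
        if (held.isAfter(outputWatermark)) {
            outputWatermark = held;
        }
    }

    // Assigns an element its event time; returns false if that time is already late.
    boolean assign(Instant eventTime) {
        if (eventTime.isBefore(outputWatermark)) {
            lateTimestamps.add(eventTime);
            return false;
        }
        return true;
    }

    Instant outputWatermark() {
        return outputWatermark;
    }

    List<Instant> lateTimestamps() {
        return lateTimestamps;
    }

    public static void main(String[] args) {
        WatermarkHoldingAssigner step = new WatermarkHoldingAssigner(Duration.ofMinutes(5));
        step.onInputWatermark(Instant.parse("2017-04-05T11:00:00Z"));
        System.out.println(step.outputWatermark()); // held five minutes behind the input
        System.out.println(step.assign(Instant.parse("2017-04-05T10:57:00Z"))); // true
        System.out.println(step.assign(Instant.parse("2017-04-05T10:50:00Z"))); // false, late
    }
}
```

The size of the hold is the whole trade-off here: a larger delay produces less late data but fires watermark-based triggers later.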

However, I don't believe it's required that the output watermark is always
no later than the input watermark. The input watermark to a transform
represents a promise from the system that "I will never provide this
transform with an input that is timestamped before the watermark"; the
output watermark of a transform represents a promise from the transform to
the system that "I will never produce output that is timestamped before
this timestamp" (modulo late data on both ends, of course). The restriction
that watermarks advance monotonically applies on both ends, but a
PTransform that knows more about the timestamps than an upstream source
could appropriately advance a watermark past that of its input.
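
As a sketch of that last point, suppose a transform knows that the event times it extracts are never more than a fixed bound behind the largest one it has seen so far (for example, because it reads roughly ordered log files). The class name and that bound are hypothetical, not part of Beam. Under that assumption its output watermark can advance well past an input watermark that is stuck at the minimum time, while still only moving forward:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration, not a Beam API. Assumption: extracted event times
// are never more than maxOutOfOrder behind the largest event time seen so far.
class BoundedDisorderTimestamper {
    private final Duration maxOutOfOrder;
    private Instant maxSeen = Instant.MIN;
    private Instant outputWatermark = Instant.MIN;

    BoundedDisorderTimestamper(Duration maxOutOfOrder) {
        this.maxOutOfOrder = maxOutOfOrder;
    }

    // Subtracts d from t without underflowing below Instant.MIN.
    private static Instant minusSaturating(Instant t, Duration d) {
        return t.isBefore(Instant.MIN.plus(d)) ? Instant.MIN : t.minus(d);
    }

    // Handles one element whose event time the transform has extracted. The
    // input watermark is not consulted because it is stuck at the minimum time.
    // Returns true if the element is late relative to the current output watermark.
    boolean process(Instant eventTime) {
        boolean late = eventTime.isBefore(outputWatermark);
        if (eventTime.isAfter(maxSeen)) {
            maxSeen = eventTime;
        }
        // Promise to the system: no further output timestamped before this
        // (modulo late data), justified only by the assumed bound.
        Instant candidate = minusSaturating(maxSeen, maxOutOfOrder);
        // Watermarks only move forward.
        if (candidate.isAfter(outputWatermark)) {
            outputWatermark = candidate;
        }
        return late;
    }

    Instant outputWatermark() {
        return outputWatermark;
    }

    public static void main(String[] args) {
        BoundedDisorderTimestamper t = new BoundedDisorderTimestamper(Duration.ofMinutes(1));
        Instant inputWatermark = Instant.MIN; // the source never advances it
        System.out.println(t.process(Instant.parse("2017-04-05T10:00:00Z"))); // false
        System.out.println(t.process(Instant.parse("2017-04-05T09:59:30Z"))); // false, within the bound
        System.out.println(t.process(Instant.parse("2017-04-05T10:05:00Z"))); // false
        System.out.println(t.process(Instant.parse("2017-04-05T10:01:00Z"))); // true, emitted late
        System.out.println(t.outputWatermark().isAfter(inputWatermark));      // true
    }
}
```

If the assumed bound is wrong, the only consequence is more late data; the watermark itself never regresses.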

On Wed, Apr 5, 2017 at 11:00 AM, Matthew Jadczak wrote:

> Hi,
>
> This is a question which goes back to the theoretical model. Normally, as
> defined in the Beam lateness semantics [1], the source is in charge of
> emitting appropriate timestamps and setting its own watermark. This is in
> general configurable by users by providing their own timestamp and
> watermark transformation functions, as implemented in e.g. KafkaIO and
> PubsubIO. Since the Read transform from that source is a root transform, we
> do not have an input watermark to worry about in that case, and the output
> watermark is under the control of the Source.
>
> However, what is the actual and desired behaviour when we wish to
> materialise timestamp / watermark information in the middle of a pipeline?
> For example, we have a source which does not support timestamps and sets
> them all (along with the watermark) to `Long.MIN_VALUE`, or perhaps we need
> to do some complex processing to determine the correct timestamp. Further,
> the elements could be out of order with respect to the generated timestamps.
>
> I tried analysing the code to work out what the behaviour in the
> DirectRunner is when using WithTimestamps to assign the timestamps in this
> case, but I wasn’t able to figure it out. Is the watermark advanced to the
> latest of the timestamps assigned so far? If so, is out-of-order data
> simply emitted as late data? (And does the allowed skew need to be set to
> permit that?)
>
> The input watermark in this case is always the minimum time. Advancing the
> watermark in any way past that would break the assumption that output WM <=
> input WM (as written in [1]). However not advancing the watermark would
> mean we never fire triggers which depend on it, etc.
>
> Am I misunderstanding something? Do the invariants in [1] only hold if we
> are not actively messing with timestamps/watermarks by using WithTimestamps
> or similar?
>
> I had a look at [2], but it merely seems to be about adjusting the
> watermark after a “real” watermark has already been established (for
> example, when reading log files).
>
> Any clarification would be appreciated.
>
> Thanks,
> Matt
>
> [1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
> [2] https://issues.apache.org/jira/browse/BEAM-644
