Hi,

This is a question which goes back to the theoretical model. Normally, as 
defined in the Beam lateness semantics [1], the source is in charge of emitting 
appropriate timestamps and setting its own watermark. Users can generally 
configure this by providing their own timestamp and watermark functions, as 
implemented in e.g. KafkaIO and PubsubIO. Since 
the Read transform from that source is a root transform, we do not have an 
input watermark to worry about in that case, and the output watermark is under 
the control of the Source.

However, what is the actual and desired behaviour when we wish to materialise 
timestamp / watermark information in the middle of a pipeline? For example, we 
have a source which does not support timestamps and sets them all (along with 
the watermark) to `Long.MIN_VALUE`, or perhaps we need to do some complex 
processing to determine the correct timestamp. Further, the elements could be 
out of order with respect to the generated timestamps.

I tried analysing the code to work out what the behaviour in the DirectRunner 
is when using WithTimestamps to assign the timestamps in this case, but I 
wasn’t able to figure it out. Is the watermark advanced to the latest of the 
timestamps assigned so far? If so, is out-of-order data simply emitted as late 
data? (And does the allowed skew need to be set on the transform to permit 
that?)
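
To make the hypothesis concrete, here is a toy model of what I am asking about. 
It is only a sketch, not DirectRunner code: the class `MaxTimestampWatermark` 
and its methods are made-up names, and it assumes the watermark simply jumps to 
the largest timestamp assigned so far, with anything behind it counted as late.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: NOT how the DirectRunner tracks watermarks.
// Assumption: the watermark jumps to the largest timestamp assigned so far.
class MaxTimestampWatermark {
    private long watermark = Long.MIN_VALUE;

    // Returns true if the element is behind the current watermark (late),
    // then advances the watermark if the element is the newest seen so far.
    boolean observe(long timestamp) {
        boolean late = timestamp < watermark;
        watermark = Math.max(watermark, timestamp);
        return late;
    }

    long current() {
        return watermark;
    }

    // Collects the timestamps that would be classified as late, in arrival order.
    static List<Long> lateElements(long[] timestamps) {
        MaxTimestampWatermark wm = new MaxTimestampWatermark();
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (wm.observe(ts)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        long[] assigned = {10, 30, 20, 40, 25};
        System.out.println(lateElements(assigned)); // prints [20, 25]
    }
}
```

Under this assumption every out-of-order element ends up behind the watermark, 
which is why I am asking whether that is really what happens.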

The input watermark in this case is always the minimum time. Advancing the 
watermark in any way past that would break the assumption that output WM <= 
input WM (as written in [1]). However, not advancing the watermark would mean 
that triggers which depend on it never fire.
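
The tension can be sketched in a few lines. This is only my reading of the 
invariant in [1], with hypothetical names (`WatermarkInvariant`, 
`outputWatermark`, `windowCloses`), and not actual runner code: if the output 
watermark is capped by the input watermark and the input stays at the minimum, 
no watermark-based trigger can fire.

```java
// Toy model only: an assumed reading of the invariant in [1], not Beam runner code.
class WatermarkInvariant {
    // The output watermark may never pass the input watermark.
    static long outputWatermark(long inputWatermark, long proposedWatermark) {
        return Math.min(inputWatermark, proposedWatermark);
    }

    // A watermark trigger for a window fires once the watermark reaches the window end.
    static boolean windowCloses(long watermark, long windowEnd) {
        return watermark >= windowEnd;
    }

    public static void main(String[] args) {
        long input = Long.MIN_VALUE; // source that never advances its watermark
        long proposed = 1_000L;      // latest timestamp assigned mid-pipeline
        long output = outputWatermark(input, proposed);
        System.out.println(output == Long.MIN_VALUE);   // prints true
        System.out.println(windowCloses(output, 500L)); // prints false
    }
}
```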

Am I misunderstanding something? Do the invariants in [1] only hold if we are 
not actively messing with timestamps/watermarks by using WithTimestamps or 
similar?

I had a look at [2], but this merely seems to be about adjusting the watermark 
after a “real” watermark is actually established (for example reading log 
files, etc.)

Any clarification would be appreciated.

Thanks,
Matt

[1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
[2] https://issues.apache.org/jira/browse/BEAM-644 
