Hi,

This is a question that goes back to the theoretical model. Normally, as defined in the Beam lateness semantics [1], the source is in charge of emitting appropriate timestamps and setting its own watermark. Users can generally configure this by providing their own timestamp and watermark functions, as implemented in e.g. KafkaIO and PubsubIO. Since the Read transform from such a source is a root transform, there is no input watermark to worry about in that case, and the output watermark is under the control of the source.

However, what is the actual and desired behaviour when we wish to materialise timestamp / watermark information in the middle of a pipeline? For example, we might have a source which does not support timestamps and sets them all (along with the watermark) to `Long.MIN_VALUE`, or we might need to do some complex processing to determine the correct timestamp. Further, the elements could be out of order with respect to the generated timestamps.

I tried analysing the code to work out what the DirectRunner does when WithTimestamps is used to assign the timestamps in this case, but I wasn’t able to figure it out. Is the watermark advanced to the latest of the timestamps assigned so far? If so, and the data is out of order, is the out-of-order data simply emitted late? (And does the allowed skew need to be set to permit that?)

The input watermark in this case is always the minimum time. Advancing the watermark in any way past that would break the assumption that output WM <= input WM (as written in [1]). However, not advancing the watermark would mean we never fire triggers which depend on it, etc.

Am I misunderstanding something? Do the invariants in [1] only hold if we are not actively messing with timestamps/watermarks by using WithTimestamps or similar? I had a look at [2], but this merely seems to be about adjusting the watermark after a “real” watermark is actually established (for example reading log files, etc.).

Any clarification would be appreciated.

Thanks,
Matt

[1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
[2] https://issues.apache.org/jira/browse/BEAM-644
