This is very much an issue that comes up with splittable DoFn. With an annotation-driven DoFn API it is possible to detect these boundaries to some extent. A DoFn that uses a ProcessContext is "requesting the capability" to output without specifying a timestamp, and is thus subject to the automated hold that keeps output WM <= input WM. Once we provide unbundled methods for output with and without timestamps, we can tell when that capability is not used and the hold is not needed.
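
A rough sketch of that rule in plain Java, for illustration only. This is not Beam code: `WatermarkHoldSketch`, `nextOutputWatermark`, `transformEstimate` and the `usesImplicitTimestamps` flag are hypothetical names, and the flag stands in for "this DoFn asked for the capability to output without a timestamp". The sketch assumes the output watermark must never move backwards, as described further down the thread.

```java
import java.time.Instant;

// Hypothetical model of the hold discussed above; none of these names exist in Beam.
final class WatermarkHoldSketch {
    private WatermarkHoldSketch() {}

    /**
     * Computes the next output watermark of a transform.
     *
     * @param previousOutputWatermark the output watermark reported so far
     * @param inputWatermark          the transform's current input watermark
     * @param transformEstimate       the earliest timestamp the transform itself
     *                                believes it may still emit (an assumption of this sketch)
     * @param usesImplicitTimestamps  true if the transform may output without
     *                                specifying a timestamp
     */
    public static Instant nextOutputWatermark(
            Instant previousOutputWatermark,
            Instant inputWatermark,
            Instant transformEstimate,
            boolean usesImplicitTimestamps) {
        Instant candidate = transformEstimate;
        // Implicitly timestamped output inherits input timestamps, so the
        // automated hold applies: output WM <= input WM.
        if (usesImplicitTimestamps && inputWatermark.isBefore(candidate)) {
            candidate = inputWatermark;
        }
        // Watermarks advance monotonically: never report an earlier value.
        return candidate.isAfter(previousOutputWatermark)
                ? candidate
                : previousOutputWatermark;
    }

    public static void main(String[] args) {
        Instant min = Instant.MIN; // a source that sets everything to the minimum time
        Instant estimate = Instant.parse("2017-04-06T19:50:40Z");

        // Held back by the input watermark.
        System.out.println(nextOutputWatermark(min, min, estimate, true));
        // Only explicit timestamps are used, so the transform's own estimate wins.
        System.out.println(nextOutputWatermark(min, min, estimate, false));
    }
}
```

With the flag set, the output watermark stays pinned to the input watermark no matter what the transform knows about its own timestamps. With the flag cleared, the transform's estimate can move the output watermark past the input, which is the "boundary" case discussed in the thread.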

On Thu, Apr 6, 2017 at 2:24 PM, Matthew Jadczak <[email protected]> wrote:

> Thomas, thanks for that. I had suspected that that may be the case, but
> wanted some confirmation.
>
> It seems to me that theoretically, it makes sense to think of “partitions”
> in the watermark space, with sections of the pipeline conforming to the
> “output WM <= input WM”, with the breaks between them happening precisely
> when timestamps themselves are being materialised or adjusted.
>
> This makes sense because at those boundaries, the object that is being
> timestamped changes; in the log files example, in the first section of the
> pipeline, the file itself is being timestamped, but once that file is
> transformed into a list of input files, the timestamps (and hence
> watermark) of those input files themselves are in a different domain, hence
> at that interface it makes sense to break the watermark/timestamp
> invariants.
>
> Thanks,
> --
> Matt
>
> On 6 April 2017 at 19:50:40, Thomas Groh ([email protected]) wrote:
>
> Hey Matt;
>
> Generally this is an unsolved problem. We track a related issue in
> https://issues.apache.org/jira/browse/BEAM-644, which permits the watermark
> to be shifted backwards in time. That would let a source that does not
> support timestamps to emit elements timestamped with "when I read the
> element" and emit a watermark that is roughly real-time, and have the
> downstream timestamp-assignment hold the watermark as far back as they
> think is reasonable.
>
> However, I don't believe it's required that the output watermark is always
> no later than the input watermark. The input Watermark to a transform
> represents a promise from the system that "I will never provide this
> transform with an input that is timestamped before the watermark"; the
> output watermark of a transform represents a promise from the transform to
> the system that "I will never produce output that is timestamped before
> this timestamp" (modulo late data on both ends, of course). The restriction
> that watermarks advance monotonically applies on both ends, but a
> PTransform that knows more about the timestamps than an upstream source
> could appropriately advance a watermark past that of its input.
>
> On Wed, Apr 5, 2017 at 11:00 AM, Matthew Jadczak <[email protected]> wrote:
>
> > Hi,
> >
> > This is a question which goes back to the theoretical model. Normally, as
> > defined in the Beam lateness semantics [1], the source is in charge of
> > emitting appropriate timestamps and setting its own watermark. This is in
> > general configurable by users by providing their own timestamp and
> > watermark transformation functions, as implemented in e.g. KafkaIO and
> > PubsubIO. Since the Read transform from that source is a root transform,
> > we do not have an input watermark to worry about in that case, and the
> > output watermark is under the control of the Source.
> >
> > However, what is the actual and desired behaviour when we wish to
> > materialise timestamp / watermark information in the middle of a
> > pipeline? For example, we have a source which does not support
> > timestamps and sets them all (along with the watermark) to
> > `Long.MIN_VALUE`, or perhaps we need to do some complex processing to
> > determine the correct timestamp. Further, the elements could be out of
> > order with respect to the generated timestamps.
> >
> > I tried analysing the code to work out what the behaviour in the
> > DirectRunner is when using WithTimestamps to assign the timestamps in
> > this case, but I wasn’t able to figure it out. Is the watermark advanced
> > to the latest one of the timestamps assigned so far? If so, if data is
> > out of order is the out-of-order data simply emitted late? (and does the
> > allowed skew need to be in place there to allow that?)
> >
> > The input watermark in this case is always the minimum time. Advancing
> > the watermark in any way past that would break the assumption that
> > output WM <= input WM (as written in [1]). However not advancing the
> > watermark would mean we never fire triggers which depend on it, etc.
> >
> > Am I misunderstanding something? Do the invariants in [1] only hold if we
> > are not actively messing with timestamps/watermarks by using
> > WithTimestamps or similar?
> >
> > I had a look at [2], but this merely seems to be about adjusting the
> > watermark after a “real” watermark is actually established (for example
> > reading log files, etc.)
> >
> > Any clarification would be appreciated.
> >
> > Thanks,
> > Matt
> >
> > [1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#
> > [2] https://issues.apache.org/jira/browse/BEAM-644
