Thanks.

FWIW, I’m building an implementation of the Dataflow/Beam Model in Elixir as my 
final dissertation, which lets me focus on theoretical aspects like this rather 
than having to worry about interop and backwards compatibility.

The way I have implemented it now is that a given transform can declare that it 
wants to manage its own output watermark (OWM). It can then set a “desired 
output WM” at any point (monotonicity is enforced on that value). When 
watermarks are refreshed, the standard formula relating the input watermark, 
output watermark and any WM holds applies, except that if the transform has 
opted in to this behaviour, the “desired OWM” is used in place of the input 
watermark, so that the equation becomes

new_owm = later(current_owm, earlier(desired_owm, owm_hold))

This is so that the transform can still take advantage of holds and triggers in 
the standard way, without having to duplicate that work. In effect, the desired 
OWM acts as a “simulated” input watermark.
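
A minimal sketch of that refresh rule, written in Python rather than Elixir 
for brevity. The class name, the `manages_own_owm` opt-in flag, the use of 
infinities for the minimum timestamp and for "no hold", and the choice to 
reject a backwards move by raising an error are all assumptions made for 
illustration; none of them is taken from the actual implementation.

```python
from dataclasses import dataclass

MIN_TS = float("-inf")  # stand-in for the minimum timestamp
MAX_TS = float("inf")   # stand-in for "no hold in place"


def earlier(a, b):
    return min(a, b)


def later(a, b):
    return max(a, b)


@dataclass
class TransformWatermarks:
    manages_own_owm: bool = False  # hypothetical opt-in flag
    output_wm: float = MIN_TS
    desired_owm: float = MIN_TS

    def set_desired_owm(self, ts):
        if not self.manages_own_owm:
            raise RuntimeError("transform has not opted in to managing its OWM")
        if ts < self.desired_owm:
            # Assumption: monotonicity is enforced by rejecting the update.
            raise ValueError("desired OWM must advance monotonically")
        self.desired_owm = ts

    def refresh(self, input_wm, owm_hold=MAX_TS):
        # Opted-in transforms substitute the desired OWM for the input WM;
        # the rest of the formula (holds, monotonic output) is unchanged.
        basis = self.desired_owm if self.manages_own_owm else input_wm
        self.output_wm = later(self.output_wm, earlier(basis, owm_hold))
        return self.output_wm
```

With this shape, an opted-in transform whose input watermark is stuck at the 
minimum can still advance its output watermark by calling `set_desired_owm`, 
while any hold passed to `refresh` continues to cap the result.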
-- 
Matt

On 6 April 2017 at 22:45:08, Kenneth Knowles ([email protected]) wrote:

This is very much an issue that comes up with splittable DoFn.  

With an annotation-driven DoFn API it is possible to detect these  
boundaries somewhat. A DoFn that uses a ProcessContext is "requesting the  
capability" to output without specifying the timestamp, and is thus subject  
to the automated hold output WM <= input WM. Once we provide unbundled  
methods for just doing output with or without timestamps, we can know when  
the capability is not used.  

On Thu, Apr 6, 2017 at 2:24 PM, Matthew Jadczak <[email protected]> wrote:  

> Thomas, thanks for that. I had suspected that might be the case, but  
> wanted some confirmation.  
>  
> It seems to me that theoretically, it makes sense to think of “partitions”  
> in the watermark space, with sections of the pipeline conforming to the  
> “output WM <= input WM”, with the breaks between them happening precisely  
> when timestamps themselves are being materialised or adjusted.  
>  
> This makes sense because at those boundaries, the object that is being  
> timestamped changes—in the log files example, in the first section of the  
> pipeline, the file itself is being timestamped, but once that file is  
> transformed into a list of input files, the timestamps (and hence  
> watermark) of those input files themselves are in a different domain, hence  
> at that interface it makes sense to break the watermark/timestamp  
> invariants.  
>  
> Thanks,  
> --  
> Matt  
>  
> On 6 April 2017 at 19:50:40, Thomas Groh ([email protected]) wrote:  
>  
> Hey Matt;  
>  
> Generally this is an unsolved problem. We track a related issue in  
> https://issues.apache.org/jira/browse/BEAM-644, which permits the watermark  
> to be shifted backwards in time. That would let a source that does not  
> support timestamps emit elements timestamped with "when I read the  
> element" and emit a watermark that is roughly real-time, and have the  
> downstream timestamp assignment hold the watermark as far back as it  
> thinks is reasonable.  
>  
> However, I don't believe it's required that the output watermark is always  
> no later than the input watermark. The input Watermark to a transform  
> represents a promise from the system that "I will never provide this  
> transform with an input that is timestamped before the watermark"; the  
> output watermark of a transform represents a promise from the transform to  
> the system that "I will never produce output that is timestamped before  
> this timestamp" (modulo late data on both ends, of course). The restriction  
> that watermarks advance monotonically applies on both ends, but a  
> PTransform that knows more about the timestamps than an upstream source  
> could appropriately advance a watermark past that of its input.  
>  
> On Wed, Apr 5, 2017 at 11:00 AM, Matthew Jadczak <[email protected]> wrote:  
>  
> > Hi,  
> >  
> > This is a question which goes back to the theoretical model. Normally, as  
> > defined in the Beam lateness semantics [1], the source is in charge of  
> > emitting appropriate timestamps and setting its own watermark. This is in  
> > general configurable by users by providing their own timestamp and  
> > watermark transformation functions, as implemented in e.g. KafkaIO and  
> > PubsubIO. Since the Read transform from that source is a root transform,  
> > we do not have an input watermark to worry about in that case, and the  
> > output watermark is under the control of the Source.  
> >  
> > However, what is the actual and desired behaviour when we wish to  
> > materialise timestamp / watermark information in the middle of a  
> > pipeline? For example, we have a source which does not support timestamps  
> > and sets them all (along with the watermark) to `Long.MIN_VALUE`, or  
> > perhaps we need to do some complex processing to determine the correct  
> > timestamp. Further, the elements could be out of order with respect to  
> > the generated timestamps.  
> >  
> > I tried analysing the code to work out what the behaviour in the  
> > DirectRunner is when using WithTimestamps to assign the timestamps in  
> > this case, but I wasn’t able to figure it out. Is the watermark advanced  
> > to the latest one of the timestamps assigned so far? If so, if data is  
> > out of order is the out-of-order data simply emitted late? (and does the  
> > allowed skew need to be in place there to allow that?)  
> >  
> > The input watermark in this case is always the minimum time. Advancing  
> > the watermark in any way past that would break the assumption that  
> > output WM <= input WM (as written in [1]). However not advancing the  
> > watermark would mean we never fire triggers which depend on it, etc.  
> >  
> > Am I misunderstanding something? Do the invariants in [1] only hold if we  
> > are not actively messing with timestamps/watermarks by using  
> > WithTimestamps or similar?  
> >  
> > I had a look at [2], but this merely seems to be about adjusting the  
> > watermark after a “real” watermark is actually established (for example  
> > reading log files, etc.)  
> >  
> > Any clarification would be appreciated.  
> >  
> > Thanks,  
> > Matt  
> >  
> > [1] https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#  
> > [2] https://issues.apache.org/jira/browse/BEAM-644  
>  
