Overall, I like this proposal; thanks for bringing it forward, Aljoscha. I also like the idea of making the watermark generator a rich function: this should make it more straightforward to implement smarter watermark generators, e.g., one that uses state to keep statistics about the actual out-of-orderness and uses those statistics to implement a variable delay.
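The variable-delay generator David describes could look roughly like the minimal Java sketch below. It assumes hypothetical, simplified stand-ins for the proposed interfaces (watermarks as plain longs rather than a Watermark class, no rich-function lifecycle). It also picks one possible statistic, the largest out-of-orderness seen so far capped at maxDelay. AdaptiveDelayGenerator and maxDelay are illustrative names, not part of the FLIP.

```java
// Hypothetical, simplified stand-ins for the interfaces proposed in FLIP-126:
// watermarks are plain longs instead of a Watermark class, so no Flink dependency.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Variable-delay generator: the delay is the largest out-of-orderness observed so far,
// capped at maxDelay. In a rich function these fields would presumably live in state.
class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;
    private long observedDelay = 0;
    private long lastEmitted = Long.MIN_VALUE;

    AdaptiveDelayGenerator(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        } else {
            // Out-of-order event: widen the delay to cover it, up to maxDelay.
            long lateness = maxTimestamp - eventTimestamp;
            observedDelay = Math.min(maxDelay, Math.max(observedDelay, lateness));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events seen yet
        }
        long candidate = maxTimestamp - observedDelay - 1;
        // A delay that just grew can move the candidate backwards; only emit if it advances.
        if (candidate > lastEmitted) {
            lastEmitted = candidate;
            output.emitWatermark(candidate);
        }
    }
}
```

Because a growing delay can push the candidate watermark backwards, this sketch has to remember the last emitted value itself, which is the kind of burden Kostas raises in his reply.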
David

On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Aljoscha,
>
> Thanks for opening the discussion!
>
> I have two comments on the FLIP:
> 1) We could add lifecycle methods to the Generator, i.e. open()/close(), probably with a Context as argument. I have not fully thought this through, but I think this is more aligned with the rest of our rich functions. In addition, it would allow us, for example, to initialize the watermark value if we decide to checkpoint the watermark (see [1]). (I also do not know whether Table/SQL needs to do anything in open().)
> 2) Aligned with the above, and with the case where we want to checkpoint the watermark in mind, I am wondering how we could implement this in the future. The FLIP proposes exposing the WatermarkOutput in the methods of the WatermarkGenerator. Given the implicit contract that watermarks are non-decreasing, WatermarkOutput#emitWatermark() will (I assume) have a check that compares the last emitted WM against the provided one and emits it only if it is >=. Otherwise, we risk users shooting themselves in the foot if they accidentally forget the check. Given that neither the WatermarkGenerator nor its caller knows whether the watermark was finally emitted (WatermarkOutput#emitWatermark returns void), who will be responsible for checkpointing the WM?
>
> Given this, why not define the methods as:
>
> public interface WatermarkGenerator<T> {
>
>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>
>     Watermark onPeriodicEmit(WatermarkOutput output);
> }
>
> and have the caller enforce any invariants, such as non-decreasing watermarks? This way, the caller can checkpoint anything that is needed, as it has complete knowledge of whether the WM was emitted or not.
>
> What do you think?
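A minimal Java sketch of the caller-side alternative Kostas suggests, under simplifying assumptions: each generator method returns the watermark it would like to emit (null meaning none), watermarks are plain Longs, and the WatermarkOutput parameter is dropped. ReturningWatermarkGenerator, NaiveDelayGenerator, WatermarkCaller and snapshotWatermark() are hypothetical names, not Flink API. Because the caller applies the >= check itself, it always knows the last emitted watermark, which is what it would checkpoint and restore.

```java
import java.util.function.LongConsumer;

// Hypothetical variant of the proposed interface, simplified from Kostas's signature:
// each method returns the watermark it would like to emit, or null for "nothing".
interface ReturningWatermarkGenerator<T> {
    Long onEvent(T event, long eventTimestamp);

    Long onPeriodicEmit();
}

// Naive generator that proposes (timestamp - delay) for every event, even when that
// would move the watermark backwards; it relies on the caller to filter.
class NaiveDelayGenerator<T> implements ReturningWatermarkGenerator<T> {
    private final long delay;

    NaiveDelayGenerator(long delay) {
        this.delay = delay;
    }

    @Override
    public Long onEvent(T event, long eventTimestamp) {
        return eventTimestamp - delay;
    }

    @Override
    public Long onPeriodicEmit() {
        return null; // purely punctuated: nothing to emit periodically
    }
}

// Caller that owns the invariant: it forwards a candidate only if it is >= the last
// emitted watermark, so it always knows exactly what was emitted (e.g. for checkpoints).
class WatermarkCaller<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final LongConsumer downstream;
    private long lastEmitted;

    // restoredWatermark would come from a checkpoint; Long.MIN_VALUE on a fresh start.
    WatermarkCaller(ReturningWatermarkGenerator<T> generator, LongConsumer downstream,
                    long restoredWatermark) {
        this.generator = generator;
        this.downstream = downstream;
        this.lastEmitted = restoredWatermark;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    void periodicEmit() {
        maybeEmit(generator.onPeriodicEmit());
    }

    long snapshotWatermark() {
        return lastEmitted;
    }

    private void maybeEmit(Long candidate) {
        if (candidate != null && candidate >= lastEmitted) {
            lastEmitted = candidate;
            downstream.accept(candidate);
        }
    }
}
```

With this split, a generator that forgets the monotonicity check (like NaiveDelayGenerator) cannot move the watermark backwards, and restoring from a checkpoint only requires passing the snapshotted value back into the caller.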
>
> Cheers,
> Kostas
>
> [1] https://issues.apache.org/jira/browse/FLINK-5601
>
> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Thanks for the proposal, Aljoscha. This is a very useful unification. We have already considered this FLIP in the interfaces for FLIP-95 [1] and look forward to updating to the new unified watermark generators once FLIP-126 has been accepted.
> >
> > Regards,
> > Timo
> >
> > [1] https://github.com/apache/flink/pull/11692
> >
> > On 20.04.20 18:10, Aljoscha Krettek wrote:
> > > Hi Everyone!
> > >
> > > We would like to start a discussion on "FLIP-126: Unify (and separate) Watermark Assigners" [1]. This work was started by Stephan in an experimental branch. I expanded on that work to provide a PoC for the changes proposed in this FLIP: [2].
> > >
> > > Currently, we have two different flavours of Watermark Assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend from TimestampAssigner. This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, and we have two operator implementations for the different flavours. This also makes features such as generic support for idleness detection more complicated to implement, because we again have to support two types of watermark assigners.
> > >
> > > In this FLIP we propose two things:
> > >
> > > 1. Unify the Watermark Assigners into one interface, WatermarkGenerator
> > > 2. Separate this new interface from the TimestampAssigner
> > >
> > > The motivation for the first is to simplify future implementations and reduce code duplication.
> > > The motivation for the second point is again code deduplication: most assigners currently have to extend from some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.
> > >
> > > Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives within a configured timeout.
> > >
> > > The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.
> > >
> > > Please find more details in the FLIP [1]. Looking forward to your feedback.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >
> > > [2] https://github.com/aljoscha/flink/tree/stephan-event-time
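To illustrate the two proposals in Aljoscha's message, here is a minimal Java sketch built on hypothetical, simplified stand-ins for the interfaces (watermarks as plain longs, and an output that can also mark the stream idle). A single WatermarkGenerator interface covers both flavours: punctuated generators emit from onEvent, periodic ones from onPeriodicEmit. A generic wrapper then adds idleness detection to any generator. The class names, the injected clock (a LongSupplier, used so the behaviour is testable) and the rule of reporting idleness once until data arrives again are assumptions for illustration, not the FLIP's implementation.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-ins: watermarks are plain longs, and the output can also
// mark the stream/partition as idle.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    // Punctuated generators emit from here...
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    // ...periodic generators emit from here; one interface covers both flavours.
    void onPeriodicEmit(WatermarkOutput output);
}

// Periodic generator: the watermark trails the highest timestamp seen by a fixed bound.
class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long bound;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long bound) {
        this.bound = bound;
        this.maxTimestamp = Long.MIN_VALUE + bound + 1; // avoids underflow in onPeriodicEmit
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - bound - 1);
    }
}

// Generic wrapper adding idleness detection to any generator: if no event has arrived
// for idleTimeout (per the injected clock), mark the output idle instead of delegating.
class WithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeout;
    private final LongSupplier clock;
    private long lastActivity;
    private boolean idle = false;

    WithIdleness(WatermarkGenerator<T> delegate, long idleTimeout, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeout = idleTimeout;
        this.clock = clock;
        this.lastActivity = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastActivity >= idleTimeout) {
            if (!idle) { // report idleness once until data arrives again
                idle = true;
                output.markIdle();
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

Since the wrapper only intercepts the two generator calls, the same idleness logic applies to punctuated and periodic generators alike, which is the simplification the unified interface is meant to enable.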