Overall I like this proposal; thanks for bringing it forward, Aljoscha.

I also like the idea of making the Watermark generator a rich function --
this should make it more straightforward to implement smarter watermark
generators, e.g., one that uses state to keep statistics about the actual
out-of-orderness and uses those statistics to implement a variable delay.
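
A minimal sketch of such a generator, in Java to match the thread. `Watermark`, `WatermarkOutput` and `WatermarkGenerator` below are simplified stand-ins modelled on the FLIP's proposal, not the actual Flink classes, and `AdaptiveDelayGenerator` is a hypothetical name. The "statistic" here is assumed to be simply the largest lateness observed so far, capped at a configured maximum; in a real rich function it would live in checkpointed state rather than plain fields.

```java
// Simplified stand-ins for the types discussed in FLIP-126 (not the real Flink API).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator whose delay adapts to the out-of-orderness it observes.
// Assumption: the statistic is the largest lateness seen so far, capped at maxDelay.
// In a rich function these fields would be kept in (checkpointed) state.
final class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;
    private long observedDelay = 0L;

    AdaptiveDelayGenerator(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE && eventTimestamp < maxTimestamp) {
            // How far behind the newest event seen so far this one arrived.
            long lateness = maxTimestamp - eventTimestamp;
            observedDelay = Math.min(maxDelay, Math.max(observedDelay, lateness));
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events yet, nothing to emit
        }
        // Same "- 1" convention as a bounded-out-of-orderness generator.
        output.emitWatermark(new Watermark(maxTimestamp - observedDelay - 1));
    }

    long currentDelay() {
        return observedDelay;
    }
}
```

The cap bounds how far a single outlier can hold the watermark back; since this delay only ever grows, a decaying or windowed statistic would be a natural refinement.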

David

On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Aljoscha,
>
> Thanks for opening the discussion!
>
> I have two comments on the FLIP:
> 1) We could add lifecycle methods to the Generator, i.e. open()/
> close(), probably with a Context as argument. I have not fully thought
> this through, but I think this is more aligned with the rest of
> our rich functions. In addition, it would allow us, for example, to
> initialize the watermark value if we decide to checkpoint the
> watermark (see [1]). (I also do not know whether Table/SQL needs to do
> anything in open().)
> 2) In line with the above, and with the case where we want to
> checkpoint the watermark in mind, I am wondering how we could
> implement this in the future. In the FLIP, it is proposed to expose
> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> that there is the implicit contract that watermarks are
> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> assume) a check that compares the last emitted WM against the
> provided one and emits it only if it is >=. If it does not, we risk
> users shooting themselves in the foot if they accidentally
> forget the check. Given that neither the WatermarkGenerator nor its
> caller knows whether the watermark was finally emitted (the
> WatermarkOutput#emitWatermark() returns void), who will be responsible
> for checkpointing the WM?
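
To make point 1 concrete, here is a minimal Java sketch of a generator with open()/close() lifecycle methods. `GeneratorContext`, its `restoredWatermark()` method, `RichWatermarkGenerator` and `BoundedGenerator` are hypothetical names, and so is the convention that `Long.MIN_VALUE` means "nothing restored"; the FLIP does not define any of them. On restore, open() seeds the generator so that its first periodic emit repeats the checkpointed watermark instead of starting from scratch.

```java
// Simplified stand-ins (not the real Flink API).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

// Hypothetical context handed to open(); Long.MIN_VALUE means "nothing restored".
interface GeneratorContext {
    long restoredWatermark();
}

// Hypothetical generator interface with rich-function-style lifecycle methods.
interface RichWatermarkGenerator<T> {
    default void open(GeneratorContext context) {}

    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);

    default void close() {}
}

// Bounded-out-of-orderness generator that can resume from a checkpointed watermark.
final class BoundedGenerator<T> implements RichWatermarkGenerator<T> {
    private final long bound;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedGenerator(long bound) {
        this.bound = bound;
    }

    @Override
    public void open(GeneratorContext context) {
        long restored = context.restoredWatermark();
        // Seed maxTimestamp so that the next periodic emit yields exactly the
        // restored watermark (the inverse of the formula in onPeriodicEmit).
        maxTimestamp = restored == Long.MIN_VALUE ? Long.MIN_VALUE : restored + bound + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - bound - 1));
        }
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}
```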
>
> Given this, why not define the methods as:
>
> public interface WatermarkGenerator<T> {
>
>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
>
>     Watermark onPeriodicEmit(WatermarkOutput output);
> }
>
> and the caller will be the one enforcing any invariants, such as
> non-decreasing watermarks. In this way, the caller can checkpoint
> anything that is needed, as it will know exactly whether
> the WM was emitted or not.
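
A minimal Java sketch of this alternative, assuming a generator returns `null` when it has no new watermark. `ReturningWatermarkGenerator` and `GeneratorDriver` are hypothetical names and the types are simplified stand-ins, not Flink classes. The driver, not the generator, drops regressing watermarks and remembers the last one it forwarded, so that value is what it would checkpoint.

```java
// Simplified stand-ins (not the real Flink API).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

// Shape proposed above: the generator returns a candidate watermark.
// Assumptions: null means "no new watermark", and generators never call
// output.emitWatermark() themselves.
interface ReturningWatermarkGenerator<T> {
    Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);

    Watermark onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical caller that owns the non-decreasing invariant.
final class GeneratorDriver<T> {
    private final ReturningWatermarkGenerator<T> generator;
    private final WatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;

    GeneratorDriver(ReturningWatermarkGenerator<T> generator, WatermarkOutput downstream) {
        this.generator = generator;
        this.downstream = downstream;
    }

    void processEvent(T event, long eventTimestamp) {
        maybeEmit(generator.onEvent(event, eventTimestamp, downstream));
    }

    void periodicEmit() {
        maybeEmit(generator.onPeriodicEmit(downstream));
    }

    // Forward only strictly newer watermarks: a regression would break the
    // contract, and an equal one is redundant.
    private void maybeEmit(Watermark candidate) {
        if (candidate != null && candidate.timestamp > lastEmitted) {
            lastEmitted = candidate.timestamp;
            downstream.emitWatermark(candidate);
        }
    }

    // The caller knows exactly what went downstream, so this is what it would checkpoint.
    long snapshotWatermark() {
        return lastEmitted;
    }
}
```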
>
> What do you think?
>
> Cheers,
> Kostas
>
> [1] https://issues.apache.org/jira/browse/FLINK-5601
>
> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> >
> > Thanks for the proposal, Aljoscha. This is a very useful unification. We
> > have already considered this FLIP in the interfaces for FLIP-95 [1] and
> > look forward to updating to the new unified watermark generators once
> > FLIP-126 has been accepted.
> >
> > Regards,
> > Timo
> >
> > [1] https://github.com/apache/flink/pull/11692
> >
> > On 20.04.20 18:10, Aljoscha Krettek wrote:
> > > Hi Everyone!
> > >
> > > We would like to start a discussion on "FLIP-126: Unify (and separate)
> > > Watermark Assigners" [1]. This work was started by Stephan in an
> > > experimental branch. I expanded on that work to provide a PoC for the
> > > changes proposed in this FLIP: [2].
> > >
> > > Currently, we have two different flavours of Watermark
> > > Assigners: AssignerWithPunctuatedWatermarks
> > > and AssignerWithPeriodicWatermarks. Both of them extend
> > > from TimestampAssigner. This means that sources that want to support
> > > watermark assignment/extraction in the source need to support two
> > > separate interfaces, and we need two operator implementations for the
> > > different flavours. Also, this makes features such as generic support
> > > for idleness detection more complicated to implement because we again
> > > have to support two types of watermark assigners.
> > >
> > > In this FLIP we propose two things:
> > >
> > > 1. Unify the Watermark Assigners into one interface, WatermarkGenerator.
> > > 2. Separate this new interface from the TimestampAssigner.
> > >
> > > The motivation for the first is to simplify future implementations and
> > > reduce code duplication. The motivation for the second point is again code
> > > deduplication: most assigners currently have to extend from some base
> > > timestamp extractor or duplicate the extraction logic, or users have to
> > > override an abstract method of the watermark assigner to provide the
> > > timestamp extraction logic.
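
A minimal Java sketch of the "unify and separate" idea: timestamp extraction sits in its own interface, and a single generator interface covers both the periodic and the punctuated style. These are simplified stand-ins modelled on the FLIP, not the final Flink classes; `BoundedOutOfOrdernessGenerator` and `PunctuatedGenerator` are illustrative names.

```java
import java.util.function.Predicate;

// Simplified stand-ins (not the real Flink API).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);
}

// Timestamp extraction is its own interface...
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

// ...and one generator interface replaces both assigner flavours.
interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Periodic style: track state per event, emit on the periodic callback.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long bound;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessGenerator(long bound) {
        this.bound = bound;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(new Watermark(maxTimestamp - bound - 1));
        }
    }
}

// Punctuated style: emit straight from onEvent when a marker event arrives.
final class PunctuatedGenerator<T> implements WatermarkGenerator<T> {
    private final Predicate<T> isMarker;

    PunctuatedGenerator(Predicate<T> isMarker) {
        this.isMarker = isMarker;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (isMarker.test(event)) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do on the periodic callback.
    }
}
```

An operator would call `extractTimestamp` once per record and pass the result to whichever generator is configured, so a single operator implementation can serve both styles.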
> > >
> > > Additionally, we propose to add a generic wrapping WatermarkGenerator
> > > that provides idleness detection, i.e. it can mark a stream/partition as
> > > idle if no data arrives after a configured timeout.
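
A minimal Java sketch of such a wrapper. The types are simplified stand-ins, `IdlenessWrapper` is a hypothetical name, and the clock is injected as a `LongSupplier` of milliseconds so the timeout can be tested. The sketch also assumes that emitting a watermark again implicitly marks the stream active.

```java
import java.util.function.LongSupplier;

// Simplified stand-ins (not the real Flink API).
final class Watermark {
    final long timestamp;

    Watermark(long timestamp) {
        this.timestamp = timestamp;
    }
}

interface WatermarkOutput {
    void emitWatermark(Watermark watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Wraps any generator and marks the output idle once no event has arrived
// for at least idleTimeoutMillis.
final class IdlenessWrapper<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock;
    private long lastActivity;
    private boolean idle = false;

    IdlenessWrapper(WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastActivity = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = clock.getAsLong();
        idle = false;
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastActivity >= idleTimeoutMillis) {
            if (!idle) {
                output.markIdle(); // signal once per idle period
                idle = true;
            }
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

Because the wrapper only sees the generic interface, the same idleness logic applies to any periodic or punctuated generator it wraps.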
> > >
> > > The "unify and separate" part refers to the fact that we want to unify
> > > punctuated and periodic assigners but at the same time split the
> > > timestamp assigner from the watermark generator.
> > >
> > > Please find more details in the FLIP [1]. Looking forward to
> > > your feedback.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> > >
> > > [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> >
>
