I am fine with that.

Most of the principles seem agreed upon. I understand the need to support
code-generated extractors, and we should already support most of that (via
the factories, as Aljoscha mentioned) and can extend this if needed.

I think that the factory approach supports code-generated extractors in an
even cleaner way than an extractor with an open/init method.


On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org>
wrote:

> We're slightly running out of time. I would propose we vote on the basic
> principle and remain open to later additions. This feature is quite
> important to make the new Kafka Source that is developed as part of
> FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> the newly added connector.
>
> I know that's a bit unorthodox but would everyone be OK with what's
> currently there and then we iterate?
>
> Best,
> Aljoscha
>
> On 11.05.20 13:57, Aljoscha Krettek wrote:
> > Ah, I meant to write this in my previous email, sorry about that.
> >
> > The WatermarkStrategy, which is basically a factory for a
> > WatermarkGenerator, is the replacement for the open() method. This is the
> > same strategy that was followed for StreamOperatorFactory, which was
> > introduced to allow code generation in the Table API [1]. If we need
> > metrics or other things, we would add them as parameters to the factory
> > method. What do you think?
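To make the factory idea concrete, here is a minimal, self-contained sketch. All types below are simplified, hypothetical stand-ins modeled on the names used in this thread (watermarks are plain `long`s, and `GeneratorContext#registerGauge` is an invented placeholder for whatever metrics hook the factory method might receive); it is not Flink's actual API. Setup that a rich function would do in `open()` happens when the strategy creates the generator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// Simplified, hypothetical stand-ins; not Flink's real classes.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical context passed to the factory method; metrics (or other
// runtime services) would be exposed here instead of through open().
interface GeneratorContext {
    void registerGauge(String name, LongSupplier gauge);
}

// The strategy is only a factory: per-instance setup happens on creation.
interface WatermarkStrategy<T> {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

class MaxTimestampStrategy<T> implements WatermarkStrategy<T> {
    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context) {
        // Work that would otherwise go into open(), such as instantiating
        // code-generated classes or registering metrics, happens here.
        long[] maxTimestamp = {Long.MIN_VALUE};
        context.registerGauge("currentMaxTimestamp", () -> maxTimestamp[0]);
        return new WatermarkGenerator<T>() {
            @Override
            public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
                maxTimestamp[0] = Math.max(maxTimestamp[0], eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                if (maxTimestamp[0] != Long.MIN_VALUE) {
                    output.emitWatermark(maxTimestamp[0]);
                }
            }
        };
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        Map<String, LongSupplier> gauges = new HashMap<>();
        WatermarkGenerator<String> generator =
                new MaxTimestampStrategy<String>().createWatermarkGenerator(gauges::put);
        List<Long> emitted = new ArrayList<>();
        generator.onEvent("a", 5L, emitted::add);
        generator.onEvent("b", 3L, emitted::add);
        generator.onPeriodicEmit(emitted::add);
        System.out.println(emitted + " " + gauges.get("currentMaxTimestamp").getAsLong());
    }
}
```

Because the context is a parameter of the factory method, adding more services later only widens that context; the generator interface itself stays unchanged.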
> >
> > Best,
> > Aljoscha
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11974
> >
> > On 10.05.20 05:07, Jark Wu wrote:
> >> Hi,
> >>
> >> Regarding `open()/close()`, I think they are necessary for Table & SQL
> >> to compile the generated code.
> >> In Table & SQL, the watermark strategy and the event timestamp are
> >> defined using SQL expressions, which we translate into generated Java
> >> code. If we have `open()/close()`, we don't need lazy initialization.
> >> Besides that, I can see a need to report some metrics, e.g. the current
> >> watermark, the dirty timestamps (null values), etc.
> >> So I think a simple `open()/close()` with a context that can provide a
> >> MetricGroup is nice and not too complex for the first version.
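For comparison, a minimal sketch of the `open()/close()` variant suggested here. Everything is hypothetical and simplified: `RichWatermarkGenerator`, `LifecycleContext` (a plain `Map` stands in for a `MetricGroup`), and the lambda that stands in for code generated from a SQL expression. Initialization happens once in `open()`, so the hot path needs no lazy-initialization checks, and rows whose timestamp evaluates to `null` are counted as dirty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified, hypothetical types; not Flink's API.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Stand-in for a context that exposes a MetricGroup.
interface LifecycleContext {
    Map<String, Long> metrics();
}

interface RichWatermarkGenerator<T> {
    void open(LifecycleContext context);

    void onEvent(T row, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);

    void close();
}

class GeneratedExpressionGenerator implements RichWatermarkGenerator<Object[]> {
    private final int rowtimeIndex;
    private Function<Object[], Long> timestampExpression; // stands in for generated code
    private Map<String, Long> metrics;
    private long maxTimestamp = Long.MIN_VALUE;

    GeneratedExpressionGenerator(int rowtimeIndex) {
        this.rowtimeIndex = rowtimeIndex;
    }

    @Override
    public void open(LifecycleContext context) {
        // In Table & SQL, generated Java code would be compiled here, once.
        timestampExpression = row -> (Long) row[rowtimeIndex];
        metrics = context.metrics();
        metrics.put("nullTimestamps", 0L);
    }

    @Override
    public void onEvent(Object[] row, WatermarkOutput output) {
        Long timestamp = timestampExpression.apply(row);
        if (timestamp == null) {
            metrics.merge("nullTimestamps", 1L, Long::sum); // dirty timestamp
            return;
        }
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp);
            metrics.put("currentWatermark", maxTimestamp);
        }
    }

    @Override
    public void close() {
        timestampExpression = null; // release what the generated code holds
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        Map<String, Long> metrics = new HashMap<>();
        GeneratedExpressionGenerator generator = new GeneratedExpressionGenerator(0);
        generator.open(() -> metrics);
        List<Long> emitted = new ArrayList<>();
        generator.onEvent(new Object[] {10L, "a"}, emitted::add);
        generator.onEvent(new Object[] {null, "b"}, emitted::add);
        generator.onPeriodicEmit(emitted::add);
        generator.close();
        System.out.println(emitted + " " + metrics);
    }
}
```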
> >>
> >> Best,
> >> Jark
> >>
> >>
> >>
> >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> >>
> >>> Thanks, Aljoscha, for picking this up.
> >>>
> >>> I agree with the approach of doing the set of changes proposed here for
> >>> now. It already makes things simpler and adds idleness support
> >>> everywhere.
> >>>
> >>> Rich functions and state always add complexity; let's do this in a later
> >>> step, if we have a really compelling case.
> >>>
> >>>
> >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>
> >>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> >>>> basically to turn emitting into a "flatMap". We give the
> >>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> >>>> decide whether to output a watermark or not and can also mark the
> >>>> output as idle. Changing the interface to return a Watermark (as the
> >>>> previous watermark assigner interface did) would not allow that
> >>>> flexibility.
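A minimal sketch of the collector-style ("flatMap") contract described above, using simplified local stand-ins for `WatermarkOutput` and `WatermarkGenerator` (watermarks as plain `long`s; `BoundedOutOfOrdernessGenerator` and `RecordingOutput` are hypothetical helpers for illustration). The generator decides per call whether to emit anything: here it emits nothing per event and, on a periodic call, emits only when the watermark has actually advanced.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins modeled on the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Bounded out-of-orderness: watermark = max timestamp seen - maxDelay.
class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxDelay;
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    BoundedOutOfOrdernessGenerator(long maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // Only track the maximum; nothing is emitted per event.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no data yet: emit nothing at all
        }
        long watermark = maxTimestamp - maxDelay;
        if (watermark > lastEmitted) { // emit only when the watermark advances
            output.emitWatermark(watermark);
            lastEmitted = watermark;
        }
    }
}

// Helper that records what a generator sent to its output.
class RecordingOutput implements WatermarkOutput {
    final List<Long> watermarks = new ArrayList<>();
    boolean idle;

    @Override
    public void emitWatermark(long watermark) {
        watermarks.add(watermark);
        idle = false;
    }

    @Override
    public void markIdle() {
        idle = true;
    }
}

class CollectorDemo {
    public static void main(String[] args) {
        WatermarkGenerator<String> generator = new BoundedOutOfOrdernessGenerator<>(3);
        RecordingOutput output = new RecordingOutput();
        generator.onEvent("a", 10, output);
        generator.onEvent("b", 7, output);
        generator.onPeriodicEmit(output); // emits 7
        generator.onPeriodicEmit(output); // nothing new to emit
        generator.onEvent("c", 15, output);
        generator.onPeriodicEmit(output); // emits 12
        System.out.println(output.watermarks);
    }
}
```

With a return-a-Watermark interface, "emit nothing" and "mark idle" would need extra sentinel values; with the output parameter they are simply calls the generator makes or does not make.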
> >>>>
> >>>> Regarding checkpointing the watermark and keeping track of the minimum
> >>>> watermark, this would be the responsibility of the framework (or the
> >>>> KafkaConsumer in the current implementation). The user-supplied WG does
> >>>> not need to make sure the watermark doesn't regress.
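A minimal sketch of what "the framework keeps track" could look like on the caller side. `PartitionWatermarkTracker` is a hypothetical class, loosely modeled on what a source such as the Kafka consumer has to do: each partition's generator gets its own output, per-partition watermarks are kept monotonic, and the combined watermark is the minimum across partitions and never regresses. Its `currentWatermark()` is the value a checkpoint could store, so user generators never have to guard against regression themselves.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for the FLIP's output interface.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Hypothetical framework-side component; not part of Flink.
class PartitionWatermarkTracker {
    private final Map<String, Long> perPartition = new HashMap<>();
    private final WatermarkOutput downstream;
    private long combined = Long.MIN_VALUE;

    PartitionWatermarkTracker(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    // The output handed to the user-supplied generator of one partition.
    WatermarkOutput outputFor(String partition) {
        perPartition.putIfAbsent(partition, Long.MIN_VALUE);
        return watermark -> {
            // A regressing watermark from user code is simply ignored here.
            perPartition.merge(partition, watermark, Math::max);
            updateCombined();
        };
    }

    private void updateCombined() {
        long min = Long.MAX_VALUE;
        for (long watermark : perPartition.values()) {
            min = Math.min(min, watermark);
        }
        if (min > combined) { // the combined watermark only moves forward
            combined = min;
            downstream.emitWatermark(combined);
        }
    }

    // The value a checkpoint could store (hypothetical).
    long currentWatermark() {
        return combined;
    }
}

class TrackerDemo {
    public static void main(String[] args) {
        List<Long> downstream = new ArrayList<>();
        PartitionWatermarkTracker tracker = new PartitionWatermarkTracker(downstream::add);
        WatermarkOutput p0 = tracker.outputFor("p0");
        WatermarkOutput p1 = tracker.outputFor("p1");
        p0.emitWatermark(10); // p1 has no watermark yet, nothing is forwarded
        p1.emitWatermark(5);  // min(10, 5) = 5 is forwarded
        p0.emitWatermark(3);  // regression in p0 is ignored
        p1.emitWatermark(12); // min(10, 12) = 10 is forwarded
        System.out.println(downstream + " " + tracker.currentWatermark());
    }
}
```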
> >>>>
> >>>> Regarding making the WG a "rich function", I can see the potential
> >>>> benefit, but I also see a lot of pitfalls. For example, how should the
> >>>> watermark state be handled in the case of scale-in? It could be made to
> >>>> work in the Kafka case by attaching the state to the partition state
> >>>> that we keep, but then we would also have potential backwards
> >>>> compatibility problems for the WM state. Does the WG usually need to
> >>>> keep the state, or might it be enough if the state is transient? That
> >>>> is, after a restart the WG would lose its histogram, but it would
> >>>> rebuild it quickly and get back to the same steady state as before.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>> Aljoscha.
> >>>>>
> >>>>> I also like the idea of making the Watermark generator a rich
> >>>>> function; this should make it more straightforward to implement
> >>>>> smarter watermark generators, e.g. one that uses state to keep
> >>>>> statistics about the actual out-of-orderness and uses those
> >>>>> statistics to implement a variable delay.
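A rough sketch of such a generator, written against simplified local stand-ins for the FLIP's interfaces. The statistic used here (the largest lateness among the last `windowSize` events) and the class name `AdaptiveDelayGenerator` are illustrative assumptions, not something the thread specifies. The statistics live in ordinary fields, i.e. they are transient rather than checkpointed, which matches the option raised elsewhere in this thread of simply rebuilding them after a restart. A growing delay can make the raw watermark regress; per the discussion, filtering that out would be the framework's job.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins modeled on the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical generator whose delay adapts to the observed out-of-orderness.
class AdaptiveDelayGenerator<T> implements WatermarkGenerator<T> {
    private final int windowSize;
    // Transient statistics: lost on restart and rebuilt from new events.
    private final ArrayDeque<Long> recentLateness = new ArrayDeque<>();
    private long maxTimestamp = Long.MIN_VALUE;

    AdaptiveDelayGenerator(int windowSize) {
        this.windowSize = windowSize;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            // How far behind the newest timestamp seen so far this event is.
            recentLateness.addLast(Math.max(0L, maxTimestamp - eventTimestamp));
            if (recentLateness.size() > windowSize) {
                recentLateness.removeFirst();
            }
        }
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentDelay() {
        long delay = 0;
        for (long lateness : recentLateness) {
            delay = Math.max(delay, lateness);
        }
        return delay;
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp - currentDelay());
        }
    }
}

class AdaptiveDemo {
    public static void main(String[] args) {
        AdaptiveDelayGenerator<String> generator = new AdaptiveDelayGenerator<>(3);
        List<Long> emitted = new ArrayList<>();
        for (long ts : new long[] {10, 6, 12}) {
            generator.onEvent("e", ts, emitted::add);
        }
        generator.onPeriodicEmit(emitted::add); // delay 4, watermark 8
        for (long ts : new long[] {13, 14, 15}) {
            generator.onEvent("e", ts, emitted::add);
        }
        generator.onPeriodicEmit(emitted::add); // late sample aged out: delay 0
        System.out.println(emitted);
    }
}
```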
> >>>>>
> >>>>> David
> >>>>>
> >>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Thanks for opening the discussion!
> >>>>>>
> >>>>>> I have two comments on the FLIP:
> >>>>>> 1) We could add lifecycle methods to the Generator, i.e.
> >>>>>> open()/close(), probably with a Context as argument. I have not fully
> >>>>>> thought this through, but I think that this is more aligned with the
> >>>>>> rest of our rich functions. In addition, it would allow us, for
> >>>>>> example, to initialize the Watermark value if we decide to checkpoint
> >>>>>> the watermark (see [1]). (I also do not know whether Table/SQL needs
> >>>>>> to do anything in open().)
> >>>>>> 2) Aligned with the above, and with the case where we want to
> >>>>>> checkpoint the watermark in mind, I am wondering how we could
> >>>>>> implement this in the future. In the FLIP, it is proposed to expose
> >>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> >>>>>> the implicit contract that watermarks are non-decreasing,
> >>>>>> WatermarkOutput#emitWatermark() will (I assume) have a check that
> >>>>>> compares the last emitted WM against the provided one and emits it
> >>>>>> only if it is >=. If there is no such check, we risk users shooting
> >>>>>> themselves in the foot if they accidentally forget it. Given that
> >>>>>> neither the WatermarkGenerator nor its caller knows whether the
> >>>>>> watermark was finally emitted (WatermarkOutput#emitWatermark returns
> >>>>>> void), who will be responsible for checkpointing the WM?
> >>>>>>
> >>>>>> Given this, why not define the methods as:
> >>>>>>
> >>>>>> public interface WatermarkGenerator<T> {
> >>>>>>
> >>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>
> >>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>> }
> >>>>>>
> >>>>>> and the caller would be the one enforcing any invariants, such as
> >>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> >>>>>> anything that is needed, as it will know exactly whether the WM was
> >>>>>> emitted or not.
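A minimal sketch of this alternative, under simplifying assumptions: the hypothetical `ReturningWatermarkGenerator` drops the `WatermarkOutput` parameter from the signature proposed above and returns an `OptionalLong` (empty meaning "no watermark"), and `EnforcingCaller` and `NaiveGenerator` are invented for illustration. The caller drops any candidate that is not larger than the last emitted watermark and therefore knows exactly which watermark can be checkpointed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

// Hypothetical variant of the proposed signature, without the output.
interface ReturningWatermarkGenerator<T> {
    OptionalLong onEvent(T event, long eventTimestamp);

    OptionalLong onPeriodicEmit();
}

// Hypothetical caller that owns the invariants and the checkpointed value.
class EnforcingCaller<T> {
    private final ReturningWatermarkGenerator<T> generator;
    final List<Long> emitted = new ArrayList<>();
    private long lastEmitted = Long.MIN_VALUE;

    EnforcingCaller(ReturningWatermarkGenerator<T> generator) {
        this.generator = generator;
    }

    void onEvent(T event, long eventTimestamp) {
        handle(generator.onEvent(event, eventTimestamp));
    }

    void onPeriodicEmit() {
        handle(generator.onPeriodicEmit());
    }

    private void handle(OptionalLong candidate) {
        // Non-decreasing watermarks are enforced here, not in user code.
        if (candidate.isPresent() && candidate.getAsLong() > lastEmitted) {
            lastEmitted = candidate.getAsLong();
            emitted.add(lastEmitted);
        }
    }

    // Exactly the watermark that was last emitted, ready to be checkpointed.
    long snapshotWatermark() {
        return lastEmitted;
    }
}

// A naive generator that may regress: timestamp minus a fixed delay of 2.
class NaiveGenerator implements ReturningWatermarkGenerator<String> {
    @Override
    public OptionalLong onEvent(String event, long eventTimestamp) {
        return OptionalLong.of(eventTimestamp - 2);
    }

    @Override
    public OptionalLong onPeriodicEmit() {
        return OptionalLong.empty();
    }
}

class CallerDemo {
    public static void main(String[] args) {
        EnforcingCaller<String> caller = new EnforcingCaller<>(new NaiveGenerator());
        caller.onEvent("a", 10); // candidate 8: emitted
        caller.onEvent("b", 5);  // candidate 3: dropped, would regress
        caller.onEvent("c", 12); // candidate 10: emitted
        caller.onPeriodicEmit(); // no candidate
        System.out.println(caller.emitted + " " + caller.snapshotWatermark());
    }
}
```

The trade-off is the one discussed in the replies: returning a value gives the caller full knowledge, but each call can produce at most one watermark and no separate idleness signal.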
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Kostas
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>
> >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification.
> >>>>>>> We have already considered this FLIP in the interfaces for FLIP-95 [1]
> >>>>>>> and look forward to updating to the new unified watermark generators
> >>>>>>> once FLIP-126 has been accepted.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>
> >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>> Hi Everyone!
> >>>>>>>>
> >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> >>>>>>>> separate) Watermark Assigners" [1]. This work was started by Stephan
> >>>>>>>> in an experimental branch. I expanded on that work to provide a PoC
> >>>>>>>> for the changes proposed in this FLIP: [2].
> >>>>>>>>
> >>>>>>>> Currently, we have two different flavours of Watermark Assigners:
> >>>>>>>> AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks.
> >>>>>>>> Both of them extend from TimestampAssigner. This means that sources
> >>>>>>>> that want to support watermark assignment/extraction in the source
> >>>>>>>> need to support two separate interfaces, and we have two operator
> >>>>>>>> implementations for the different flavours. This also makes features
> >>>>>>>> such as generic support for idleness detection more complicated to
> >>>>>>>> implement, because we again have to support two types of watermark
> >>>>>>>> assigners.
> >>>>>>>>
> >>>>>>>> In this FLIP we propose two things:
> >>>>>>>>
> >>>>>>>> 1. Unify the Watermark Assigners into one interface, WatermarkGenerator.
> >>>>>>>> 2. Separate this new interface from the TimestampAssigner.
> >>>>>>>>
> >>>>>>>> The motivation for the first point is to simplify future
> >>>>>>>> implementations and reduce code duplication. The motivation for the
> >>>>>>>> second point is again code deduplication: most assigners currently
> >>>>>>>> have to extend from some base timestamp extractor or duplicate the
> >>>>>>>> extraction logic, or users have to override an abstract method of the
> >>>>>>>> watermark assigner to provide the timestamp extraction logic.
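A minimal sketch of the separation, with simplified local types: `TimestampAssigner#extractTimestamp(element, recordTimestamp)` mirrors the shape of Flink's existing `TimestampAssigner` (the element plus the record's current timestamp), while `Order`, `MaxTimestampGenerator`, and the `TimestampsAndWatermarks` driver are hypothetical. The generator only sees the extracted timestamp, so it is reusable for any element type and no base timestamp extractor has to be extended.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins; watermarks are plain longs.
interface TimestampAssigner<T> {
    long extractTimestamp(T element, long recordTimestamp);
}

interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical element type.
final class Order {
    final String id;
    final long createdAt;

    Order(String id, long createdAt) {
        this.id = id;
        this.createdAt = createdAt;
    }
}

// Works for any T: it only looks at the timestamp it is given.
class MaxTimestampGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.emitWatermark(maxTimestamp);
        }
    }
}

// Hypothetical driver that composes an assigner and a generator.
class TimestampsAndWatermarks<T> {
    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;
    private final WatermarkOutput output;

    TimestampsAndWatermarks(
            TimestampAssigner<T> assigner, WatermarkGenerator<T> generator, WatermarkOutput output) {
        this.assigner = assigner;
        this.generator = generator;
        this.output = output;
    }

    long process(T element, long recordTimestamp) {
        long timestamp = assigner.extractTimestamp(element, recordTimestamp);
        generator.onEvent(element, timestamp, output);
        return timestamp;
    }

    void periodicEmit() {
        generator.onPeriodicEmit(output);
    }
}

class SeparationDemo {
    public static void main(String[] args) {
        List<Long> emitted = new ArrayList<>();
        TimestampsAndWatermarks<Order> pipeline = new TimestampsAndWatermarks<Order>(
                (order, recordTimestamp) -> order.createdAt,
                new MaxTimestampGenerator<Order>(),
                emitted::add);
        pipeline.process(new Order("o1", 100), Long.MIN_VALUE);
        pipeline.process(new Order("o2", 90), Long.MIN_VALUE);
        pipeline.periodicEmit();
        System.out.println(emitted);
    }
}
```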
> >>>>>>>>
> >>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator
> >>>>>>>> that provides idleness detection, i.e. it can mark a stream/partition
> >>>>>>>> as idle if no data arrives after a configured timeout.
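A minimal sketch of such a wrapping generator, using simplified local stand-ins for the FLIP's interfaces. `IdlenessWrapper`, the injectable `LongSupplier` clock, the helper classes, and the assumption that emitting a watermark makes an output active again are illustrative choices, not the FLIP's exact design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified, hypothetical stand-ins for the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);

    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical wrapper: marks the output idle after a timeout without data.
class IdlenessWrapper<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> wrapped;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // injectable so the logic is testable
    private long lastActivity;
    private boolean idle;

    IdlenessWrapper(WatermarkGenerator<T> wrapped, long idleTimeoutMillis, LongSupplier clock) {
        this.wrapped = wrapped;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastActivity = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastActivity = clock.getAsLong();
        idle = false;
        wrapped.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastActivity >= idleTimeoutMillis) {
            if (!idle) { // signal idleness once, not on every periodic call
                output.markIdle();
                idle = true;
            }
        } else {
            // Assumption: a watermark emitted here makes the output active again.
            wrapped.onPeriodicEmit(output);
        }
    }
}

// Simple generator to wrap: emits the highest timestamp seen so far.
class MaxTimestampGenerator<T> implements WatermarkGenerator<T> {
    private long max = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        max = Math.max(max, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (max != Long.MIN_VALUE) {
            output.emitWatermark(max);
        }
    }
}

// Records what the generator sent downstream.
class RecordingOutput implements WatermarkOutput {
    final List<Long> watermarks = new ArrayList<>();
    int idleSignals;

    @Override
    public void emitWatermark(long watermark) {
        watermarks.add(watermark);
    }

    @Override
    public void markIdle() {
        idleSignals++;
    }
}

class IdlenessDemo {
    public static void main(String[] args) {
        long[] now = {0};
        RecordingOutput output = new RecordingOutput();
        IdlenessWrapper<String> generator = new IdlenessWrapper<String>(
                new MaxTimestampGenerator<String>(), 1_000, () -> now[0]);
        generator.onEvent("a", 5, output);
        now[0] = 100;
        generator.onPeriodicEmit(output); // active: emits 5
        now[0] = 2_000;
        generator.onPeriodicEmit(output); // 2000 ms without data: marked idle
        generator.onEvent("b", 9, output);
        generator.onPeriodicEmit(output); // active again: emits 9
        System.out.println(output.watermarks + ", idle signals: " + output.idleSignals);
    }
}
```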
> >>>>>>>>
> >>>>>>>> The "unify and separate" part refers to the fact that we want to
> >>>>>>>> unify punctuated and periodic assigners but at the same time split
> >>>>>>>> the timestamp assigner from the watermark generator.
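To illustrate the "unify" half: a punctuated assigner maps onto the single interface by emitting from `onEvent()` and leaving `onPeriodicEmit()` empty, while a periodic one does the opposite. The `Reading` type with its `isMarker` flag and the `PunctuatedGenerator` are hypothetical, and the interfaces are simplified local stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the FLIP's interfaces.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Hypothetical event type: some events carry watermark information.
final class Reading {
    final long timestamp;
    final boolean isMarker;

    Reading(long timestamp, boolean isMarker) {
        this.timestamp = timestamp;
        this.isMarker = isMarker;
    }
}

// Punctuated behaviour expressed in the unified interface.
class PunctuatedGenerator implements WatermarkGenerator<Reading> {
    @Override
    public void onEvent(Reading event, long eventTimestamp, WatermarkOutput output) {
        if (event.isMarker) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: this generator only reacts to marker events.
    }
}

class PunctuatedDemo {
    public static void main(String[] args) {
        WatermarkGenerator<Reading> generator = new PunctuatedGenerator();
        List<Long> emitted = new ArrayList<>();
        Reading[] readings = {
            new Reading(1, false), new Reading(5, true), new Reading(7, false), new Reading(9, true)
        };
        for (Reading r : readings) {
            generator.onEvent(r, r.timestamp, emitted::add);
            generator.onPeriodicEmit(emitted::add);
        }
        System.out.println(emitted);
    }
}
```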
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>> your feedback.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
