+1 to all of Dawid's suggestions; they make a lot of sense to me.

On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Aljoscha,
>
> Sorry for adding comments during the vote, but I have some really minor
> suggestions that should not influence the voting thread imo.
>
> 1) Does it make sense to have the TimestampAssigner extend from Flink's
> Function? This implies it has to be serializable, which with the factory
> pattern is not strictly necessary, right? BTW I really like that you
> suggested the FunctionalInterface annotation there.
>
> 2) Could we rename the IdentityTimestampAssigner to e.g.
> RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...
> Personally I found the IdentityTimestampAssigner a bit misleading, as
> "identity" usually means a no-op. That did not click for me, as I assumed
> it somehow returns the incoming record itself.
>
> 3) Could we rename the second parameter of TimestampAssigner#extract to
> e.g. recordTimestamp/nativeTimestamp? This is similar to the point
> above. This parameter was also a bit confusing for me, as I thought at
> times it's somehow related to
> TimerService#currentProcessingTimestamp()/currentWatermark() as the
> whole system's currentTimestamp.
>
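
A minimal sketch of points 1) to 3), to make the suggestions concrete. All type names here (SketchTimestampAssigner, SketchRecordTimestampAssigner, SketchAssignerFactory) are hypothetical stand-ins for illustration and not the interfaces from the FLIP; the assumption is that only the factory has to be serializable.

```java
import java.io.Serializable;

// Hypothetical stand-in for the assigner discussed above; not Flink's interface.
// It deliberately does not extend Serializable (point 1).
@FunctionalInterface
interface SketchTimestampAssigner<T> {
    // Second parameter named as suggested in point 3: the timestamp the record
    // already carries (e.g. from the source), not some system-wide time.
    long extract(T element, long recordTimestamp);
}

// Point 2: an assigner that simply keeps the timestamp attached to the record.
final class SketchRecordTimestampAssigner<T> implements SketchTimestampAssigner<T> {
    @Override
    public long extract(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

// Assumption: only the factory is shipped around, so only it is Serializable.
@FunctionalInterface
interface SketchAssignerFactory<T> extends Serializable {
    SketchTimestampAssigner<T> create();
}
```

With the single-method interface, a user-defined assigner can still be written as a lambda, e.g. `(element, recordTimestamp) -> element.length()` for a String element.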
> Other than those three points I like the proposal, and I would have
> voted +1 already if it were not for them.
>
> Best,
>
> Dawid
>
> On 11/05/2020 16:57, Jark Wu wrote:
> > Thanks for the explanation. I like the factory pattern to make the member
> > variables immutable and final.
> >
> > So +1 to the proposal.
> >
> > Best,
> > Jark
> >
> > On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:
> >
> >> I am fine with that.
> >>
> >> Many of the principles seem agreed upon. I understand the need to support
> >> code-generated extractors, and we should support most of it already (as
> >> Aljoscha mentioned, via the factories) and can extend this if needed.
> >>
> >> I think that the factory approach supports code-generated extractors in a
> >> cleaner way even than an extractor with an open/init method.
> >>
> >>
> >> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org>
> >> wrote:
> >>
> >>> We're slightly running out of time. I would propose we vote on the basic
> >>> principle and remain open to later additions. This feature is quite
> >>> important to make the new Kafka Source that is developed as part of
> >>> FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> >>> the newly added connector.
> >>>
> >>> I know that's a bit unorthodox but would everyone be OK with what's
> >>> currently there and then we iterate?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>
> >>>> The WatermarkStrategy, which is basically a factory for a
> >>>> WatermarkGenerator, is the replacement for the open() method. This is
> >>>> the same strategy that was followed for StreamOperatorFactory, which was
> >>>> introduced to allow code generation in the Table API [1]. If we need
> >>>> metrics or other things we would add that as a parameter to the factory
> >>>> method. What do you think?
> >>>>
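
A minimal sketch of the factory idea, under stated assumptions: the names SketchStrategy, SketchGenerator, SketchBoundedGenerator and SketchFactoryContext are hypothetical, and the "metric" is just a callback standing in for whatever would later be passed to the factory method. It only illustrates that the generator can get everything through its constructor, so fields can be final and no open() is needed.

```java
import java.io.Serializable;
import java.util.function.LongConsumer;

// Hypothetical names throughout; a stand-in for the factory idea, not Flink's API.

// What a factory method could receive if metrics are needed later (assumption).
interface SketchFactoryContext {
    LongConsumer watermarkGauge();
}

interface SketchGenerator<T> {
    // Returns the watermark this generator would currently emit.
    long onEvent(T event, long eventTimestamp);
}

// Everything is provided in the constructor, so the configuration fields are
// final and neither open() nor lazy initialization is required.
final class SketchBoundedGenerator<T> implements SketchGenerator<T> {
    private final long maxOutOfOrderness;
    private final LongConsumer gauge;
    private long maxTimestamp = Long.MIN_VALUE;

    SketchBoundedGenerator(long maxOutOfOrderness, LongConsumer gauge) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.gauge = gauge;
    }

    @Override
    public long onEvent(T event, long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        long watermark = maxTimestamp - maxOutOfOrderness;
        gauge.accept(watermark); // report the current watermark as a metric
        return watermark;
    }
}

// The serializable factory; the generator itself never needs to be serialized.
@FunctionalInterface
interface SketchStrategy<T> extends Serializable {
    SketchGenerator<T> createGenerator(SketchFactoryContext context);
}
```

A strategy could then be written as `ctx -> new SketchBoundedGenerator<>(5L, ctx.watermarkGauge())`.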
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>
> >>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>> Hi,
> >>>>>
> >>>>> Regarding `open()/close()`, I think it's necessary for Table&SQL to
> >>>>> compile the generated code.
> >>>>> In Table&SQL, the watermark strategy and event timestamp are defined
> >>>>> using SQL expressions; we translate the expressions and generate Java
> >>>>> code for them. If we have `open()/close()`, we don't need lazy
> >>>>> initialization.
> >>>>> Besides that, I can see a need to report some metrics, e.g. the current
> >>>>> watermark, the dirty timestamps (null values), etc.
> >>>>> So I think a simple `open()/close()` with a context that can provide a
> >>>>> MetricGroup is nice and not complex for the first version.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> >>>>>
> >>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>
> >>>>>> I agree with the approach of doing the set of changes proposed here for
> >>>>>> now. It already makes things simpler and adds idleness support
> >>>>>> everywhere.
> >>>>>>
> >>>>>> Rich functions and state always add complexity; let's do this in a next
> >>>>>> step, if we have a really compelling case.
> >>>>>>
> >>>>>>
> >>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is
> >>>>>>> basically to turn emitting into a "flatMap". We give the
> >>>>>>> WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can
> >>>>>>> decide whether to output a watermark or not and can also mark the
> >>>>>>> output as idle. Changing the interface to return a Watermark (as the
> >>>>>>> previous watermark assigner interface did) would not allow that
> >>>>>>> flexibility.
> >>>>>>>
> >>>>>>> Regarding checkpointing the watermark and keeping track of the minimum
> >>>>>>> watermark, this would be the responsibility of the framework (or the
> >>>>>>> KafkaConsumer in the current implementation). The user-supplied WG does
> >>>>>>> not need to make sure the watermark doesn't regress.
> >>>>>>>
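
A minimal sketch of the "collector" style together with the framework-side guard described above. The types are hypothetical stand-ins (SketchWatermarkOutput, SketchCollectingGenerator, SketchMonotonicOutput, SketchPunctuatedGenerator), watermarks are plain long values, and forwarding only strictly advancing watermarks is an assumption of this sketch.

```java
// Hypothetical stand-ins for the interfaces discussed above; not Flink's API.
interface SketchWatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface SketchCollectingGenerator<T> {
    void onEvent(T event, long eventTimestamp, SketchWatermarkOutput output);
    void onPeriodicEmit(SketchWatermarkOutput output);
}

// Framework-side wrapper: forwards a watermark only if it advances, so the
// user-supplied generator does not have to guard against regressions itself.
final class SketchMonotonicOutput implements SketchWatermarkOutput {
    private final SketchWatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;

    SketchMonotonicOutput(SketchWatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermark) {
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            downstream.emitWatermark(watermark);
        }
    }

    @Override
    public void markIdle() {
        downstream.markIdle();
    }

    // The framework owns this value and could checkpoint it if needed.
    long lastEmitted() {
        return lastEmitted;
    }
}

// A naive punctuated generator: emits every event timestamp, even if it is
// lower than an earlier one.
final class SketchPunctuatedGenerator<T> implements SketchCollectingGenerator<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, SketchWatermarkOutput output) {
        output.emitWatermark(eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SketchWatermarkOutput output) {
        // nothing to do for a purely punctuated generator
    }
}
```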
> >>>>>>> Regarding making the WG a "rich function", I can see the potential
> >>>>>>> benefit but I also see a lot of pitfalls. For example, how should the
> >>>>>>> watermark state be handled in the case of scale-in? It could be made to
> >>>>>>> work in the Kafka case by attaching the state to the partition state
> >>>>>>> that we keep, but then we have potential backwards compatibility
> >>>>>>> problems also for the WM state. Does the WG usually need to keep the
> >>>>>>> state, or might it be enough if the state is transient, i.e. if you
> >>>>>>> have a restart the WG would lose its histogram but it would rebuild it
> >>>>>>> quickly and you would get back to the same steady state as before?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>>>>> Aljoscha.
> >>>>>>>>
> >>>>>>>> I also like the idea of making the Watermark generator a rich function
> >>>>>>>> -- this should make it more straightforward to implement smarter
> >>>>>>>> watermark generators. E.g., one that uses state to keep statistics
> >>>>>>>> about the actual out-of-orderness, and uses those statistics to
> >>>>>>>> implement a variable delay.
> >>>>>>>>
> >>>>>>>> David
> >>>>>>>>
> >>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>
> >>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
> >>>>>>>>> close(), probably with a Context as argument: I have not fully
> >>>>>>>>> thought this through, but I think that this is more aligned with the
> >>>>>>>>> rest of our rich functions. In addition, it would allow us, for
> >>>>>>>>> example, to initialize the Watermark value if we decide to checkpoint
> >>>>>>>>> the watermark (see [1]). (I also do not know if Table/SQL needs to do
> >>>>>>>>> anything in the open().)
> >>>>>>>>> 2) aligned with the above, and with the case where we want to
> >>>>>>>>> checkpoint the watermark in mind, I am wondering about how we could
> >>>>>>>>> implement this in the future. In the FLIP, it is proposed to expose
> >>>>>>>>> the WatermarkOutput in the methods of the WatermarkGenerator. Given
> >>>>>>>>> that there is the implicit contract that watermarks are
> >>>>>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> >>>>>>>>> assume) a check that will compare the last emitted WM against the
> >>>>>>>>> provided one, and emit it only if it is >=. If not, then we risk
> >>>>>>>>> having users shoot themselves in the foot if they accidentally
> >>>>>>>>> forget the check. Given that the WatermarkGenerator and its caller do
> >>>>>>>>> not know if the watermark was finally emitted or not (the
> >>>>>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
> >>>>>>>>> for checkpointing the WM?
> >>>>>>>>>
> >>>>>>>>> Given this, why not have the methods as:
> >>>>>>>>>
> >>>>>>>>> public interface WatermarkGenerator<T> {
> >>>>>>>>>
> >>>>>>>>>       Watermark onEvent(T event, long eventTimestamp,
> >>>>>>>>>               WatermarkOutput output);
> >>>>>>>>>
> >>>>>>>>>       Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> and the caller will be the one enforcing any invariants, such as
> >>>>>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> >>>>>>>>> anything that is needed, as it will have complete knowledge as to
> >>>>>>>>> whether the WM was emitted or not.
> >>>>>>>>>
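
A minimal sketch of this alternative, where the generator returns a candidate and the caller enforces the invariant. This is a simplification under stated assumptions: the hypothetical SketchReturningGenerator returns an OptionalLong instead of a Watermark, drops the WatermarkOutput parameter from the snippet above, and SketchCaller is an invented name for "the caller".

```java
import java.util.OptionalLong;

// Hypothetical simplification of the interface proposed above: the generator
// returns a candidate watermark (or nothing) instead of emitting it itself.
interface SketchReturningGenerator<T> {
    OptionalLong onEvent(T event, long eventTimestamp);
    OptionalLong onPeriodicEmit();
}

// The caller enforces the non-decreasing invariant and therefore knows
// exactly which watermark was emitted last, e.g. for checkpointing.
final class SketchCaller<T> {
    private final SketchReturningGenerator<T> generator;
    private long lastEmitted = Long.MIN_VALUE;

    SketchCaller(SketchReturningGenerator<T> generator) {
        this.generator = generator;
    }

    // Returns true if a watermark was actually emitted for this event.
    boolean process(T event, long eventTimestamp) {
        return maybeEmit(generator.onEvent(event, eventTimestamp));
    }

    // Returns true if a watermark was actually emitted on the periodic call.
    boolean onTimer() {
        return maybeEmit(generator.onPeriodicEmit());
    }

    private boolean maybeEmit(OptionalLong candidate) {
        // ">=" follows the check described in the mail above.
        if (candidate.isPresent() && candidate.getAsLong() >= lastEmitted) {
            lastEmitted = candidate.getAsLong();
            return true; // a real caller would forward the watermark downstream here
        }
        return false;
    }

    // Value a checkpoint could store.
    long snapshotLastEmitted() {
        return lastEmitted;
    }
}
```

Compared with the collector style, this variant cannot mark the output as idle without further additions.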
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>
> >>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification.
> >>>>>>>>>> We have considered this FLIP already in the interfaces for FLIP-95
> >>>>>>>>>> [1] and look forward to updating to the new unified watermark
> >>>>>>>>>> generators once FLIP-126 has been accepted.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>
> >>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>
> >>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> >>>>>>>>>>> separate) Watermark Assigners" [1]. This work was started by Stephan
> >>>>>>>>>>> in an experimental branch. I expanded on that work to provide a PoC
> >>>>>>>>>>> for the changes proposed in this FLIP: [2].
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>>>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>>>>>>>>> from TimestampAssigner. This means that sources that want to
> >>>>>>>>>>> support watermark assignment/extraction in the source need to
> >>>>>>>>>>> support two separate interfaces, and we have two operator
> >>>>>>>>>>> implementations for the different flavours. Also, this makes
> >>>>>>>>>>> features such as generic support for idleness detection more
> >>>>>>>>>>> complicated to implement because we again have to support two
> >>>>>>>>>>> types of watermark assigners.
> >>>>>>>>>>>
> >>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) Unify the Watermark Assigners into one interface,
> >>>>>>>>>>> WatermarkGenerator
> >>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner
> >>>>>>>>>>>
> >>>>>>>>>>> The motivation for the first is to simplify future implementations
> >>>>>>>>>>> and reduce code duplication. The motivation for the second point is
> >>>>>>>>>>> again code deduplication: most assigners currently have to extend
> >>>>>>>>>>> from some base timestamp extractor or duplicate the extraction
> >>>>>>>>>>> logic, or users have to override an abstract method of the
> >>>>>>>>>>> watermark assigner to provide the timestamp extraction logic.
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we propose to add a generic wrapping
> >>>>>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
> >>>>>>>>>>> mark a stream/partition as idle if no data arrives after a
> >>>>>>>>>>> configured timeout.
> >>>>>>>>>>>
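
A minimal sketch of such a wrapping generator, under stated assumptions: SketchIdleOutput, SketchIdleAwareGenerator and SketchWithIdleness are hypothetical names, the idleness check runs on the periodic call, and the clock is injected as a LongSupplier (milliseconds) so the behaviour can be tested without waiting.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-ins; not the interfaces from the FLIP or from Flink.
interface SketchIdleOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface SketchIdleAwareGenerator<T> {
    void onEvent(T event, long eventTimestamp, SketchIdleOutput output);
    void onPeriodicEmit(SketchIdleOutput output);
}

// Wraps any generator and marks the output idle when no event has arrived
// for at least idleTimeoutMillis.
final class SketchWithIdleness<T> implements SketchIdleAwareGenerator<T> {
    private final SketchIdleAwareGenerator<T> inner;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // current time in milliseconds
    private long lastEventTime;

    SketchWithIdleness(SketchIdleAwareGenerator<T> inner,
                       long idleTimeoutMillis,
                       LongSupplier clock) {
        this.inner = inner;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SketchIdleOutput output) {
        lastEventTime = clock.getAsLong(); // data arrived, reset the idle timer
        inner.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(SketchIdleOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            output.markIdle();
        } else {
            inner.onPeriodicEmit(output);
        }
    }
}
```

In production code the clock could be `System::currentTimeMillis`; the wrapper works the same for punctuated and periodic inner generators, which is the point of having a single interface.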
> >>>>>>>>>>> The "unify and separate" part refers to the fact that we want to
> >>>>>>>>>>> unify punctuated and periodic assigners but at the same time split
> >>>>>>>>>>> the timestamp assigner from the watermark generator.
> >>>>>>>>>>>
> >>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>>>>> your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Aljoscha
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>>>>
> >>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
> >>>>>>>
> >>>
>
>
