+1 to all of Dawid's suggestions, makes a lot of sense to me

On Tue, May 12, 2020 at 2:32 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> Hi Aljoscha,
>
> Sorry for adding comments during the vote, but I have some really minor suggestions that should not influence the voting thread, imo.
>
> 1) Does it make sense to have the TimestampAssigner extend Flink's Function? This implies it has to be serializable, which with the factory pattern is not strictly necessary, right? BTW, I really like that you suggested the FunctionalInterface annotation there.
>
> 2) Could we rename the IdentityTimestampAssigner to e.g. RecordTimestampAssigner/SystemTimestampAssigner/NativeTimestampAssigner...? Personally, I found the name IdentityTimestampAssigner a bit misleading, as "identity" usually means a no-op. That did not click for me, as I assumed it somehow returns the incoming record itself.
>
> 3) Could we rename the second parameter of TimestampAssigner#extract to e.g. recordTimestamp/nativeTimestamp? This is similar to the point above. This parameter was also a bit confusing for me, as at times I thought it was somehow related to TimerService#currentProcessingTimestamp()/currentWatermark(), i.e. the system-wide current timestamp.
>
> Other than those three points, I like the proposal, and I would have voted +1 if not for them.
>
> Best,
>
> Dawid
>
> On 11/05/2020 16:57, Jark Wu wrote:
> > Thanks for the explanation. I like the factory pattern to make the member variables immutable and final.
> >
> > So +1 to the proposal.
> >
> > Best,
> >
> > Jark
> >
> > On Mon, 11 May 2020 at 22:01, Stephan Ewen <se...@apache.org> wrote:
> >
> >> I am fine with that.
> >>
> >> Most of the principles seem agreed upon. I understand the need to support code-generated extractors, and we should already support most of it (as Aljoscha mentioned, via the factories) and can extend this if needed.
> >>
> >> I think that the factory approach supports code-generated extractors in an even cleaner way than an extractor with an open/init method.
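Here is a minimal Java sketch of Dawid's three points. It uses local stand-in interfaces, not Flink's actual classes. The assigner is a plain functional interface that does not extend Serializable, and the record's existing timestamp arrives as a parameter named recordTimestamp. A RecordTimestampAssigner simply returns that timestamp. The serializable TimestampAssignerFactory and the CsvColumnTimestampAssigner example are hypothetical names chosen for illustration. Together they show how a factory lets the assigner keep immutable, final fields (Jark's point) without serialization or an open() method.

```java
import java.io.Serializable;

/** Stand-in assigner interface (hypothetical, not Flink's class); deliberately not Serializable. */
@FunctionalInterface
interface TimestampAssigner<T> {
    /**
     * @param element         the record
     * @param recordTimestamp the timestamp the record already carries from the source
     *                        (not the current processing time or the current watermark)
     */
    long extractTimestamp(T element, long recordTimestamp);
}

/** Keeps the timestamp the record already carries (the former "identity" assigner). */
final class RecordTimestampAssigner<T> implements TimestampAssigner<T> {
    @Override
    public long extractTimestamp(T element, long recordTimestamp) {
        return recordTimestamp;
    }
}

/** Hypothetical serializable factory: only this object needs to be shipped to the cluster. */
@FunctionalInterface
interface TimestampAssignerFactory<T> extends Serializable {
    TimestampAssigner<T> createTimestampAssigner();
}

/** Hypothetical assigner whose configuration is fixed once the factory has built it. */
final class CsvColumnTimestampAssigner implements TimestampAssigner<String> {
    private final int column; // final: set by the factory, no open() needed

    CsvColumnTimestampAssigner(int column) {
        this.column = column;
    }

    @Override
    public long extractTimestamp(String line, long recordTimestamp) {
        // Ignores recordTimestamp and parses the event time from the given CSV column.
        return Long.parseLong(line.split(",")[column].trim());
    }
}
```

Only the factory crosses the serialization boundary. A code-generated assigner could therefore be compiled inside createTimestampAssigner() and still keep all of its fields final.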
> >>
> >> On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>
> >>> We're running a bit short on time. I would propose that we vote on the basic principle and remain open to later additions. This feature is quite important for making the new Kafka Source that is being developed as part of FLIP-27 useful. Otherwise we would have to use the legacy interfaces in the newly added connector.
> >>>
> >>> I know that's a bit unorthodox, but would everyone be OK with what's currently there, and then we iterate?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On 11.05.20 13:57, Aljoscha Krettek wrote:
> >>>> Ah, I meant to write this in my previous email, sorry about that.
> >>>>
> >>>> The WatermarkStrategy, which is basically a factory for a WatermarkGenerator, is the replacement for the open() method. This is the same strategy that was followed for StreamOperatorFactory, which was introduced to allow code generation in the Table API [1]. If we need metrics or other things, we would add them as a parameter to the factory method. What do you think?
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-11974
> >>>>
> >>>> On 10.05.20 05:07, Jark Wu wrote:
> >>>>> Hi,
> >>>>>
> >>>>> Regarding the `open()/close()`, I think it's necessary for Table&SQL to compile the generated code. In Table&SQL, the watermark strategy and the event timestamp are defined using SQL expressions; we translate the expressions and generate Java code for them. If we have `open()/close()`, we don't need lazy initialization.
> >>>>> Besides that, I can see a need to report some metrics, e.g. the current watermark, the dirty timestamps (null values), etc. So I think a simple `open()/close()` with a context that can provide a MetricGroup is nice and not too complex for the first version.
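This is a rough sketch of Aljoscha's answer to Jark, using simplified stand-ins for the FLIP's types. The strategy is a serializable factory, and whatever open() would have done (here, receiving a metrics handle) happens when the generator is constructed. That lets the generator's fields be final. GeneratorContext and its setGauge method are hypothetical placeholders for "a parameter to the factory method", such as a metric group. They are not a Flink API.

```java
/** Simplified stand-ins for the proposed interfaces (not Flink's classes). */
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Hypothetical context passed to the factory method, standing in for a metric group. */
@FunctionalInterface
interface GeneratorContext {
    void setGauge(String name, long value);
}

/** The strategy is a serializable factory; creating the generator replaces open(). */
@FunctionalInterface
interface WatermarkStrategy<T> extends java.io.Serializable {
    WatermarkGenerator<T> createWatermarkGenerator(GeneratorContext context);
}

/** Periodic generator tolerating events up to maxOutOfOrderness time units out of order. */
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;    // fixed at construction, no lazy init
    private final GeneratorContext context;  // received from the factory, not from open()
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness, GeneratorContext context) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.context = context;
        // Chosen so that the first watermark is Long.MIN_VALUE instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        long watermark = maxTimestamp - maxOutOfOrderness - 1;
        context.setGauge("currentWatermark", watermark); // e.g. the metric Jark mentions
        output.emitWatermark(watermark);
    }
}
```

A code-generating caller, as in Table&SQL, would do its compilation inside createWatermarkGenerator(...). Unlike open()/close(), this sketch has no close() counterpart for releasing resources.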
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> >>>>>
> >>>>>> Thanks, Aljoscha, for picking this up.
> >>>>>>
> >>>>>> I agree with the approach of making the set of changes proposed here for now. It already makes things simpler and adds idleness support everywhere.
> >>>>>>
> >>>>>> Rich functions and state always add complexity; let's do this in a later step, if we have a really compelling case.
> >>>>>>
> >>>>>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>
> >>>>>>> Regarding the WatermarkGenerator (WG) interface itself: the proposal is basically to turn emitting into a "flatMap". We give the WatermarkGenerator a "collector" (the WatermarkOutput), and the WG can decide whether to output a watermark or not and can also mark the output as idle. Changing the interface to return a Watermark (as the previous watermark assigner interface did) would not allow that flexibility.
> >>>>>>>
> >>>>>>> Regarding checkpointing the watermark and keeping track of the minimum watermark, this would be the responsibility of the framework (or of the KafkaConsumer in the current implementation). The user-supplied WG does not need to make sure that the watermark doesn't regress.
> >>>>>>>
> >>>>>>> Regarding making the WG a "rich function", I can see the potential benefit, but I also see a lot of pitfalls. For example, how should the watermark state be handled in the case of scale-in? It could be made to work in the Kafka case by attaching the state to the partition state that we keep, but then we also have potential backwards-compatibility problems for the WM state.
> >>>>>>> Does the WG usually need to keep the state, or might it be enough if the state is transient? I.e., if you have a restart, the WG would lose its histogram, but it would rebuild it quickly and you would get back to the same steady state as before.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>>>>> Overall I like this proposal; thanks for bringing it forward, Aljoscha.
> >>>>>>>>
> >>>>>>>> I also like the idea of making the watermark generator a rich function: this should make it more straightforward to implement smarter watermark generators, e.g. one that uses state to keep statistics about the actual out-of-orderness and uses those statistics to implement a variable delay.
> >>>>>>>>
> >>>>>>>> David
> >>>>>>>>
> >>>>>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> >>>>>>>>> Hi Aljoscha,
> >>>>>>>>>
> >>>>>>>>> Thanks for opening the discussion!
> >>>>>>>>>
> >>>>>>>>> I have two comments on the FLIP:
> >>>>>>>>> 1) We could add lifecycle methods to the generator, i.e. open()/close(), probably with a Context as argument. I have not fully thought this through, but I think that this is more aligned with the rest of our rich functions. In addition, it would allow us, for example, to initialize the watermark value if we decide to checkpoint the watermark (see [1]). (I also do not know whether Table/SQL needs to do anything in open().)
> >>>>>>>>> 2) Aligned with the above, and with the case where we want to checkpoint the watermark in mind, I am wondering how we could implement this in the future. In the FLIP, it is proposed to expose the WatermarkOutput in the methods of the WatermarkGenerator.
> >>>>>>>>> Given that there is the implicit contract that watermarks are non-decreasing, WatermarkOutput#emitWatermark() will (I assume) have a check that compares the last emitted WM against the provided one and emits it only if it is >=. If not, then we risk users shooting themselves in the foot if they accidentally forget the check. Given that the WatermarkGenerator and its caller do not know whether the watermark was finally emitted or not (WatermarkOutput#emitWatermark returns void), who will be responsible for checkpointing the WM?
> >>>>>>>>>
> >>>>>>>>> Given this, why not define the methods as:
> >>>>>>>>>
> >>>>>>>>>     public interface WatermarkGenerator<T> {
> >>>>>>>>>
> >>>>>>>>>         Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>>>>
> >>>>>>>>>         Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>>>>>     }
> >>>>>>>>>
> >>>>>>>>> and the caller will be the one enforcing any invariants, such as non-decreasing watermarks. In this way, the caller can checkpoint anything that is needed, as it will have complete knowledge of whether the WM was emitted or not.
> >>>>>>>>>
> >>>>>>>>> What do you think?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>>>>
> >>>>>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org> wrote:
> >>>>>>>>>> Thanks for the proposal, Aljoscha. This is a very useful unification. We have already considered this FLIP in the interfaces for FLIP-95 [1] and look forward to updating to the new unified watermark generators once FLIP-126 has been accepted.
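Kostas's invariant can be enforced without changing the proposal's void-returning WatermarkOutput: the framework wraps the output it hands to the user's generator. The sketch below assumes a simplified WatermarkOutput with long-valued watermarks. MonotonicWatermarkOutput and snapshotWatermark() are hypothetical names. The wrapper forwards only watermarks that advance and remembers the last forwarded value, which is what a checkpoint would need to store.

```java
/** Simplified stand-in for the proposal's output interface (not Flink's class). */
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

/**
 * Hypothetical framework-side wrapper handed to the user's WatermarkGenerator.
 * The generator may emit any value; only watermarks that advance are forwarded,
 * so user code cannot accidentally make the watermark regress.
 */
final class MonotonicWatermarkOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long lastEmitted;

    /** @param restoredWatermark value from the last checkpoint, or Long.MIN_VALUE on a fresh start */
    MonotonicWatermarkOutput(WatermarkOutput downstream, long restoredWatermark) {
        this.downstream = downstream;
        this.lastEmitted = restoredWatermark;
    }

    @Override
    public void emitWatermark(long watermark) {
        // Regressions are dropped; equal values are dropped as redundant.
        if (watermark > lastEmitted) {
            lastEmitted = watermark;
            downstream.emitWatermark(watermark);
        }
    }

    @Override
    public void markIdle() {
        downstream.markIdle();
    }

    /** The value the framework would store in a checkpoint. */
    long snapshotWatermark() {
        return lastEmitted;
    }
}
```

With this arrangement, the user-supplied generator does not need to guard against regressions. That matches Aljoscha's statement that this is the framework's responsibility.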
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>>>>
> >>>>>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>>>>> Hi Everyone!
> >>>>>>>>>>>
> >>>>>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and separate) Watermark Assigners" [1]. This work was started by Stephan in an experimental branch. I expanded on that work to provide a PoC for the changes proposed in this FLIP: [2].
> >>>>>>>>>>>
> >>>>>>>>>>> Currently, we have two different flavours of watermark assigners: AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks. Both of them extend TimestampAssigner. This means that sources that want to support watermark assignment/extraction in the source need to support two separate interfaces, and we have two operator implementations for the different flavours. This also makes features such as generic support for idleness detection more complicated to implement, because we again have to support two types of watermark assigners.
> >>>>>>>>>>>
> >>>>>>>>>>> In this FLIP we propose two things:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) Unify the watermark assigners into one interface, WatermarkGenerator.
> >>>>>>>>>>> 2) Separate this new interface from the TimestampAssigner.
> >>>>>>>>>>>
> >>>>>>>>>>> The motivation for the first is to simplify future implementations and reduce code duplication.
> >>>>>>>>>>> The motivation for the second point is again code deduplication: most assigners currently have to extend some base timestamp extractor or duplicate the extraction logic, or users have to override an abstract method of the watermark assigner to provide the timestamp extraction logic.
> >>>>>>>>>>>
> >>>>>>>>>>> Additionally, we propose to add a generic wrapping WatermarkGenerator that provides idleness detection, i.e. it can mark a stream/partition as idle if no data arrives within a configured timeout.
> >>>>>>>>>>>
> >>>>>>>>>>> The "unify and separate" part refers to the fact that we want to unify punctuated and periodic assigners but at the same time split the timestamp assigner from the watermark generator.
> >>>>>>>>>>>
> >>>>>>>>>>> Please find more details in the FLIP [1]. Looking forward to your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Aljoscha
> >>>>>>>>>>>
> >>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>>>>
> >>>>>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
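Here is a minimal sketch of the generic idleness-detecting wrapper described in the original proposal. It is written against simplified local versions of the proposed interfaces, not Flink's classes. IdlenessDetectingGenerator, PunctuatedGenerator, and the injected LongSupplier clock are hypothetical; the clock is injected only so the behaviour can be tested deterministically. Periodic and punctuated generators share one interface, so the same wrapper applies to both. The example delegate here is a punctuated one.

```java
import java.util.function.LongSupplier;

/** Simplified stand-ins for the proposed interfaces (not Flink's classes). */
interface WatermarkOutput {
    void emitWatermark(long watermark);
    void markIdle();
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}

/** Punctuated-style generator: emits a watermark only for marker records. */
final class PunctuatedGenerator implements WatermarkGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
        if (event.startsWith("WM")) {
            output.emitWatermark(eventTimestamp);
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // Nothing to do: this generator only reacts to events.
    }
}

/**
 * Hypothetical generic wrapper: delegates to any generator, but marks the output
 * idle once no event has arrived for idleTimeoutMillis of processing time.
 */
final class IdlenessDetectingGenerator<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> delegate;
    private final long idleTimeoutMillis;
    private final LongSupplier clock; // injected processing-time clock, e.g. System::currentTimeMillis
    private long lastEventTime;

    IdlenessDetectingGenerator(WatermarkGenerator<T> delegate, long idleTimeoutMillis, LongSupplier clock) {
        this.delegate = delegate;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.clock = clock;
        this.lastEventTime = clock.getAsLong();
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        lastEventTime = clock.getAsLong();
        delegate.onEvent(event, eventTimestamp, output);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (clock.getAsLong() - lastEventTime >= idleTimeoutMillis) {
            output.markIdle(); // no data within the timeout
        } else {
            delegate.onPeriodicEmit(output);
        }
    }
}
```

A production version would probably avoid calling markIdle() on every periodic tick. It would also need to define how the output becomes active again when events resume. Both are left out to keep the sketch short.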