I am fine with that. Most of the principles seem to be agreed upon. I understand the need to support code-generated extractors; we should already support most of that (via the factories, as Aljoscha mentioned) and can extend it if needed.
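To make the factory argument concrete, here is a minimal sketch. It is not the actual Flink API: the three interfaces are simplified stand-ins that only borrow names from this thread, and `GeneratedWatermarkStrategy`, `FactorySketch` and the `delayMillis` parameter are hypothetical. The point it tries to show is that any expensive setup (for example compiling generated code) can happen inside the factory method, so the generator itself needs neither open() nor lazy initialization.

```java
import java.util.function.LongUnaryOperator;

// Simplified stand-ins for illustration only; the real interfaces may differ.
interface WatermarkOutput {
    void emitWatermark(long watermarkTimestamp);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// The strategy is the factory: it travels with the job and builds the
// generator where the generator is actually used.
interface WatermarkStrategy<T> {
    WatermarkGenerator<T> createWatermarkGenerator();
}

// Hypothetical strategy for a code-generated watermark expression.
final class GeneratedWatermarkStrategy implements WatermarkStrategy<Long> {
    private final long delayMillis; // stands in for the generated source code

    GeneratedWatermarkStrategy(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public WatermarkGenerator<Long> createWatermarkGenerator() {
        // "Compilation" happens here, once per generator instance.
        final long delay = delayMillis;
        final LongUnaryOperator compiled = ts -> ts - delay;

        return new WatermarkGenerator<Long>() {
            private long maxTimestamp = Long.MIN_VALUE;

            @Override
            public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
                // Only track the highest timestamp; emit on the periodic call.
                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                if (maxTimestamp != Long.MIN_VALUE) {
                    output.emitWatermark(compiled.applyAsLong(maxTimestamp));
                }
            }
        };
    }
}

final class FactorySketch {
    // Feeds the timestamps through a fresh generator and returns the
    // watermark produced by one periodic emit (Long.MIN_VALUE if none).
    static long lastWatermark(long[] timestamps, long delayMillis) {
        WatermarkGenerator<Long> generator =
                new GeneratedWatermarkStrategy(delayMillis).createWatermarkGenerator();
        long[] last = {Long.MIN_VALUE};
        WatermarkOutput output = watermark -> last[0] = watermark;
        for (long ts : timestamps) {
            generator.onEvent(ts, ts, output);
        }
        generator.onPeriodicEmit(output);
        return last[0];
    }

    public static void main(String[] args) {
        // prints 2500
        System.out.println(lastWatermark(new long[] {1000L, 3000L, 2000L}, 500L));
    }
}
```

If metrics or similar were needed later, they could presumably be passed as a parameter to the factory method, as Aljoscha suggests in the quoted mail below.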
I think that the factory approach supports code-generated extractors even more cleanly than an extractor with an open/init method.

On Mon, May 11, 2020 at 3:38 PM Aljoscha Krettek <aljos...@apache.org> wrote:

> We're slightly running out of time. I would propose we vote on the basic
> principle and remain open to later additions. This feature is quite
> important to make the new Kafka Source that is developed as part of
> FLIP-27 useful. Otherwise we would have to use the legacy interfaces in
> the newly added connector.
>
> I know that's a bit unorthodox but would everyone be OK with what's
> currently there and then we iterate?
>
> Best,
> Aljoscha
>
> On 11.05.20 13:57, Aljoscha Krettek wrote:
> > Ah, I meant to write this in my previous email, sorry about that.
> >
> > The WatermarkStrategy, which is basically a factory for a
> > WatermarkGenerator, is the replacement for the open() method. This is the
> > same strategy that was followed for StreamOperatorFactory, which was
> > introduced to allow code generation in the Table API [1]. If we need
> > metrics or other things we would add that as a parameter to the factory
> > method. What do you think?
> >
> > Best,
> > Aljoscha
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-11974
> >
> > On 10.05.20 05:07, Jark Wu wrote:
> >> Hi,
> >>
> >> Regarding `open()/close()`, I think it's necessary for Table&SQL to
> >> compile the generated code.
> >> In Table&SQL, the watermark strategy and event-timestamp are defined
> >> using SQL expressions; we will translate and generate Java code for
> >> the expressions. If we have `open()/close()`, we don't need lazy
> >> initialization.
> >> Besides that, I can see a need to report some metrics, e.g. the current
> >> watermark, the dirty timestamps (null value), etc.
> >> So I think a simple `open()/close()` with a context which can get
> >> MetricGroup is nice and not complex for the first version.
> >>
> >> Best,
> >> Jark
> >>
> >> On Sun, 10 May 2020 at 00:50, Stephan Ewen <se...@apache.org> wrote:
> >>
> >>> Thanks, Aljoscha, for picking this up.
> >>>
> >>> I agree with the approach of doing the here proposed set of changes for
> >>> now. It already makes things simpler and adds idleness support
> >>> everywhere.
> >>>
> >>> Rich functions and state always add complexity, let's do this in a next
> >>> step, if we have a really compelling case.
> >>>
> >>> On Wed, Apr 29, 2020 at 7:24 PM Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>
> >>>> Regarding the WatermarkGenerator (WG) interface itself. The proposal is
> >>>> basically to turn emitting into a "flatMap", we give the
> >>>> WatermarkGenerator a "collector" (the WatermarkOutput) and the WG can
> >>>> decide whether to output a watermark or not and can also mark the
> >>>> output as idle. Changing the interface to return a Watermark (as the
> >>>> previous watermark assigner interface did) would not allow that
> >>>> flexibility.
> >>>>
> >>>> Regarding checkpointing the watermark and keeping track of the minimum
> >>>> watermark, this would be the responsibility of the framework (or the
> >>>> KafkaConsumer in the current implementation). The user-supplied WG does
> >>>> not need to make sure the watermark doesn't regress.
> >>>>
> >>>> Regarding making the WG a "rich function", I can see the potential
> >>>> benefit but I also see a lot of pitfalls. For example, how should the
> >>>> watermark state be handled in the case of scale-in? It could be made to
> >>>> work in the Kafka case by attaching the state to the partition state
> >>>> that we keep, but then we have potential backwards compatibility
> >>>> problems also for the WM state. Does the WG usually need to keep the
> >>>> state or might it be enough if the state is transient, i.e.
> >>>> if you have a restart the WG would lose its histogram but it would
> >>>> rebuild it quickly and you would get back to the same steady state as
> >>>> before.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On 27.04.20 12:12, David Anderson wrote:
> >>>>> Overall I like this proposal; thanks for bringing it forward,
> >>>>> Aljoscha.
> >>>>>
> >>>>> I also like the idea of making the Watermark generator a rich function
> >>>>> -- this should make it more straightforward to implement smarter
> >>>>> watermark generators. E.g., one that uses state to keep statistics
> >>>>> about the actual out-of-orderness, and uses those statistics to
> >>>>> implement a variable delay.
> >>>>>
> >>>>> David
> >>>>>
> >>>>> On Mon, Apr 27, 2020 at 11:44 AM Kostas Kloudas <kklou...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Aljoscha,
> >>>>>>
> >>>>>> Thanks for opening the discussion!
> >>>>>>
> >>>>>> I have two comments on the FLIP:
> >>>>>> 1) we could add lifecycle methods to the Generator, i.e. open()/
> >>>>>> close(), probably with a Context as argument: I have not fully
> >>>>>> thought this through but I think that this is more aligned with the
> >>>>>> rest of our rich functions. In addition, it will allow, for example,
> >>>>>> to initialize the Watermark value, if we decide to checkpoint the
> >>>>>> watermark (see [1]) (I also do not know if Table/SQL needs to do
> >>>>>> anything in the open()).
> >>>>>> 2) aligned with the above, and with the case where we want to
> >>>>>> checkpoint the watermark in mind, I am wondering about how we could
> >>>>>> implement this in the future. In the FLIP, it is proposed to expose
> >>>>>> the WatermarkOutput in the methods of the WatermarkGenerator.
> >>>>>> Given that there is the implicit contract that watermarks are
> >>>>>> non-decreasing, the WatermarkOutput#emitWatermark() will have (I
> >>>>>> assume) a check that will compare the last emitted WM against the
> >>>>>> provided one, and emit it only if it is >=. If not, then we risk
> >>>>>> having users shoot themselves in the foot if they accidentally
> >>>>>> forget the check. Given that the WatermarkGenerator and its
> >>>>>> caller do not know if the watermark was finally emitted or not (the
> >>>>>> WatermarkOutput#emitWatermark returns void), who will be responsible
> >>>>>> for checkpointing the WM?
> >>>>>>
> >>>>>> Given this, why not have the methods as:
> >>>>>>
> >>>>>> public interface WatermarkGenerator<T> {
> >>>>>>
> >>>>>>     Watermark onEvent(T event, long eventTimestamp, WatermarkOutput output);
> >>>>>>
> >>>>>>     Watermark onPeriodicEmit(WatermarkOutput output);
> >>>>>> }
> >>>>>>
> >>>>>> and the caller will be the one enforcing any invariants, such as
> >>>>>> non-decreasing watermarks. In this way, the caller can checkpoint
> >>>>>> anything that is needed as it will have complete knowledge as to
> >>>>>> whether the WM was emitted or not.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Kostas
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-5601
> >>>>>>
> >>>>>> On Tue, Apr 21, 2020 at 2:25 PM Timo Walther <twal...@apache.org>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Thanks for the proposal Aljoscha. This is a very useful unification.
> >>>>>>> We have considered this FLIP already in the interfaces for FLIP-95 [1]
> >>>>>>> and look forward to updating to the new unified watermark generators
> >>>>>>> once FLIP-126 has been accepted.
> >>>>>>>
> >>>>>>> Regards,
> >>>>>>> Timo
> >>>>>>>
> >>>>>>> [1] https://github.com/apache/flink/pull/11692
> >>>>>>>
> >>>>>>> On 20.04.20 18:10, Aljoscha Krettek wrote:
> >>>>>>>> Hi Everyone!
> >>>>>>>>
> >>>>>>>> We would like to start a discussion on "FLIP-126: Unify (and
> >>>>>>>> separate) Watermark Assigners" [1]. This work was started by Stephan
> >>>>>>>> in an experimental branch. I expanded on that work to provide a PoC
> >>>>>>>> for the changes proposed in this FLIP: [2].
> >>>>>>>>
> >>>>>>>> Currently, we have two different flavours of Watermark
> >>>>>>>> Assigners: AssignerWithPunctuatedWatermarks
> >>>>>>>> and AssignerWithPeriodicWatermarks. Both of them extend
> >>>>>>>> from TimestampAssigner. This means that sources that want to
> >>>>>>>> support watermark assignment/extraction in the source need to
> >>>>>>>> support two separate interfaces, and we have two operator
> >>>>>>>> implementations for the different flavours. Also, this makes
> >>>>>>>> features such as generic support for idleness detection more
> >>>>>>>> complicated to implement because we again have to support two
> >>>>>>>> types of watermark assigners.
> >>>>>>>>
> >>>>>>>> In this FLIP we propose two things:
> >>>>>>>>
> >>>>>>>> 1. Unify the Watermark Assigners into one Interface WatermarkGenerator
> >>>>>>>> 2. Separate this new interface from the TimestampAssigner
> >>>>>>>>
> >>>>>>>> The motivation for the first is to simplify future implementations
> >>>>>>>> and reduce code duplication. The motivation for the second point is
> >>>>>>>> again code deduplication: most assigners currently have to extend
> >>>>>>>> from some base timestamp extractor or duplicate the extraction
> >>>>>>>> logic, or users have to override an abstract method of the
> >>>>>>>> watermark assigner to provide the timestamp extraction logic.
> >>>>>>>>
> >>>>>>>> Additionally, we propose to add a generic wrapping
> >>>>>>>> WatermarkGenerator that provides idleness detection, i.e. it can
> >>>>>>>> mark a stream/partition as idle if no data arrives after a
> >>>>>>>> configured timeout.
> >>>>>>>>
> >>>>>>>> The "unify and separate" part refers to the fact that we want to
> >>>>>>>> unify punctuated and periodic assigners but at the same time split
> >>>>>>>> the timestamp assigner from the watermark generator.
> >>>>>>>>
> >>>>>>>> Please find more details in the FLIP [1]. Looking forward to
> >>>>>>>> your feedback.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-126%3A+Unify+%28and+separate%29+Watermark+Assigners
> >>>>>>>>
> >>>>>>>> [2] https://github.com/aljoscha/flink/tree/stephan-event-time
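
For reference, the division of labour discussed in the quoted thread (the generator emits freely into a "collector", while the framework guarantees that watermarks do not regress and owns the value worth checkpointing) could look roughly like the sketch below. This is an illustration under assumptions, not the Flink implementation: `WatermarkOutput` is a simplified stand-in, `MonotonicWatermarkOutput` and `MonotonicOutputSketch` are hypothetical names, and dropping watermarks that are not strictly greater than the last one is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for illustration only; the real interface may differ.
interface WatermarkOutput {
    void emitWatermark(long watermarkTimestamp);

    void markIdle();
}

// Framework-side wrapper that would be handed to the user's generator.
final class MonotonicWatermarkOutput implements WatermarkOutput {
    private final WatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;
    private boolean idle;

    MonotonicWatermarkOutput(WatermarkOutput downstream) {
        this.downstream = downstream;
    }

    @Override
    public void emitWatermark(long watermarkTimestamp) {
        if (watermarkTimestamp <= lastEmitted) {
            return; // regressing or repeated watermark: dropped
        }
        lastEmitted = watermarkTimestamp;
        idle = false; // an advancing watermark makes the output active again
        downstream.emitWatermark(watermarkTimestamp);
    }

    @Override
    public void markIdle() {
        if (!idle) {
            idle = true;
            downstream.markIdle();
        }
    }

    // The value the framework, not the generator, would checkpoint.
    long lastEmitted() {
        return lastEmitted;
    }
}

final class MonotonicOutputSketch {
    // Plays a generator that emits carelessly and records what gets through.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        WatermarkOutput recording = new WatermarkOutput() {
            @Override
            public void emitWatermark(long watermarkTimestamp) {
                seen.add("wm:" + watermarkTimestamp);
            }

            @Override
            public void markIdle() {
                seen.add("idle");
            }
        };
        MonotonicWatermarkOutput output = new MonotonicWatermarkOutput(recording);
        output.emitWatermark(10);
        output.emitWatermark(5);  // dropped: would regress
        output.emitWatermark(10); // dropped: no progress
        output.emitWatermark(20);
        output.markIdle();
        output.markIdle();        // ignored: already idle
        return seen;
    }

    public static void main(String[] args) {
        // prints [wm:10, wm:20, idle]
        System.out.println(run());
    }
}
```

With a wrapper like this the generator can keep returning void, and the caller still knows exactly which watermark was last emitted.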