+1 to what Arvid said. I am also thinking we could even consider dropping the dispose method straightaway to make the need for migration obvious. I'd make that decision during the implementation/on the PR though, once we verify if the deprecation option works.

Best,
Dawid

On 10/06/2021 09:37, Arvid Heise wrote:
> The whole operator API is only for advanced users and is not marked Public(Evolving). Users have to accept that things change, and we have to use that freedom, which we don't have in many other parts of the system.
>
> The change needs to be very clear in the change notes, though. I also don't expect many users to be affected, even if they use the operator API, and if they are, the change will be quite small. Note that we should still include the change in the compatibility section of the FLIP, even though the API is non-public.
>
> On Thu, Jun 10, 2021 at 8:08 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:
>
>> Hi all,
>>
>> Many thanks for the warm discussion!
>>
>> Regarding the change in the operator lifecycle, I also agree with adding the flush/drain/stopAndFlush/finish method. As for the duplication between this method and `endInput` for one-input operators, after some offline discussion with Dawid I now also think we might instead consider dropping BoundedOneInput in the future, so it should not be an issue.
>>
>> But I still have one concern about compatibility. With the new lifecycle we would remove `dispose()` and call `close()` for both normal finish and failover. If some users have a customized operator that only overrides the `close()` method, it would still compile under the new lifecycle, but it might produce wrong results if there is a failover. The SQL mini-batch operator (AbstractMapBundleOperator) may be an example: it finishes the last mini-batch in the close() method. If that method also gets called on failover, the last mini-batch would be emitted, and the same data would also be replayed after the failover, which would cause duplicated data. But considering that there are probably not many users who write customized operators, would this be a blocker?
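Yun's duplication scenario can be made concrete with a small simulation. This is a minimal sketch under stated assumptions, not Flink code. `BufferingOperator`, `EmitInCloseOperator` and `EmitInFinishOperator` are hypothetical classes. `finish()` stands for whichever name the new end-of-input hook ends up with. Records emitted before the failure are assumed not to be rolled back downstream.

An operator that still emits its last mini-batch in `close()` emits that batch twice once `close()` also runs on failover. Moving the emission into the finish step, and leaving `close()` to release resources only, emits it once.

```java
import java.util.ArrayList;
import java.util.List;

// Entry point first, so single-file source mode (`java FailoverDemo.java`) finds main.
class FailoverDemo {
    // Hypothetical factory: picks the old-style or the migrated operator.
    static BufferingOperator create(boolean emitInClose, List<String> downstream) {
        return emitInClose ? new EmitInCloseOperator(downstream) : new EmitInFinishOperator(downstream);
    }

    // Records "a" and "b" are buffered, the task fails before the final checkpoint,
    // close() runs (new lifecycle), and both records are replayed into a fresh
    // instance that then finishes normally.
    static List<String> runWithFailover(boolean emitInClose) {
        List<String> downstream = new ArrayList<>();

        BufferingOperator failed = create(emitInClose, downstream);
        failed.processElement("a");
        failed.processElement("b");
        failed.close(); // failover path: under the new lifecycle close() runs here too

        BufferingOperator restarted = create(emitInClose, downstream);
        restarted.processElement("a"); // replayed after recovery
        restarted.processElement("b");
        restarted.finish(); // successful end of input
        restarted.close();
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(runWithFailover(true));  // [a, b, a, b]: last mini-batch emitted twice
        System.out.println(runWithFailover(false)); // [a, b]
    }
}

// Simplified stand-in for a mini-batch operator; not a Flink class.
abstract class BufferingOperator {
    protected final List<String> buffer = new ArrayList<>();
    private final List<String> downstream;

    BufferingOperator(List<String> downstream) { this.downstream = downstream; }

    void processElement(String record) { buffer.add(record); }

    // Hypothetical hook: only called when the operator finishes successfully.
    void finish() {}

    // Proposed semantics: called on both successful finish and failover.
    void close() {}

    protected void emitBuffer() {
        downstream.addAll(buffer);
        buffer.clear();
    }
}

// Old style: emits the last mini-batch in close().
class EmitInCloseOperator extends BufferingOperator {
    EmitInCloseOperator(List<String> downstream) { super(downstream); }

    @Override void close() { emitBuffer(); }
}

// Migrated style: emits in finish(); close() only releases resources.
class EmitInFinishOperator extends BufferingOperator {
    EmitInFinishOperator(List<String> downstream) { super(downstream); }

    @Override void finish() { emitBuffer(); }

    @Override void close() { buffer.clear(); }
}
```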
>>
>> Regarding the name of the method, I would also lean towards a simpler name: perhaps finish (which should be easier for users to understand) or drain (since the method should only be called on termination and on stop-with-savepoint --drain, the concept would be more consistent).
>>
>> Regarding the changes in the UDF, I also agree that, to avoid the output type issue, we may add the flush/drain/stopAndFlush/finish method only to the sink function for now. In the future, if there are more requirements, we may consider adding the method to low-level UDFs like ProcessFunction / CoProcessFunction as an advanced feature.
>>
>> Best,
>> Yun
>>
>> ------------------Original Mail ------------------
>> Sender:Arvid Heise <ar...@apache.org>
>> Send Date:Thu Jun 10 01:08:36 2021
>> Recipients:dev <dev@flink.apache.org>
>> CC:Yun Gao <yungao...@aliyun.com>, Till Rohrmann <trohrm...@apache.org>
>> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>> Hi Piot,
>>
>> I'm fine with just doing it on the Sink. My responses were focused on the API (the how), not on the concept (the if). Just keep the methods in the different places in sync, so that it is easy to introduce a common interface later.
>>
>> Re name: drain is not a reinvention, as it's used quite often throughout Flink (e.g., Mailbox, stopWithSavepoint with the drain flag). flush has no link to life-cycles: you usually do it arbitrarily often.
>> I like `finish` very much, as it relates to JobStatus FINISHED, has a clear tie to life-cycles, and is crisp.
>> I also thought about `terminate`, but I'd clearly favor `finish`, because the verbs cannot be exchanged in the following sentence: the task may terminate its operators, but the operators should finish their thing first.
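Arvid agrees here to restrict the hook to sinks. That avoids the typing question entirely, because a sink has no output to emit into.

A minimal sketch, assuming a simplified sink contract. `SimpleSink`, `BatchingSink` and the default `finish()` method are hypothetical and are not Flink's actual `SinkFunction` API. The no-op default is one way to add the hook without breaking existing sinks, in the spirit of Till's "default implementation" suggestion.

```java
import java.util.ArrayList;
import java.util.List;

class SinkFinishDemo {
    // Writes three records with a batch size of two; optionally calls finish().
    static List<List<String>> run(boolean callFinish) {
        BatchingSink sink = new BatchingSink(2);
        for (String record : List.of("a", "b", "c")) {
            sink.invoke(record);
        }
        if (callFinish) {
            sink.finish(); // end of input: write the last, partial batch
        }
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [[a, b]]: "c" is still buffered
        System.out.println(run(true));  // [[a, b], [c]]
    }
}

// Hypothetical, simplified sink contract; NOT Flink's SinkFunction.
interface SimpleSink<IN> {
    void invoke(IN value) throws Exception;

    // Hypothetical end-of-input hook. The no-op default keeps existing sinks
    // compiling, and since a sink has no output it needs no Collector parameter.
    default void finish() throws Exception {}
}

class BatchingSink implements SimpleSink<String> {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    // Stands in for the external system the sink writes to.
    final List<List<String>> written = new ArrayList<>();

    BatchingSink(int batchSize) { this.batchSize = batchSize; }

    @Override
    public void invoke(String value) {
        pending.add(value);
        if (pending.size() >= batchSize) {
            writePending();
        }
    }

    @Override
    public void finish() {
        writePending();
    }

    private void writePending() {
        if (pending.isEmpty()) {
            return;
        }
        written.add(new ArrayList<>(pending));
        pending.clear();
    }
}
```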
>>
>> On Wed, Jun 9, 2021 at 6:48 PM Piotr Nowojski wrote:
>>
>>> Hi,
>>> Arvid: What's the problem with providing `void flush()`/`void drain()` only in the `SinkFunction`? It would avoid the problem of typing. Why would one need to have it in the other `Rich***Function`s? For `flush()` to make sense, the entity that has this method would need to buffer some data. I don't see this being reasonable in anything but `SinkFunction`, `ProcessFunction` and operators.
>>> Re `flush()` vs `drain()`: frankly, I wouldn't understand what `drain()` is all about without reading the java-doc, and afterwards I would have the impression that someone wanted to reinvent the wheel :) `flush()` is kind of an industry standard for things like that. Furthermore, I don't think `drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()` would be better in this regard, but it also doesn't feel right. Maybe `finish()`?
>>> Piotrek
>>> On Wed, 9 Jun 2021 at 11:51 Arvid Heise wrote:
>>>> Hi Dawid,
>>>> I see your point. I'd probably add drain only to the Rich*Function where we have the type bounds. Then we still need your Flushable interface in Rich*Function<..., T> to call it efficiently, but we at least avoid weird type combinations. I'll have a rethink later.
>>>> The proper solution is probably to add it to RichFunction and use Void for RichSinkFunction, but I'll have to understand the implications first.
>>>> On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz wrote:
>>>>> Hey,
>>>>> @Arvid The problem with adding the "drain/flush/stopProcessing" method to RichFunction is that it is not typed with the output type. At the same time, we would most likely need a way to emit records from the method. That's why I originally thought about adding a typed interface, which honestly I don't like that much either.
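A separately typed interface has a weakness: nothing ties its type parameter to the function's output type. The compiler therefore accepts mismatched types, and the error only surfaces at runtime. The incompatible-output-types example Piotr gives later in the thread is the same problem.

The sketch below illustrates this without Flink. `Collector`, `MapFunction`, `Flushable`, `ListCollector` and `MyMap` are hypothetical stand-ins written for this illustration, not Flink's classes. The type arguments are chosen arbitrarily. The unchecked cast models what a generic runtime would have to do to pass its output collector to `flush`.

```java
import java.util.ArrayList;
import java.util.List;

class FlushableTypingDemo {
    // Returns true if the type mismatch surfaces as a ClassCastException.
    static boolean mismatchSurfacesAtRuntime() {
        ListCollector<Integer> downstream = new ListCollector<>();
        MyMap map = new MyMap();
        downstream.collect(map.map("abc")); // regular output: Integer 3

        // A runtime that only knows "this function is Flushable<?>" cannot check the
        // type argument against the map's output type, so it has to cast.
        @SuppressWarnings("unchecked")
        Collector<Long> sameCollector = (Collector<Long>) (Collector<?>) downstream;
        map.flush(sameCollector); // compiles and runs: a Long lands in a List<Integer>

        try {
            int sum = 0;
            for (Integer value : downstream.items) {
                sum += value; // the implicit cast to Integer fails on the Long
            }
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(mismatchSurfacesAtRuntime()); // true
    }
}

// Hypothetical stand-ins; NOT Flink's Collector or MapFunction.
interface Collector<T> { void collect(T record); }

interface MapFunction<IN, OUT> { OUT map(IN value); }

// Hypothetical version of the proposed typed interface.
interface Flushable<T> { void flush(Collector<T> out); }

class ListCollector<T> implements Collector<T> {
    final List<T> items = new ArrayList<>();

    @Override public void collect(T record) { items.add(record); }
}

// Compiles, even though flush() emits Long while map() emits Integer.
class MyMap implements MapFunction<String, Integer>, Flushable<Long> {
    @Override public Integer map(String value) { return value.length(); }

    @Override public void flush(Collector<Long> out) { out.collect(42L); }
}
```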
>>>>> On the UDF level we do not need to deprecate anything, as you said. The close there already works like dispose does on the Operator level. What we are suggesting is to unify that on the Operator level and deprecate dispose there. @Yun I think we can already do that. We can either try to handle exceptions from close in the case of a failure, or just break it, as it is a low-level, mostly internal API, as Arvid said, and the migration would also be really easy there.
>>>>> @Till @Arvid I am open to suggestions about the naming. I like the "drain" method.
>>>>> For now I'd go with @Piotr's proposal to add the "drain" method only to the SinkFunction. We think it is not immediately necessary for any of the other UDFs.
>>>>> Best,
>>>>> Dawid
>>>>> On 09/06/2021 11:20, Arvid Heise wrote:
>>>>> I have not followed the complete discussion and can't comment on the concepts. However, I have some ideas on the API changes:
>>>>> 1. If it's about adding additional life-cycle methods to UDFs, we should add the flush/endOfInput to RichFunction, as this is the current way to define it. At this point, I don't see the need to add/change anything for UDFs. Since RichFunction does not have a dispose, do we even need to deprecate anything on the UDF level? This would avoid having a new Flushable interface altogether (of which I'm not a big fan, see Piot's mail).
>>>>> 2. Further, I'd like to propose drain instead of flush, as it aligns better with the current nomenclature and makes the intent more obvious. However, that only works if there is no clash, so please double-check.
>>>>> 3. About changing methods on Operators: I'd say go ahead. It's experimental and not too hard to adjust on the user side.
>>>>> I also like the idea of beefing up ProcessFunction as a full replacement for custom operators, but I'd keep that effort separate.
>>>>> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann wrote:
>>>>>> Thanks for the lively discussion, everyone. I have to admit that I am not really convinced that we should call the interface Flushable and the method flush. The problem is that this method should, in the first place, tell the operator that it should stop processing and flush all buffered data. The name "flush" alone does not convey this contract very well. Maybe a more explicit name like stopProcessingAndFlush (or maybe only stopProcessing) would be better. Moreover, from the OutputStream.flush method, I would expect that I can call this method multiple times without changing the state of the stream. This is not the case here.
>>>>>> Given that stopping processing and flushing all results is such an essential lifecycle method of an operator/UDF, I am not sure whether we should offer it as an optional interface users can implement. The problem I see is that users are not aware of it when writing their own operators/UDFs. Making it part of the actual interfaces makes it more explicit and easier to discover. Maybe there is a way of adding it together with a default implementation, deprecating the other methods, and then eventually removing the old methods. The last step will break APIs, though :-(
>>>>>> Cheers,
>>>>>> Till
>>>>>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>> Thanks for resuming this discussion. I'm +1 for the proposal of dropping (deprecating) `dispose()` and adding `flush()` to the `StreamOperator`/UDFs. Semantically, the new `close()` would be more or less an equivalent of the old `dispose()`.
>>>>>>> The old `close()` is an equivalent of the new `flush()` + `close()`. I think this provides a relatively painless migration path (could we write this migration down?).
>>>>>>> However, I have some doubts about the Flushable interface. First of all, it wouldn't work for sinks: sinks have no output. Secondly, I don't like that it opens the possibility of problems like this (incompatible output types):
>>>>>>> ```
>>>>>>> public class MyMap implements MapFunction, Flushable { ...}
>>>>>>> ```
>>>>>>> Also, after a quick offline discussion with Dawid, I'm no longer sure to which UDFs it actually makes sense to add `flush`, as most of them shouldn't buffer any data. Apart from Sinks, it's usually an operator that is buffering the data (that holds true for AsyncFunction, ReduceFunction, AggregateFunction, MapFunction, FilterFunction, ...). For those functions it's difficult to buffer any data, as they have no means to control when to emit it. One notable exception might be (Co)ProcessFunction, as it can register timers on its own. In that case I would propose the following:
>>>>>>> 1. Add `flush() {}` to the `Sink` function (the FLIP-143 Writer interface already has flush capabilities)
>>>>>>> 2. Maybe add `flush(Collector)` to `(Co)ProcessFunction`, but maybe we can postpone it
>>>>>>> 3. Leave other functions alone.
>>>>>>> After all, we could add `flush()` to other functions in the future if we really find a good motivating example to do so.
>>>>>>> About 2. Dawid is pitching an idea to convert `ProcessFunction` into a proper `Public` API that would replace StreamOperator.
>>>>>>> We could change `StreamOperator` to be a purely `@Internal` class/interface, and add the missing functionality to `ProcessFunction` (InputSelectable, BoundedInput, MailboxExecutor). With this, adding `flush()` to `ProcessFunction` would make a lot of sense. But maybe that should be a story for another day?
>>>>>>> Best,
>>>>>>> Piotrek
>>>>>>> On Fri, 4 Jun 2021 at 10:36 Yun Gao wrote:
>>>>>>>> Hi all,
>>>>>>>> Many thanks @Dawid for resuming the discussion, and many thanks @Till for the summary! (And sorry that I missed the mail and did not respond in time...)
>>>>>>>> I also agree that we could consider the global commits later, separately, after we have addressed the final checkpoints and the other points Till summarized.
>>>>>>>> Currently the only cases that use the cascade commit are the Table FileSystem and Hive connectors. I checked the code and found that they currently commit the last piece of data directly in endOfInput(). Although this might emit repeated records if there is a failover while the job is finishing, it avoids emitting records in notifyCheckpointComplete() after endOfInput(). Thus the modification to the operator lifecycle for final checkpoints would not cause compatibility problems for these connectors, and we do not need to modify them in the first place.
>>>>>>>> 2. Regarding the operator lifecycle, I also agree with the proposed changes. To sum up, I think the operator lifecycle would become:
>>>>>>>> endOfInput(1)
>>>>>>>> ...
>>>>>>>> endOfInput(n)
>>>>>>>> flush() --> call UDF's flush method
>>>>>>>> if some operator requires final checkpoints
>>>>>>>>     snapshotState()
>>>>>>>>     notifyCheckpointComplete()
>>>>>>>> end if
>>>>>>>> close() --> call UDF's close method
>>>>>>>> Since close() is currently only called on normal finish, while dispose() is called in both the failover and the normal case, I think that for compatibility we may have to postpone the change to a single close() method to version 2.0?
>>>>>>>> 3. Regarding the name and position of the flush() method, I also agree that we will need a separate method to mark the termination of the whole stream for multiple-input streams. Would it also be OK to modify the current BoundedXXInput interfaces like this:
>>>>>>>> interface BoundedInput {
>>>>>>>>     void endInput(); // marks the end of the whole stream, as flush() does
>>>>>>>> }
>>>>>>>> @Deprecated // In the future we could remove this interface
>>>>>>>> interface BoundedOneInput extends BoundedInput {}
>>>>>>>> interface BoundedMultiInput extends BoundedInput {
>>>>>>>>     void endInput(int i);
>>>>>>>>     default void endInput() {} // For compatibility
>>>>>>>> }
>>>>>>>> If an operator/UDF does not care about the end of a single input, it could directly implement the BoundedInput interface. The possible benefit, to me, is that we might be able to keep only one concept for marking the end of a stream, especially for operators with only one input.
>>>>>>>> Many thanks for all the deep insights and discussions!
>>>>>>>> Best,
>>>>>>>> Yun
>>>>>>>> ------------------------------------------------------------------
>>>>>>>> From:Dawid Wysakowicz
>>>>>>>> Send Time:2021 Jun. 3 (Thu.) 21:21
>>>>>>>> To:dev ; Till Rohrmann ; Yun Gao
>>>>>>>> Cc:Piotr Nowojski ; Guowei Ma <guowei....@gmail.com>; Stephan Ewen
>>>>>>>> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>>>>>>>> Hi all,
>>>>>>>> Thanks for the very insightful discussion. I'd like to revive the effort of FLIP-147. First of all, I'd like to say that I am really interested in helping make it happen in the upcoming 1.14 release.
>>>>>>>> I agree with Till that the final checkpoints and global commits are mostly orthogonal. Like Till, I'd suggest first focusing on the final checkpoints, while keeping in mind that we should not make assumptions that would make it impossible to implement the global commits. So far I do not see such a risk from the discussion.
>>>>>>>> Going back to the final checkpoints issue: I think the only outstanding question is which methods we want to use for flushing/closing both operators and UDFs just before performing the final checkpoint. As Piotr pointed out to me, I am mentioning UDFs here as well, because we need a way for users of the Public API to benefit from the final checkpoint (bear in mind that e.g. TwoPhaseCommitSinkFunction, which our Kafka sink implements, operates on the UDF level). Right now RichFunction has no method that could be called just before the final checkpoint to say "flush all intermediate state now and prepare for the final checkpoint". I'd suggest introducing an additional interface, e.g.
>>>>>>>> (name to be determined):
>>>>>>>> interface Flushable {
>>>>>>>>     void flush(Collector out);
>>>>>>>> }
>>>>>>>> Additionally, we would need to introduce a similar method on the StreamOperator level. Currently there are two methods that are called at the end of the operator lifecycle:
>>>>>>>> - close
>>>>>>>> - dispose
>>>>>>>> The usage of the two methods is a bit confusing. Dispose is responsible for closing all open resources and is supposed to be called in case of a failure. On the other hand, close combines the non-existent "flush" method we need with dispose, which closes resources in case of a successful run. I'd suggest clearing this up a bit. We would introduce a proper "flush" method, which would be called when an operator finishes successfully. Moreover, we would make "close" deal only with closing any open resources, basically taking over the role of dispose, which we would deprecate.
>>>>>>>> Lastly, I'd like to explain why I think it is better to introduce a new "flush" method instead of using the "endInput" method of BoundedOne/MultiInput. That method is called per input, which means each operator would need to keep track internally of which inputs were already closed and react differently once all of the inputs were closed. With an explicit "flush" method we do not have this problem, as the input bookkeeping happens on the StreamTask level.
>>>>>>>> Let me know what you think. I'd sync with Yun Gao, and if there are no objections we will extend the FLIP page with the necessary changes.
>>>>>>>> Best,
>>>>>>>> Dawid
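Yun's lifecycle outline can be sketched as a small driver, combined with Dawid's argument that input bookkeeping belongs on the StreamTask level rather than in each operator.

Everything here is hypothetical. `Operator`, `RecordingOperator` and `TaskDriver` are simplified stand-ins, not Flink's `StreamOperator` or `StreamTask`. The end-of-stream hook is called `finish()`; the name was still under discussion, and these messages call it `flush()`. The checkpoint id 1 is arbitrary. Because the driver tracks which inputs have ended, the operator receives exactly one `finish()` call no matter in which order its inputs end.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleDemo {
    // Runs a two-input operator whose inputs end in the order 2, then 1.
    static List<String> run(boolean finalCheckpoint) {
        List<String> calls = new ArrayList<>();
        TaskDriver task = new TaskDriver(new RecordingOperator(calls), 2, finalCheckpoint);
        task.onEndOfInput(2);
        task.onEndOfInput(1);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        // [endInput(2), endInput(1), finish, snapshotState(1), notifyCheckpointComplete(1), close]
        System.out.println(run(false));
        // [endInput(2), endInput(1), finish, close]
    }
}

// Hypothetical operator contract, modelled on the lifecycle outlined in the thread.
interface Operator {
    void endInput(int inputId);
    void finish(); // called once, after all inputs have ended
    void snapshotState(long checkpointId);
    void notifyCheckpointComplete(long checkpointId);
    void close(); // releases resources only
}

// Records the call order so it can be printed or asserted on.
class RecordingOperator implements Operator {
    private final List<String> calls;

    RecordingOperator(List<String> calls) { this.calls = calls; }

    @Override public void endInput(int inputId) { calls.add("endInput(" + inputId + ")"); }
    @Override public void finish() { calls.add("finish"); }
    @Override public void snapshotState(long id) { calls.add("snapshotState(" + id + ")"); }
    @Override public void notifyCheckpointComplete(long id) { calls.add("notifyCheckpointComplete(" + id + ")"); }
    @Override public void close() { calls.add("close"); }
}

// Hypothetical stand-in for the task: it does the per-input bookkeeping so the
// operator does not have to track which of its inputs have already ended.
class TaskDriver {
    private final Operator operator;
    private final boolean[] ended;
    private final boolean finalCheckpoint;
    private int remaining;

    TaskDriver(Operator operator, int numInputs, boolean finalCheckpoint) {
        this.operator = operator;
        this.ended = new boolean[numInputs];
        this.remaining = numInputs;
        this.finalCheckpoint = finalCheckpoint;
    }

    // inputId is 1-based, like endInput(int) in the thread.
    void onEndOfInput(int inputId) {
        if (ended[inputId - 1]) {
            return; // ignore a duplicate end-of-input notification
        }
        ended[inputId - 1] = true;
        remaining--;
        operator.endInput(inputId);
        if (remaining == 0) {
            operator.finish();
            if (finalCheckpoint) {
                long checkpointId = 1L; // hypothetical id of the final checkpoint
                operator.snapshotState(checkpointId);
                operator.notifyCheckpointComplete(checkpointId);
            }
            operator.close();
        }
    }
}
```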