+1 to what Arvid said.

I am also thinking we could even consider dropping the dispose method
straightaway to make the need for migration obvious. I'd make that
decision during the implementation/on the PR though, once we verify if
the deprecation option works.
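
To make the migration concern concrete, here is a minimal sketch of the
lifecycle split under discussion. Only the method names `finish` and `close`
come from this thread; `SketchOperator`, `BufferingOperator` and
`LifecycleSketch` are hypothetical stand-ins and not Flink API. It shows an
operator that emits its last batch in `finish()` (clean end of input only) and
merely drops its buffer in `close()` (called on both paths).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator lifecycle discussed here; not Flink API. */
interface SketchOperator {
    void processElement(String element);

    /** Called only on a clean end of input: emit whatever is still buffered. */
    default void finish() {}

    /** Called on both clean finish and failover: release resources only. */
    default void close() {}
}

/** Buffers elements and emits them in one batch, similar to a mini-batch operator. */
class BufferingOperator implements SketchOperator {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> output;

    BufferingOperator(List<String> output) {
        this.output = output;
    }

    @Override
    public void processElement(String element) {
        buffer.add(element);
    }

    @Override
    public void finish() {
        // Safe to emit here: this is never called on failover.
        output.addAll(buffer);
        buffer.clear();
    }

    @Override
    public void close() {
        // Must not emit: after a failover the buffered records are replayed.
        buffer.clear();
    }
}

/** Hypothetical driver mimicking what the task would do around an operator. */
final class LifecycleSketch {
    static List<String> run(boolean failed) {
        List<String> output = new ArrayList<>();
        SketchOperator op = new BufferingOperator(output);
        op.processElement("a");
        op.processElement("b");
        try {
            if (!failed) {
                op.finish();
            }
        } finally {
            op.close();
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [a, b]
        System.out.println(run(true));  // prints []
    }
}
```

An operator that still emitted from `close()` would compile against this
interface unchanged, and in the sketch it would emit on the failure path too,
which is why dropping the old method outright may make the need to migrate
more visible than a deprecation.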

Best,

Dawid

On 10/06/2021 09:37, Arvid Heise wrote:
> The whole operator API is only for advanced users and is not marked
> Public(Evolving). Users have to accept that things change and we have to
> use that freedom that we don't have in many other parts of the system.
>
> The change needs to be very clear in the change notes though. I also don't
> expect many users to be affected even if they use operator API and if so,
> the change will be quite small. Note that we should still include the
> change in the compatibility section of the FLIP even though the API is
> non-public.
>
> On Thu, Jun 10, 2021 at 8:08 AM Yun Gao <yungao...@aliyun.com.invalid>
> wrote:
>
>> Hi all,
>>
>> Many thanks for the warm discussion!
>>
>> Regarding the change in the operator lifecycle, I also agree with adding
>> the flush/drain/stopAndFlush/finish method. As for the duplication between
>> this method and `endInput` for one-input operators: after some offline
>> discussion with Dawid, I now think we might instead consider dropping
>> BoundedOneInput in the future, so it should not be an issue.
>>
>> But I still have one concern about compatibility. With the new lifecycle
>> we would remove `dispose()` and call `close()` on both normal finish and
>> failover. If a user has a customized operator that only overrides
>> `close()`, it would still compile with the new lifecycle, but it might
>> produce wrong results on failover. The SQL mini-batch operator
>> (AbstractMapBundleOperator) is perhaps an example: it finishes the last
>> mini-batch in the close() method. If that method also gets called on
>> failover, the last mini-batch would be emitted, and the same data would
>> also be replayed after failover, which would cause duplicated data. But
>> considering that there might not be many users who write customized
>> operators, would this be a blocker?
>>
>> Regarding the name of the method, I would also tend toward a simpler name,
>> perhaps finish (which should be easier for users to understand) or drain
>> (since the method should only be called on termination and
>> stop-with-savepoint --drain, the concept would be more consistent).
>>
>> Regarding the changes in the UDFs, I also agree that, to avoid the output
>> type issue, we may add the flush/drain/stopAndFlush/finish method only to
>> the sink function for now. If more requirements come up in the future, we
>> may further consider adding the method to low-level UDFs like
>> ProcessFunction / CoProcessFunction as an advanced feature.
>>
>> Best,
>> Yun
>>
>>  ------------------Original Mail ------------------
>> Sender:Arvid Heise <ar...@apache.org>
>> Send Date:Thu Jun 10 01:08:36 2021
>> Recipients:dev <dev@flink.apache.org>
>> CC:Yun Gao <yungao...@aliyun.com>, Till Rohrmann <trohrm...@apache.org>
>> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>> Hi Piot,
>>
>> I'm fine with just doing it on the Sink. My responses were focused on the
>> API (the how) not on the concept (the if). Just keep the methods on the
>> different places in sync, such that it is easy to introduce a common
>> interface later.
>>
>> Re name: drain is not a reinvention as it's used quite often throughout
>> Flink (e.g., Mailbox, stopWithSavepoint with drain flag). flush has no link
>> to life-cycles: you usually do it arbitrarily often.
>> I like `finish` very much as it relates to JobStatus FINISHED, has a clear
>> tie to life-cycles, and is crisp.
>> I also thought about `terminate` but I'd clearly favor `finish` as the
>> verbs cannot be exchanged in the following: the task may terminate its
>> operators but the operators should finish their thing first.
>>
>> On Wed, Jun 9, 2021 at 6:48 PM Piotr Nowojski wrote:
>>
>>> Hi,
>>>
>>> Arvid: What's the problem with providing `void flush()`/`void drain()` only
>>> in the `SinkFunction`? It would avoid the problem of typing. Why would one
>>> need to have it in the other `Rich***Function`s? For `flush()` to make
>>> sense, the entity which has this method would need to buffer some data. I
>>> don't see this to be reasonable in anything but `SinkFunction`,
>>> `ProcessFunction` and operators.
>>>
>>> Re `flush()` vs `drain()`. Frankly, I wouldn't understand what `drain()` is
>>> all about without reading the java-doc, and afterwards, I would have an
>>> impression that someone wanted to reinvent a wheel :) `flush()` is kind of
>>> an industry standard for things like that. Furthermore I don't think
>>> `drain()` solves Till's concern (drain != stop + flush). `stopAndFlush()`
>>> would be better in this regard, but it also doesn't feel right. Maybe
>>> `finish()`?
>>>
>>> Piotrek
>>>
>>> On Wed, Jun 9, 2021 at 11:51 AM Arvid Heise wrote:
>>>> Hi Dawid,
>>>>
>>>> I see your point. I'd probably add drain only to Rich*Function where we
>>>> have the type bounds. Then we still need your Flushable interface in
>>>> Rich*Function<..., T> to call it efficiently but we at least avoid weird
>>>> type combinations. I'll have a rethink later.
>>>>
>>>> The proper solution is probably to add an output type parameter to
>>>> RichFunction and use Void for RichSinkFunction but I'll have to
>>>> understand the implications first.
>>>>
>>>> On Wed, Jun 9, 2021 at 11:37 AM Dawid Wysakowicz wrote:
>>>>> Hey,
>>>>>
>>>>> @Arvid The problem with adding the "drain/flush/stopProcessing" method to
>>>>> RichFunction is that it is not typed with the output type. At the same time
>>>>> we would most likely need a way to emit records from the method. That's
>>>>> why I originally thought about adding a typed interface, which honestly I
>>>>> don't like that much either.
>>>>>
>>>>> On the UDF level we do not need to deprecate anything as you said. The
>>>>> close there already works as dispose on the Operator level. What we are
>>>>> suggesting is to unify that on the Operator level and deprecate the dispose
>>>>> there. @Yun I think we can already do that. We can either try to handle
>>>>> exceptions from the close in the case of a failure or just break it as it
>>>>> is a low-level, mostly internal API as Arvid said and also the migration
>>>>> would be really easy there.
>>>>>
>>>>> @Till @Arvid I am open for suggestions about the naming. I like the
>>>>> "drain" method.
>>>>>
>>>>> For now I'd go with @Piotr's proposal to add the "drain" method only to
>>>>> the SinkFunction. We think it is not immediately necessary for any of
>>>>> the other UDFs.
>>>>>
>>>>> Best,
>>>>> Dawid
>>>>>
>>>>> On 09/06/2021 11:20, Arvid Heise wrote:
>>>>> I have not followed the complete discussion and can't comment on the
>>>>> concepts. However, I have some ideas on the API changes:
>>>>>
>>>>> 1. If it's about adding additional life-cycle methods to UDFs, we should
>>>>> add the flush/endOfInput to RichFunction as this is the current way to
>>>>> define it. At this point, I don't see the need to add/change anything for
>>>>> UDFs. Since RichFunction does not have a dispose, do we even need to
>>>>> deprecate anything on UDF level? This would avoid having a new interface
>>>>> Flushable altogether (of which I'm not a big fan, see Piot's mail)
>>>>>
>>>>> 2. Further, I'd like to propose drain instead of flush as it would more
>>>>> align with the current nomenclature and makes the intent more obvious.
>>>>> However, that only works if there is no clash, so please double-check.
>>>>>
>>>>> 3. About changing methods on Operators: I'd say go ahead. It's
>>>>> experimental and not too hard to adjust on the user side. I also like the
>>>>> idea of beefing up ProcessFunction as a full replacement to custom
>>>>> operators but I'd keep that effort separate.
>>>>>
>>>>> On Wed, Jun 9, 2021 at 9:38 AM Till Rohrmann wrote:
>>>>>> Thanks for the lively discussion everyone. I have to admit that I am not
>>>>>> really convinced that we should call the interface Flushable and the
>>>>>> method flush. The problem is that this method should in the first place
>>>>>> tell the operator that it should stop processing and flush all buffered
>>>>>> data. The method "flush" alone does not convey this contract very well.
>>>>>> Maybe a more explicit name like stopProcessingAndFlush (maybe only
>>>>>> stopProcessing) would be better. Moreover, from the OutputStream.flush
>>>>>> method, I would expect that I can call this method multiple times w/o
>>>>>> changing the state of the stream. This is not the case here.
>>>>>>
>>>>>> Given that the stop processing and flush all results is such an essential
>>>>>> lifecycle method of an operator/UDF, I am not sure whether we should offer
>>>>>> it as an optional interface users can implement. The problem I see is that
>>>>>> users are not aware of it when writing their own operators/UDFs. Making it
>>>>>> part of the actual interfaces makes it more explicit and easier to
>>>>>> discover. Maybe there is a way of adding it together with a default
>>>>>> implementation, deprecating the other methods, and then eventually
>>>>>> removing the old methods. The last step will break APIs, though :-(
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Jun 8, 2021 at 6:27 PM Piotr Nowojski wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for resuming this discussion. I think +1 for the proposal of
>>>>>>> dropping (deprecating) `dispose()`, and adding `flush()` to the
>>>>>>> `StreamOperator`/udfs. Semantically it would be more like new `close()` is
>>>>>>> an equivalent of old `dispose()`. Old `close()` is an equivalent of new
>>>>>>> `flush() + close()`. I think it provides a relatively painless migration
>>>>>>> path (could we write down this migration?).
>>>>>>>
>>>>>>> However I have some doubts about the Flushable interface. First of all,
>>>>>>> it wouldn't work for sinks - sinks have no output. Secondly, I don't like
>>>>>>> that it opens a possibility for problems like this (incompatible output
>>>>>>> types):
>>>>>>> ```
>>>>>>> public class MyMap implements MapFunction, Flushable { ... }
>>>>>>> ```
>>>>>>>
>>>>>>> Also after a quick offline discussion with Dawid, I'm not sure anymore to
>>>>>>> which UDFs it actually makes sense to add `flush`, as most of them
>>>>>>> shouldn't buffer any data. Apart from Sinks, it's usually an operator that
>>>>>>> is buffering the data (that holds true for AsyncFunction, ReduceFunction,
>>>>>>> AggregateFunction, MapFunction, FilterFunction, ...). For those functions
>>>>>>> it's difficult to buffer any data, as they have no means to control when to
>>>>>>> emit this data. One notable exception might be (Co)ProcessFunction, as it
>>>>>>> can register timers on its own. In that case I would propose to do the
>>>>>>> following thing:
>>>>>>> 1. Add `flush() {}` to `Sink` function (FLIP-143 Writer interface
>>>>>>>    already has flush capabilities)
>>>>>>> 2. Maybe add `flush(Collector)` to `(Co)ProcessFunction`, but maybe we
>>>>>>>    can postpone it
>>>>>>> 3. Leave other functions alone.
>>>>>>>
>>>>>>> After all, we could add `flush()` to other functions in the future if we
>>>>>>> really find a good motivating example to do so.
>>>>>>>
>>>>>>> About 2. Dawid is pitching an idea to convert `ProcessFunction` into a
>>>>>>> proper `Public` API that would replace StreamOperator. We could change
>>>>>>> `StreamOperator` to be purely `@Internal` class/interface, and add the
>>>>>>> missing functionality to the `ProcessFunction` (InputSelectable,
>>>>>>> BoundedInput, MailboxExecutor). With this, adding `flush()` to
>>>>>>> `ProcessFunction` would make a lot of sense. But maybe that should be a
>>>>>>> story for another day?
>>>>>>>
>>>>>>> Best,
>>>>>>> Piotrek
>>>>>>>
>>>>>>> On Fri, Jun 4, 2021 at 10:36 AM Yun Gao wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Many thanks @Dawid for resuming the discussion and many thanks @Till for
>>>>>>>> the summary! (And very sorry that I missed the mail and did not respond
>>>>>>>> in time...)
>>>>>>>>
>>>>>>>> I also agree that we could consider the global commits later,
>>>>>>>> separately, after we have addressed the final checkpoints, and also with
>>>>>>>> the other points as Till summarized.
>>>>>>>>
>>>>>>>> Currently the only cases that use the cascade commit are the Table
>>>>>>>> FileSystem and Hive connectors. I checked the code and found that
>>>>>>>> currently they commit the last piece of data directly in endOfInput().
>>>>>>>> Although this might emit repeated records if there is a failover while
>>>>>>>> the job is finishing, it avoids emitting the records in
>>>>>>>> notifyCheckpointComplete() after endOfInput(). Thus the modification to
>>>>>>>> the operator lifecycle in final checkpoints would not cause compatibility
>>>>>>>> problems for these connectors, and we do not need to modify them in the
>>>>>>>> first place.
>>>>>>>>
>>>>>>>> 2. Regarding the operator lifecycle, I also agree with the proposed
>>>>>>>> changes. To sum up, I think the operator lifecycle would become
>>>>>>>>
>>>>>>>> endOfInput(1)
>>>>>>>> ...
>>>>>>>> endOfInput(n)
>>>>>>>> flush() --> call UDF's flush method
>>>>>>>> if some operator requires final checkpoints
>>>>>>>>     snapshotState()
>>>>>>>>     notifyCheckpointComplete()
>>>>>>>> end if
>>>>>>>> close() --> call UDF's close method
>>>>>>>>
>>>>>>>> Since currently close() is only called on normal finish and dispose()
>>>>>>>> is called in both the failover and the normal case, for compatibility I
>>>>>>>> think we may have to postpone the change to a single close() method to
>>>>>>>> version 2.0?
>>>>>>>>
>>>>>>>> 3. Regarding the name and position of the flush() method, I also agree
>>>>>>>> that we will need a separate method to mark the termination of the whole
>>>>>>>> stream for multiple-input streams. Would it also be ok if we made some
>>>>>>>> modifications to the current BoundedXXInput interfaces, as follows?
>>>>>>>>
>>>>>>>> interface BoundedInput {
>>>>>>>>     void endInput(); // marks the end of the whole stream, as flush() does.
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Deprecated // In the future we could remove this interface
>>>>>>>> interface BoundedOneInput extends BoundedInput {}
>>>>>>>>
>>>>>>>> interface BoundedMultiInput extends BoundedInput {
>>>>>>>>     void endInput(int i);
>>>>>>>>
>>>>>>>>     default void endInput() {} // For compatibility
>>>>>>>> }
>>>>>>>>
>>>>>>>> If an operator/UDF does not care about the end of a single input, then it
>>>>>>>> could directly implement the BoundedInput interface. The possible
>>>>>>>> benefit to me is that we might be able to keep only one concept for
>>>>>>>> marking the end of stream, especially for the operators with only
>>>>>>>> one input.
>>>>>>>>
>>>>>>>> Many thanks for all the deep insights and discussions!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yun
>>>>>>>>
>>>>>>>> ------------------------------------------------------------------
>>>>>>>> From:Dawid Wysakowicz
>>>>>>>> Send Time:2021 Jun. 3 (Thu.) 21:21
>>>>>>>> To:dev; Till Rohrmann; Yun Gao
>>>>>>>> Cc:Piotr Nowojski; Guowei Ma <guowei....@gmail.com>; Stephan Ewen
>>>>>>>> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Thanks for the very insightful discussion. I'd like to revive the effort
>>>>>>>> of FLIP-147. First of all, from my side I'd like to say that I am really
>>>>>>>> interested in helping that happen in the upcoming 1.14 release.
>>>>>>>>
>>>>>>>> I agree with Till that the final checkpoints and global commits are
>>>>>>>> mostly orthogonal. Like Till, I'd suggest to first focus on the final
>>>>>>>> checkpoints, while just keeping in mind we should not make assumptions
>>>>>>>> that would make it impossible to implement the global commits. So far I
>>>>>>>> do not see such risk from the discussion.
>>>>>>>>
>>>>>>>> Going back to the final checkpoints issue. I think the only outstanding
>>>>>>>> issue is which methods we want to use for flushing/closing both operators
>>>>>>>> and UDFs just before performing the final checkpoint. As pointed out to me
>>>>>>>> by Piotr, I am mentioning UDFs here as well, because we need a way for
>>>>>>>> users using the Public API to benefit from the final checkpoint (bear in
>>>>>>>> mind that e.g. TwoPhaseCommitSinkFunction which is implemented by our Kafka
>>>>>>>> sink operates on the UDF level). Right now RichFunction has no method which
>>>>>>>> could be called just before the final checkpoint that would say "flush" all
>>>>>>>> intermediate state now and prepare for the final checkpoint. I'd suggest
>>>>>>>> introducing an additional interface e.g. (name to be determined)
>>>>>>>>
>>>>>>>> interface Flushable {
>>>>>>>>     void flush(Collector out);
>>>>>>>> }
>>>>>>>>
>>>>>>>> Additionally we would need to introduce a similar method on the
>>>>>>>> StreamOperator level. Currently we have two methods that are called at the
>>>>>>>> end of operator lifecycle:
>>>>>>>>
>>>>>>>> - close
>>>>>>>> - dispose
>>>>>>>>
>>>>>>>> The usage of the two methods is a bit confusing. Dispose is responsible
>>>>>>>> for closing all open resources and is supposed to be called in case of a
>>>>>>>> failure. On the other hand the close is a combination of a non-existent
>>>>>>>> "flush" method we need and dispose for closing resources in case of a
>>>>>>>> successful run. I'd suggest to clear this up a bit. We would introduce a
>>>>>>>> proper "flush" method which would be called in case of a successful
>>>>>>>> finishing of an operator. Moreover we would make "close" deal only with
>>>>>>>> closing any open resources, basically taking over the role of the
>>>>>>>> dispose, which we would deprecate.
>>>>>>>>
>>>>>>>> Lastly, I'd like to say why I think it is better to introduce a new "flush"
>>>>>>>> method instead of using the "endInput" method of BoundedOne/MultiInput.
>>>>>>>> That method is called per input, which means each operator would need to
>>>>>>>> keep track of which inputs were already closed internally and react
>>>>>>>> differently if all of the inputs were closed. With an explicit "flush"
>>>>>>>> method we do not have such a problem as the input bookkeeping happens on
>>>>>>>> the StreamTask level.
>>>>>>>>
>>>>>>>> Let me know what you think. I'd sync with Yun Gao and if there are no
>>>>>>>> objections we will extend the FLIP page with necessary changes.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Dawid
>>
