Hi all,

Many thanks @Dawid for resuming the discussion, and many thanks @Till for the summary! (And sorry that I missed the mail and did not respond in time...)
1. I also agree that we could consider the global commits later and separately, after we have addressed the final checkpoints and the other points Till summarized. Currently the only connectors that use cascading commits are the Table FileSystem and Hive connectors. I checked the code and found that they currently commit the last piece of data directly in endOfInput(). Although this might emit duplicate records if a failover happens while the job is finishing, it avoids emitting records in notifyCheckpointComplete() after endOfInput(). Thus the change to the operator lifecycle for final checkpoints would not cause compatibility problems for these connectors, and we do not need to modify them in the first place.

2. Regarding the operator lifecycle, I also agree with the proposed changes. To sum up, I think the operator lifecycle would become:

    endOfInput(1)
    ...
    endOfInput(n)
    flush()                  --> calls the UDF's flush method
    if some operator requires final checkpoints
        snapshotState()
        notifyCheckpointComplete()
    end if
    close()                  --> calls the UDF's close method

Since close() is currently only called on a normal finish, while dispose() is called in both the failover and the normal case, I think for compatibility we may have to postpone the change to a single close() method to version 2.0?

3. Regarding the name and position of the flush() method, I also agree that we need a separate method to mark the termination of the whole stream for multiple-input streams. Would it also be OK to modify the current BoundedXXInput interfaces as follows?

    interface BoundedInput {
        void endInput(); // marks the end of the whole stream, as flush() does
    }

    @Deprecated // in the future we could remove this interface
    interface BoundedOneInput extends BoundedInput {}

    interface BoundedMultiInput extends BoundedInput {
        void endInput(int i);
        default void endInput() {} // for compatibility
    }

If an operator/UDF does not care about the end of a single input, it could implement the BoundedInput interface directly. The possible benefit, to me, is that we might be able to keep only one concept for marking the end of a stream, especially for operators with only one input.

Many thanks for all the deep insights and discussions!

Best,
Yun

------------------------------------------------------------------
From: Dawid Wysakowicz <dwysakow...@apache.org>
Send Time: 2021 Jun. 3 (Thu.) 21:21
To: dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com>
Cc: Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>; Stephan Ewen <se...@apache.org>
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi all,

Thanks for the very insightful discussion. I'd like to revive the effort on FLIP-147. First of all, from my side I'd like to say that I am really interested in helping make that happen in the upcoming 1.14 release.

I agree with Till that the final checkpoints and global commits are mostly orthogonal. Like Till, I'd suggest first focusing on the final checkpoints, while keeping in mind that we should not make assumptions that would make it impossible to implement the global commits. So far I do not see such a risk in the discussion.

Going back to the final checkpoints issue: I think the only outstanding question is which methods we want to use for flushing/closing both operators and UDFs just before performing the final checkpoint. As Piotr pointed out to me, I am mentioning UDFs here as well, because we need a way for users of the Public API to benefit from the final checkpoint (bear in mind that e.g.
TwoPhaseCommitSinkFunction, which our Kafka sink implements, operates on the UDF level). Right now RichFunction has no method that could be called just before the final checkpoint to say "flush all intermediate state now and prepare for the final checkpoint". I'd suggest introducing an additional interface, e.g. (name to be determined):

    interface Flushable<T> {
        void flush(Collector<T> out);
    }

Additionally, we would need to introduce a similar method on the StreamOperator level. Currently we have two methods that are called at the end of the operator lifecycle: close and dispose.

The usage of the two methods is a bit confusing. dispose is responsible for closing all open resources and is supposed to be called in case of a failure. close, on the other hand, is a combination of the non-existent "flush" method we need and of dispose, closing resources in case of a successful run. I'd suggest clearing this up a bit. We would introduce a proper "flush" method, which would be called when an operator finishes successfully. Moreover, we would make "close" deal only with closing any open resources, basically taking over the role of dispose, which we would deprecate.

Lastly, I'd like to explain why I think it is better to introduce a new "flush" method instead of using the "endInput" method of BoundedOne/MultiInput. That method is called per input, which means each operator would need to internally keep track of which inputs were already closed and react differently once all of the inputs were closed. With an explicit "flush" method we do not have this problem, as the input bookkeeping happens on the StreamTask level.

Let me know what you think. I'll sync with Yun Gao, and if there are no objections we will extend the FLIP page with the necessary changes.

Best,
Dawid
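The end-of-life order Yun sums up in point 2, combined with Dawid's argument that input bookkeeping belongs on the StreamTask level, can be sketched roughly as follows. This is a minimal, self-contained illustration under stated assumptions, not Flink code: EndOfLifeOperator, RecordingOperator and TaskDriver are hypothetical names, inputs are assumed to be numbered 1..n, and a real task would trigger the final checkpoint through checkpointing rather than calling snapshotState() directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed end-of-life call order; not Flink's actual classes.
final class LifecycleSketch {

    // Assumed minimal operator surface for this sketch.
    interface EndOfLifeOperator {
        void endInput(int inputId);                       // one call per finished input
        void flush();                                     // once, after all inputs ended
        void snapshotState(long checkpointId);
        void notifyCheckpointComplete(long checkpointId);
        void close();                                     // releases resources only
    }

    // Operator that records every lifecycle call so the order can be inspected.
    static final class RecordingOperator implements EndOfLifeOperator {
        final List<String> calls = new ArrayList<>();
        public void endInput(int inputId) { calls.add("endInput(" + inputId + ")"); }
        public void flush() { calls.add("flush()"); }
        public void snapshotState(long id) { calls.add("snapshotState(" + id + ")"); }
        public void notifyCheckpointComplete(long id) { calls.add("notifyCheckpointComplete(" + id + ")"); }
        public void close() { calls.add("close()"); }
    }

    // Task-level driver: it tracks finished inputs so the operator does not have to.
    static final class TaskDriver {
        private final EndOfLifeOperator operator;
        private final boolean[] finished; // index 0 unused, inputs are 1..n
        private int remaining;

        TaskDriver(EndOfLifeOperator operator, int numInputs) {
            this.operator = operator;
            this.finished = new boolean[numInputs + 1];
            this.remaining = numInputs;
        }

        // Signals that the given input (1-based) has reached its end.
        void onEndOfInput(int inputId) {
            if (inputId < 1 || inputId >= finished.length) {
                throw new IllegalArgumentException("unknown input " + inputId);
            }
            if (finished[inputId]) {
                return; // ignore duplicate end-of-input signals
            }
            finished[inputId] = true;
            remaining--;
            operator.endInput(inputId);
            if (remaining == 0) {
                operator.flush(); // the whole stream has ended
            }
        }

        // Optional final checkpoint (only if some operator requires it), then close.
        void finish(boolean finalCheckpointRequired, long checkpointId) {
            if (remaining != 0) {
                throw new IllegalStateException(remaining + " input(s) still open");
            }
            if (finalCheckpointRequired) {
                operator.snapshotState(checkpointId);
                operator.notifyCheckpointComplete(checkpointId);
            }
            operator.close();
        }
    }

    // Two-input example: inputs finish in the order 2, then 1.
    static List<String> demo(boolean finalCheckpointRequired) {
        RecordingOperator op = new RecordingOperator();
        TaskDriver driver = new TaskDriver(op, 2);
        driver.onEndOfInput(2);
        driver.onEndOfInput(1);
        driver.finish(finalCheckpointRequired, 42L);
        return op.calls;
    }

    public static void main(String[] args) {
        // prints [endInput(2), endInput(1), flush(), snapshotState(42), notifyCheckpointComplete(42), close()]
        System.out.println(demo(true));
    }
}
```

Because TaskDriver counts the finished inputs, the operator sees a single flush() only after every input has ended; that is the bookkeeping an operator would otherwise have to do itself inside endInput(int).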
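To illustrate what Dawid's proposed Flushable<T> interface would give a UDF, here is a minimal sketch of a hypothetical batching function that emits its partial batch from flush(), so nothing is left buffered when the final checkpoint is taken. Collector is redeclared locally as a stand-in for Flink's org.apache.flink.util.Collector so the example compiles on its own; BatchingFunction and its batch size are illustrative assumptions, and Flushable is only the tentative name from the mail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlushableSketch {

    // Local stand-in for org.apache.flink.util.Collector (assumption: only collect() is needed).
    interface Collector<T> {
        void collect(T record);
    }

    // The interface suggested in the mail; its name is still to be determined.
    interface Flushable<T> {
        void flush(Collector<T> out);
    }

    // Hypothetical UDF: groups incoming strings into batches of a fixed size.
    static final class BatchingFunction implements Flushable<List<String>> {
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();

        BatchingFunction(int batchSize) {
            this.batchSize = batchSize;
        }

        // Buffers the value and emits a batch as soon as it is full.
        void processElement(String value, Collector<List<String>> out) {
            buffer.add(value);
            if (buffer.size() == batchSize) {
                emitBuffer(out);
            }
        }

        // Emits the partial batch so no intermediate state is left before the final checkpoint.
        @Override
        public void flush(Collector<List<String>> out) {
            if (!buffer.isEmpty()) {
                emitBuffer(out);
            }
        }

        int bufferedCount() {
            return buffer.size();
        }

        private void emitBuffer(Collector<List<String>> out) {
            out.collect(new ArrayList<>(buffer)); // copy, because the buffer is reused
            buffer.clear();
        }
    }

    // Feeds three records with batch size 2, then flushes; returns everything emitted.
    static List<List<String>> demo() {
        List<List<String>> emitted = new ArrayList<>();
        Collector<List<String>> out = emitted::add;
        BatchingFunction fn = new BatchingFunction(2);
        for (String value : Arrays.asList("a", "b", "c")) {
            fn.processElement(value, out);
        }
        fn.flush(out); // end of input reached: flush before the final checkpoint
        if (fn.bufferedCount() != 0) {
            throw new IllegalStateException("buffer not empty after flush");
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[a, b], [c]]
    }
}
```

Without a flush hook, the trailing partial batch ([c]) in this sketch would remain in the buffer when the stream ends and would never be emitted downstream.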