Hi all,

Many thanks @Dawid for resuming the discussion, and many thanks @Till for the 
summary! (And sorry that I missed the mail and did not respond in time...)

1. I also agree that we could consider the global commits separately later, 
after we have addressed the final checkpoints, along with the other points Till 
summarized. 
Currently the only cases that use the cascade commit are the Table 
FileSystem and Hive connectors. I checked the code and found that they 
currently commit the last piece of data directly in endOfInput(). Although this 
might emit duplicate records if a failover happens while the job is finishing, 
it avoids emitting records in notifyCheckpointComplete() after endOfInput(). 
The modification to the operator lifecycle for final checkpoints would 
therefore not cause compatibility problems for these connectors, so we do not 
need to modify them in the first place. 

2. Regarding the operator lifecycle, I also agree with the proposed changes. To 
sum up, I think the operator lifecycle would become:

endOfInput(1)
...
endOfInput(n)
flush() --> call UDF's flush method
if some operator requires final checkpoints
    snapshotState()
    notifyCheckpointComplete()
end if
close() --> call UDF's close method

Since close() is currently called only on normal finish, while dispose() is 
called in both the failover and the normal case, I think that for compatibility 
we may have to postpone the change to a single close() method to version 2.0?
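
To make the ordering above concrete, here is a minimal Java sketch of it. The 
SketchOperator interface, RecordingOperator, and LifecycleSketch.finish() are 
hypothetical names made up for illustration only; they are not Flink's 
StreamOperator API, and the needsFinalCheckpoint flag simply stands in for "some 
operator requires final checkpoints".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator; NOT Flink's StreamOperator interface.
interface SketchOperator {
    void endOfInput(int inputId);
    void flush();
    void snapshotState();
    void notifyCheckpointComplete();
    void close();
}

// Records every lifecycle call so that the ordering can be inspected.
class RecordingOperator implements SketchOperator {
    final List<String> calls = new ArrayList<>();

    public void endOfInput(int inputId) { calls.add("endOfInput(" + inputId + ")"); }
    public void flush() { calls.add("flush"); }
    public void snapshotState() { calls.add("snapshotState"); }
    public void notifyCheckpointComplete() { calls.add("notifyCheckpointComplete"); }
    public void close() { calls.add("close"); }
}

class LifecycleSketch {
    // Drives one operator through the finishing sequence listed in point 2.
    static void finish(SketchOperator op, int numInputs, boolean needsFinalCheckpoint) {
        for (int i = 1; i <= numInputs; i++) {
            op.endOfInput(i);
        }
        op.flush();
        if (needsFinalCheckpoint) {
            op.snapshotState();
            op.notifyCheckpointComplete();
        }
        op.close();
    }

    public static void main(String[] args) {
        RecordingOperator op = new RecordingOperator();
        finish(op, 2, true);
        System.out.println(op.calls);
    }
}
```

With needsFinalCheckpoint set to false, the sketch goes straight from flush() 
to close(), which matches the "if ... end if" part of the sequence.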

3. Regarding the name and position of the flush() method, I also agree that we 
will need a separate method to mark the termination of the whole stream for 
multiple-input streams. Would it also be OK to modify the current 
BoundedXXInput interfaces as follows?

interface BoundedInput {
    void endInput(); // marks the end of all input streams, as flush() does
}

@Deprecated // In the future we could remove this interface
interface BoundedOneInput extends BoundedInput {}

interface BoundedMultiInput extends BoundedInput {
    void endInput(int i); // marks the end of input i

    @Override
    default void endInput() {} // For compatibility
}

If an operator/UDF does not care about the end of a single input, it could 
implement the BoundedInput interface directly. The possible benefit, as I see 
it, is that we might be able to keep only one concept for marking the end of 
stream, especially for operators with only one input. 
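
As a rough illustration of how this could look from the operator side, here is 
a small Java sketch. The two interfaces repeat the proposal above (they are not 
the current Flink API); BufferingOperator, LegacyJoinOperator, and 
finishInputs() are hypothetical names I made up for the example, and the call 
order in finishInputs() is only an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// The interfaces as proposed in this mail (not the current Flink API).
interface BoundedInput {
    void endInput(); // end of all inputs
}

interface BoundedMultiInput extends BoundedInput {
    void endInput(int inputId); // end of a single input

    @Override
    default void endInput() {} // no-op, so existing implementations still compile
}

// Hypothetical single-input operator: buffers records and emits them at end of stream.
class BufferingOperator implements BoundedInput {
    final List<String> buffer = new ArrayList<>();
    final List<String> emitted = new ArrayList<>();

    void processElement(String record) { buffer.add(record); }

    @Override
    public void endInput() {
        emitted.addAll(buffer);
        buffer.clear();
    }
}

// Hypothetical existing two-input operator that only implements endInput(int).
class LegacyJoinOperator implements BoundedMultiInput {
    final List<Integer> endedInputs = new ArrayList<>();

    @Override
    public void endInput(int inputId) { endedInputs.add(inputId); }
}

class BoundedInputSketch {
    // Hypothetical runtime hook: per-input notifications first, then the whole-stream one.
    static void finishInputs(Object operator, int numInputs) {
        if (operator instanceof BoundedMultiInput) {
            for (int i = 1; i <= numInputs; i++) {
                ((BoundedMultiInput) operator).endInput(i);
            }
        }
        if (operator instanceof BoundedInput) {
            ((BoundedInput) operator).endInput();
        }
    }

    public static void main(String[] args) {
        BufferingOperator single = new BufferingOperator();
        single.processElement("a");
        single.processElement("b");
        finishInputs(single, 1);
        System.out.println(single.emitted);

        LegacyJoinOperator join = new LegacyJoinOperator();
        finishInputs(join, 2);
        System.out.println(join.endedInputs);
    }
}
```

The single-input operator only ever sees endInput(), while the existing 
two-input operator keeps working unchanged because of the default method.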

Many thanks for all the deep insights and discussions!

Best,
Yun


------------------------------------------------------------------
From:Dawid Wysakowicz <dwysakow...@apache.org>
Send Time:2021 Jun. 3 (Thu.) 21:21
To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>; Yun Gao 
<yungao...@aliyun.com>
Cc:Piotr Nowojski <pnowoj...@apache.org>; Guowei Ma <guowei....@gmail.com>; 
Stephan Ewen <se...@apache.org>
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi all,
Thanks for the very insightful discussion. I'd like to revive the effort of 
FLIP-147. First of all, from my side I'd like to say that I am really 
interested in helping that happen in the upcoming 1.14 release.
I agree with Till that the final checkpoints and global commits are mostly 
orthogonal. Like Till, I'd suggest first focusing on the final checkpoints, 
while keeping in mind that we should not make assumptions that would make it 
impossible to implement the global commits. So far I do not see such a risk 
from the discussion.
Going back to the final checkpoints issue. I think the only outstanding issue 
is which methods we want to use for flushing/closing both operators and UDFs 
just before performing the final checkpoint. As pointed out to me by Piotr, I 
am mentioning UDFs here as well, because we need a way for users using the 
Public API to benefit from the final checkpoint (bear in mind that e.g. 
TwoPhaseCommitSinkFunction which is implemented by our Kafka sink operates on 
the UDF level). Right now RichFunction has no method that could be called just 
before the final checkpoint to say "flush all intermediate state now and 
prepare for the final checkpoint". I'd suggest introducing an additional 
interface, e.g. (name to be determined):
interface Flushable<T> {
    void flush(Collector<T> out);
}
Additionally we would need to introduce a similar method on the StreamOperator 
level. Currently we have two methods that are called at the end of the operator 
lifecycle: close and dispose.
The usage of the two methods is a bit confusing. Dispose is responsible for 
closing all open resources and is supposed to be called in case of a failure. 
Close, on the other hand, is a combination of the non-existent "flush" method 
we need and of dispose, closing resources in case of a successful run. I'd 
suggest clearing this up a bit. We would introduce a proper "flush" method which 
would be called when an operator finishes successfully. Moreover we 
would make "close" deal only with closing any open resources, basically taking 
over the role of dispose, which we would deprecate.
Lastly, I'd like to say why I think it is better to introduce a new "flush" 
method instead of using the "endInput" method of BoundedOne/MultiInput. That 
method is called per input, which means each operator would need to keep track 
internally of which inputs were already closed and react differently once all 
of the inputs were closed. With an explicit "flush" method we do not have such a 
problem, as the input bookkeeping happens on the StreamTask level.
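To illustrate the difference, here is a minimal Java sketch. All class names 
(SelfTrackingOperator, FlushableOperator, TaskSketch) are hypothetical and made 
up for this example; TaskSketch is only a stand-in for the bookkeeping that 
would live on the StreamTask level, not actual Flink code.

```java
import java.util.BitSet;

// Hypothetical operator that reacts to per-input endInput(int) calls and
// therefore has to remember by itself which inputs are already finished.
class SelfTrackingOperator {
    private final BitSet finishedInputs = new BitSet();
    private final int numInputs;
    int flushCount = 0;

    SelfTrackingOperator(int numInputs) { this.numInputs = numInputs; }

    void endInput(int inputId) { // inputId is 1-based in this sketch
        if (finishedInputs.get(inputId - 1)) {
            return; // already seen, nothing to do
        }
        finishedInputs.set(inputId - 1);
        if (finishedInputs.cardinality() == numInputs) {
            flushCount++; // all inputs ended: emit whatever is still buffered
        }
    }
}

// Hypothetical operator with an explicit flush(): no input tracking inside.
class FlushableOperator {
    int flushCount = 0;

    void flush() { flushCount++; }
}

// Hypothetical stand-in for the task: it owns the input bookkeeping and
// calls flush() exactly once, after the last input has finished.
class TaskSketch {
    private final BitSet finishedInputs = new BitSet();
    private final int numInputs;
    private final FlushableOperator operator;

    TaskSketch(int numInputs, FlushableOperator operator) {
        this.numInputs = numInputs;
        this.operator = operator;
    }

    void inputFinished(int inputId) {
        if (finishedInputs.get(inputId - 1)) {
            return;
        }
        finishedInputs.set(inputId - 1);
        if (finishedInputs.cardinality() == numInputs) {
            operator.flush();
        }
    }
}

class BookkeepingSketch {
    public static void main(String[] args) {
        FlushableOperator op = new FlushableOperator();
        TaskSketch task = new TaskSketch(2, op);
        task.inputFinished(1);
        task.inputFinished(2);
        System.out.println("flush calls: " + op.flushCount);
    }
}
```

In the first variant every multi-input operator repeats the BitSet logic; in 
the second it is written once in the task and the operator only implements 
flush().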
Let me know what you think. I'll sync with Yun Gao and, if there are no 
objections, we will extend the FLIP page with the necessary changes.
Best,
Dawid
