Hi all,

Thanks for the very insightful discussion. I'd like to revive the
FLIP-147 effort. First of all, I am really interested in helping to make
it happen in the upcoming 1.14 release.

I agree with Till that the final checkpoints and global commits are
mostly orthogonal. Like Till, I'd suggest focusing on the final
checkpoints first, while keeping in mind that we should not make
assumptions that would make it impossible to implement the global
commits. So far I do not see such a risk in the discussion.

Going back to the final checkpoints: I think the only outstanding
issue is which methods we want to use for flushing/closing both
operators and UDFs just before performing the final checkpoint. I am
mentioning UDFs here as well, as Piotr pointed out to me, because
we need a way for users of the Public API to benefit from the final
checkpoint (bear in mind that e.g. TwoPhaseCommitSinkFunction, which is
implemented by our Kafka sink, operates on the UDF level). Right now
RichFunction has no method that could be called just before the final
checkpoint to say "flush all intermediate state now and prepare
for the final checkpoint". I'd suggest introducing an additional
interface, e.g. (name to be determined):

    import org.apache.flink.util.Collector;

    interface Flushable<T> {
        // Emit any buffered records before the final checkpoint.
        void flush(Collector<T> out);
    }
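
To illustrate how a UDF might use such an interface, here is a minimal
sketch. It is only an assumption of how things could look, not a
proposal for the final API: the Collector below is a one-method
stand-in for Flink's Collector so the snippet compiles without Flink,
and BufferingFunction is a hypothetical user function.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Flink's Collector, reduced to one method so that the
// sketch compiles without Flink on the classpath.
interface Collector<T> {
    void collect(T record);
}

// The interface suggested above; the name is still to be determined.
interface Flushable<T> {
    void flush(Collector<T> out);
}

// Hypothetical UDF that buffers records and emits them in batches.
class BufferingFunction implements Flushable<String> {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();

    BufferingFunction(int batchSize) {
        this.batchSize = batchSize;
    }

    // Regular per-record path: emit only once a full batch has accumulated.
    void process(String value, Collector<String> out) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush(out);
        }
    }

    // Would be called once, just before the final checkpoint:
    // emit whatever is still buffered.
    @Override
    public void flush(Collector<String> out) {
        for (String record : buffer) {
            out.collect(record);
        }
        buffer.clear();
    }
}
```

Without the flush call, the last partial batch would never be emitted
and so could not be covered by the final checkpoint.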

Additionally, we would need to introduce a similar method on the
StreamOperator level. Currently we have two methods that are called at
the end of the operator lifecycle:

  * close
  * dispose

The usage of the two methods is a bit confusing. Dispose is responsible
for closing all open resources and is supposed to be called in case of a
failure. Close, on the other hand, is a combination of the non-existent
"flush" method we need and of dispose, closing resources in case of a
successful run. I'd suggest cleaning this up a bit. We would introduce a
proper "flush" method, which would be called when an operator finishes
successfully. Moreover, we would make "close" deal only with
closing any open resources, basically taking over the role of
dispose, which we would deprecate.
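
The intended call order can be sketched roughly as follows. This is an
illustration under my own assumptions only: LifecycleOperator,
LifecycleRunner and RecordingOperator are hypothetical names, and the
interface is much smaller than the real StreamOperator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced operator contract; not Flink's StreamOperator.
interface LifecycleOperator {
    void processAll(); // stands in for the record-processing loop
    void flush();      // emit buffered results; successful runs only
    void close();      // release resources; always called
}

final class LifecycleRunner {
    // Drives one operator through the suggested lifecycle.
    static void run(LifecycleOperator op) {
        try {
            op.processAll();
            op.flush(); // reached only if processing did not fail
        } finally {
            op.close(); // takes over the role of today's dispose
        }
    }
}

// Helper that records which lifecycle methods were invoked.
class RecordingOperator implements LifecycleOperator {
    final List<String> calls = new ArrayList<>();
    private final boolean fail;

    RecordingOperator(boolean fail) {
        this.fail = fail;
    }

    @Override
    public void processAll() {
        calls.add("process");
        if (fail) {
            throw new IllegalStateException("simulated failure");
        }
    }

    @Override
    public void flush() {
        calls.add("flush");
    }

    @Override
    public void close() {
        calls.add("close");
    }
}
```

In the successful case the operator sees flush followed by close; in
the failure case it sees only close.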

Lastly, I'd like to say why I think it is better to introduce a new
"flush" method instead of using the "endInput" method of
BoundedOneInput/BoundedMultiInput. That method is called per input,
which means each operator would need to keep track internally of which
inputs were already closed and react differently once all of the inputs
were closed. With an explicit "flush" method we do not have such a
problem, as the input bookkeeping happens on the StreamTask level.
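
To make the bookkeeping concrete, this is roughly what every
multi-input operator would have to carry if we relied on "endInput"
alone. The class is hypothetical; I am assuming the per-input callback
takes an input id numbered from 1, as BoundedMultiInput#endInput does
to my knowledge.

```java
import java.util.BitSet;

// Hypothetical two-input operator that has to work out for itself when
// all inputs have ended, because endInput is invoked once per input.
class TwoInputBookkeepingOperator {
    private static final int NUMBER_OF_INPUTS = 2;
    private final BitSet finishedInputs = new BitSet(NUMBER_OF_INPUTS);
    int flushCount = 0;

    // Per-input callback; input ids are assumed to start at 1.
    void endInput(int inputId) {
        finishedInputs.set(inputId - 1);
        // Only once every input has ended may buffered results be emitted.
        if (finishedInputs.cardinality() == NUMBER_OF_INPUTS) {
            flushBufferedResults();
        }
    }

    private void flushBufferedResults() {
        flushCount++;
    }
}
```

With an explicit "flush" method the BitSet and the cardinality check
disappear from the operator; it would only implement the flushing
itself.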

Let me know what you think. I'll sync with Yun Gao and, if there are no
objections, we will extend the FLIP page with the necessary changes.

Best,

Dawid

On 29/03/2021 18:30, Till Rohrmann wrote:
> Thanks a lot for all your input. To sum up the discussion so far:
>
> ## Final checkpoints
>
> We currently agree on favouring a single final checkpoint which can shut
> down the topology. In order to support this we need to be able to create a
> checkpoint after an operator has finished producing results.
>
> If we want to send checkpoint barriers through the topology this means that
> a task must not close the network connection when it sees "logical end of
> data". Instead, on the "logical end of data" the contained operator should
> flush all of its records. This means that we need to introduce a new event
> "logical end of data" and API calls to signal an operator that it should
> flush its data and that it should shut down.
>
> Given the available methods, `endInput` could be used for signalling the
> "logical end of data" and `dispose` for shutting the operator down. A task
> will only shut down and send an "EndOfPartitionEvent" which closes the TCP
> connection if all of its inputs have shut down and if it has completed a
> final checkpoint.
>
> ## Global commits
>
> Now a somewhat related but also orthogonal issue is how to support a global
> commit. A global commit is a commit where the external artefacts of a
> checkpoint become visible all at once. The global commit should be
> supported for streaming as well as batch executions (it is probably even
> more important for batch executions). In general, there could be different
> ways of implementing the global commit mechanism:
>
> 1. Collect global commit handles on the JM and run the global commit action
> on the JM
> 2. Collect global commit handles in a parallelism 1 operator which performs
> the global commit action
>
> Approach 2. would probably require being able to send records from the
> snapshotState() method which would be the global commit handles. Both
> approaches would have to persist some kind of information in the checkpoint
> which allows redoing the global commit operation in case of a failover.
> Therefore, for approach 1. it would be required that we send the global
> commit handles to the JM from the snapshotState() method and not the
> notifyCheckpointComplete().
>
> A related question is in which order to execute the local and global commit
> actions:
>
> 1. Unspecified order
> 2. First local commits and then global commits
>
> Option 1. would be easier to implement and might already be good enough for
> most sinks.
>
> I would suggest treating final checkpoints and global commits as two
> related but separate things. I think it is fine to first try to solve the
> final checkpoints problem and then to tackle the global commits. This will
> help to decrease the scope of each individual feature.
>
> Cheers,
> Till
>
> On Fri, Mar 5, 2021 at 5:12 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Piotr,
>>
>> Many thanks for the suggestions and thoughts!
>>
>>> Is this a problem? Pipeline would be empty, so EndOfPartitionEvent would
>> be traveling very quickly.
>>
>> No, this is not a problem. Sorry, I was mistaken here; initially I was
>> in fact thinking of this issue raised by
>> @kezhu:
>>
>>> Besides this, will FLIP-147 eventually need some ways to decide whether
>> an operator need final checkpoint
>> @Yun @Guowei ?  @Arvid mentions this in earlier mail.
>>
>> On this issue itself, I still lean towards thinking that we might need
>> it. For example, suppose we have a job that does not need to commit
>> anything when it finishes; then it does not need to wait for a
>> checkpoint at all in the normal finish case.
>>
>>> Yes, but aren't we doing it right now anyway?
>> `StreamSource#advanceToEndOfEventTime`?
>>
>> Yes, we indeed have advanceToEndOfEventTime for both legacy and new
>> sources, sorry for the oversight.
>>
>>> Is this the plan? That upon recovery we are restarting all operators,
>> even those that have already finished?
>> Certainly it's one of the possibilities.
>>
>> For the first version we would tend to go this way since it is easier
>> to implement, and we always need to consider the case where tasks are
>> started but operators are finished, since there might also be tasks in
>> which only some of the operators have finished. In the long run I think
>> we could continue to optimize it by not restarting the finished tasks
>> at all.
>>
>>> Keep in mind that those are two separate things, as I mentioned in a
>> previous e-mail:
>>>> II. When should the `GlobalCommitHandle` be created? Should it be
>> returned from `snapshotState()`, `notifyCheckpointComplete()` or somewhere
>> else?
>>>> III. What should be the ordering guarantee between global commit and
>> local commit, if any? Actually the easiest to implement would be undefined,
>> but de facto global commit happening before local commits (first invoke
>> `notifyCheckpointComplete()`
>> on the `OperatorCoordinator` and either after or in parallel send
>> `notifyCheckpointComplete()` RPCs). As far as I can tell, undefined order
>> should work for the use cases that I'm aware of.
>>> We could create the `GlobalCommitHandle` in
>> `StreamOperator#snapshotState()`, while we could also ensure that
>> `notifyCheckpointComplete()` is called on the `OperatorCoordinator` AFTER
>> all of the operators have successfully processed
>> `notifyCheckpointComplete()`. This would be more difficult to implement,
>> hence I would prefer "undefined" behaviour here, but it's probably possible.
>>
>> Many thanks for the further explanation. I totally agree that the two
>> are separate and that we could think about them separately. Regarding
>> the order, I would still lean towards supporting the ordered case,
>> since the sinks' implementations seem to depend on this functionality.
>>
>> Best,
>>  Yun
>>
>> ------------------------------------------------------------------
>> From:Piotr Nowojski <pnowoj...@apache.org>
>> Send Time:2021 Mar. 4 (Thu.) 22:56
>> To:Kezhu Wang <kez...@gmail.com>
>> Cc:Till Rohrmann <trohrm...@apache.org>; Guowei Ma <guowei....@gmail.com>;
>> dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>;
>> jingsongl...@gmail.com <jingsongl...@gmail.com>
>> Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks
>> Finished
>>
>> Hi Yun and Kezhu,
>>
>>> 1. We might introduce a new type of event to notify the endOfInput()
>> through the graph first, and then A/B wait for the final
>> checkpoint, then A emits EndOfPartitionEvent to exit all the tasks as now.
>>
>> As I mentioned in one of the PRs, I would opt for this solution.
>>
>>>  if we go towards 1, the tasks would still need to exit from the source,
>> and if we go towards 2/3, we could be able to allow these
>> tasks to finish first.
>>
>> Is this a problem? Pipeline would be empty, so EndOfPartitionEvent would
>> be traveling very quickly.
>>
>>> should we also need to do it for normal exit
>> Yes, but aren't we doing it right now anyway?
>> `StreamSource#advanceToEndOfEventTime`?
>>
>>> If so, since now for recovery after some tasks finished we would first
>> start all the tasks and stop the finished tasks directly
>>
>> Is this the plan? That upon recovery we are restarting all operators, even
>> those that have already finished? Certainly it's one of the possibilities.
>>
>>> For example, the global committer handler might write to the meta store for
>> FileSystem/Hive sinks, and this should happen after all the pending
>>> files are renamed to the final ones, otherwise the downstream jobs might
>> miss some files if they rely on the meta store to identify ready
>> partitions.
>>> Thus we would have to emit the global-committer-handler after
>> notifyCheckpointComplete. But since we are able to know the list of
>> files
>>> to rename in snapshotState(), we could create the
>> global-committer-handler and store them there.
>>
>> Keep in mind that those are two separate things, as I mentioned in a
>> previous e-mail:
>>> II. When should the `GlobalCommitHandle` be created? Should it be
>> returned from `snapshotState()`, `notifyCheckpointComplete()` or somewhere
>> else?
>>> III. What should be the ordering guarantee between global commit and
>> local commit, if any? Actually the easiest to implement would be undefined,
>> but de facto global commit happening before local commits (first invoke
>> `notifyCheckpointComplete()` on the `OperatorCoordinator` and either after
>> or in parallel send `notifyCheckpointComplete()` RPCs). As far as I can
>> tell, undefined order should work for the use cases that I'm aware of.
>>
>> We could create the `GlobalCommitHandle` in
>> `StreamOperator#snapshotState()`, while we could also ensure that
>> `notifyCheckpointComplete()` is called on the `OperatorCoordinator` AFTER
>> all of the operators have successfully processed
>> `notifyCheckpointComplete()`. This would be more difficult to implement,
>> hence I would prefer "undefined" behaviour here, but it's probably possible.
>>
>> Kezhu:
>>
>>> Is `endOfInput` in your proposal same as `BoundedOneInput.endInput` ?
>> Yes, it's the same. Sorry for the typo, somehow I was convinced it's
>> `endOfInput` and not simply `endInput` :)
>>
>> Piotrek
>>
>> On Thu, 4 Mar 2021 at 11:09, Kezhu Wang <kez...@gmail.com> wrote:
>>
>>
>>
