Hi Piotr,

Many thanks for the suggestions and thoughts!

> Is this a problem? Pipeline would be empty, so EndOfPartitionEvent would be 
> traveling very quickly.

No, this is not a problem. Sorry, I was mistaken here; initially I was in 
fact thinking about this issue raised by @kezhu:

> Besides this, will FLIP-147 eventually need some ways to decide whether an 
> operator need final checkpoint @Yun @Guowei ? @Arvid mentions this in 
> earlier mail.

For this issue itself, I still lean towards thinking that we might need it. 
For example, suppose we have a job that does not need to commit anything when 
it finishes; then it does not need to wait for a checkpoint at all in the 
normal finish case.
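
A minimal sketch of what such a decision could look like. Note that 
`FinalCheckpointAware` and `FinishDecision` are hypothetical names used only 
for illustration; they are not existing Flink interfaces and nothing here has 
been agreed on for the FLIP:

```java
import java.util.List;

// Hypothetical interface (an assumption, not part of Flink): an operator
// declares whether it has pending work that only a completed checkpoint can commit.
interface FinalCheckpointAware {
    boolean requiresFinalCheckpoint();
}

final class FinishDecision {
    private FinishDecision() {}

    // A task has to wait for a final checkpoint if at least one operator in
    // its chain has something to commit; otherwise it can finish right away.
    static boolean mustWaitForFinalCheckpoint(List<FinalCheckpointAware> chain) {
        for (FinalCheckpointAware operator : chain) {
            if (operator.requiresFinalCheckpoint()) {
                return true;
            }
        }
        return false;
    }
}
```

With something like this, a job whose operators all return `false` would skip 
the wait entirely on normal finish.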

> Yes, but aren't we doing it right now anyway? 
> `StreamSource#advanceToEndOfEventTime`?

Yes, we indeed have `advanceToEndOfEventTime` for both legacy and new sources, 
sorry for overlooking that.

> Is this the plan? That upon recovery we are restarting all operators, even 
> those that have already finished? Certainly it's one of the possibilities.

For the first version we would tend to go this way since it is easier to 
implement, and we always need to consider the case where tasks are started but 
operators are already finished, since there might also be tasks with only some 
of their operators finished. In the long run I think we could continue to 
optimize it by not restarting the finished tasks at all.
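
To make the three cases concrete, here is a rough sketch of how a restarted 
task could classify itself. `RecoveryPlan`, its `Action` enum, and the idea of 
a restored set of finished operator ids are all assumptions for illustration, 
not the actual design:

```java
import java.util.List;
import java.util.Set;

// Hypothetical helper (not a Flink class): decides what a restarted task
// should do, given which of its chained operators were finished in the
// restored checkpoint.
final class RecoveryPlan {
    enum Action { RUN, RUN_PARTIALLY_FINISHED, FINISH_IMMEDIATELY }

    private RecoveryPlan() {}

    static Action onRestart(List<String> operatorIdsInTask, Set<String> finishedOperatorIds) {
        int finished = 0;
        for (String id : operatorIdsInTask) {
            if (finishedOperatorIds.contains(id)) {
                finished++;
            }
        }
        if (finished == 0) {
            return Action.RUN; // nothing finished: normal restart
        }
        // All operators finished: the task is started and stopped directly.
        // Otherwise only part of the chain is finished and the rest still runs.
        return finished == operatorIdsInTask.size()
                ? Action.FINISH_IMMEDIATELY
                : Action.RUN_PARTIALLY_FINISHED;
    }
}
```

The later optimization would then amount to never scheduling the tasks that 
fall into the `FINISH_IMMEDIATELY` case.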

> Keep in mind that those are two separate things, as I mentioned in a previous 
> e-mail:
> > II. When should the `GlobalCommitHandle` be created? Should it be returned 
> > from `snapshotState()`, `notifyCheckpointComplete()` or somewhere else?
> >
> > III. What should be the ordering guarantee between global commit and local 
> > commit, if any? Actually the easiest to implement would be undefined, but 
> > de facto global commit happening before local commits (first invoke 
> > `notifyCheckpointComplete()` on the `OperatorCoordinator` and either after 
> > or in parallel send `notifyCheckpointComplete()` RPCs). As far as I can 
> > tell, undefined order should work for the use cases that I'm aware of.
>
> We could create the `GlobalCommitHandle` in `StreamOperator#snapshotState()`, 
> while we could also ensure that `notifyCheckpointComplete()` is called on the 
> `OperatorCoordinator` AFTER all of the operators have successfully 
> processed `notifyCheckpointComplete()`. This would be more difficult to 
> implement, hence I would prefer "undefined" behaviour here, but it's probably 
> possible.

Many thanks for the further explanation. I also fully agree that the two are 
separate and that we could consider them independently. Regarding the order, I 
would still lean towards supporting the ordered case, since the sinks' 
implementations seem to depend on this functionality.
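
As a rough sketch of the ordered case: the coordinator side could defer the 
global commit until every subtask has reported that its local 
`notifyCheckpointComplete()` succeeded. `OrderedGlobalCommit` below is a 
hypothetical helper written only to illustrate the ordering; it is not an 
existing Flink class, and how the acknowledgements would actually be 
transported is left open:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper (an assumption, not a Flink API): runs the global
// commit exactly once, and only after all subtasks acknowledged their
// local commit for the checkpoint.
final class OrderedGlobalCommit {
    private final int parallelism;
    private final Runnable globalCommit;
    private final Set<Integer> acknowledged = new HashSet<>();
    private boolean committed;

    OrderedGlobalCommit(int parallelism, Runnable globalCommit) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        this.parallelism = parallelism;
        this.globalCommit = globalCommit;
    }

    // Called when a subtask reports that its local commit finished.
    // Returns true if this call triggered the global commit.
    synchronized boolean onLocalCommitDone(int subtaskIndex) {
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("unknown subtask " + subtaskIndex);
        }
        acknowledged.add(subtaskIndex); // duplicate acks are ignored by the set
        if (!committed && acknowledged.size() == parallelism) {
            committed = true;
            globalCommit.run(); // e.g. write the meta store after all renames
            return true;
        }
        return false;
    }
}
```

For the FileSystem/Hive example, this would guarantee that the meta store is 
written only after all pending files have been renamed.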

Best,
 Yun


------------------------------------------------------------------
From:Piotr Nowojski <pnowoj...@apache.org>
Send Time:2021 Mar. 4 (Thu.) 22:56
To:Kezhu Wang <kez...@gmail.com>
Cc:Till Rohrmann <trohrm...@apache.org>; Guowei Ma <guowei....@gmail.com>; dev 
<dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>; jingsongl...@gmail.com 
<jingsongl...@gmail.com>
Subject:Re: Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun and Kezhu,

> 1. We might introduce a new type of event to notify the endOfInput() through 
> the graph first, and then A/B waits for the final checkpoint, then A emit 
> EndOfPartitionEvent to exit all the tasks as now.

As I mentioned in one of the PRs, I would opt for this solution.

> if we go towards 1, the tasks would still need to exit from the source, and 
> if we go towards 2/3, we could be able to allow these tasks to finish first.

Is this a problem? Pipeline would be empty, so EndOfPartitionEvent would be 
traveling very quickly.

> should we also need to do it for normal exit 

Yes, but aren't we doing it right now anyway? 
`StreamSource#advanceToEndOfEventTime`? 

> If so, since now for recovery after some tasks finished we would first start 
> all the tasks and stop the finished tasks directly

Is this the plan? That upon recovery we are restarting all operators, even 
those that have already finished? Certainly it's one of the possibilities.

> For example, the global committer handler might be write meta store for 
> FileSystem/Hive sinks, and these should happen after all the pending
> files are renamed to the final ones, otherwise the downstream jobs might miss 
> some files if they relies on the meta store to identify ready partitions.
> Thus we would have to emit the global-committer-handler after 
> notifyCheckpointComplete. But since we could be able to know the list of files
> to rename in snapshotState(), we could create the global-committer-handler 
> and store them there.

Keep in mind that those are two separate things, as I mentioned in a previous 
e-mail:
> II. When should the `GlobalCommitHandle` be created? Should it be returned 
> from `snapshotState()`, `notifyCheckpointComplete()` or somewhere else?
>
> III. What should be the ordering guarantee between global commit and local commit, 
> if any? Actually the easiest to implement would be undefined, but de facto 
> global commit happening before local commits (first invoke 
> `notifyCheckpointComplete()` on the `OperatorCoordinator` and either after or 
> in parallel send `notifyCheckpointComplete()` RPCs). As far as I can tell, 
> undefined order should work for the use cases that I'm aware of.

We could create the `GlobalCommitHandle` in `StreamOperator#snapshotState()`, 
while we could also ensure that `notifyCheckpointComplete()` is called on the 
`OperatorCoordinator` AFTER all of the operators have successfully processed 
`notifyCheckpointComplete()`. This would be more difficult to implement, hence 
I would prefer "undefined" behaviour here, but it's probably possible.

Kezhu:

> Is `endOfInput` in your proposal same as `BoundedOneInput.endInput` ?

Yes, it's the same. Sorry for the typo, somehow I was convinced it's `endOfInput`, 
not simply `endInput` :)

Piotrek
On Thu, 4 Mar 2021 at 11:09, Kezhu Wang <kez...@gmail.com> wrote:
