Hi Dawid, Piotr, Steven,

Many thanks for pointing out these issues, and for the discussion!

*Failure before notifyCheckpointComplete()*

For this issue I agree with what Piotr has proposed. I tried using some operators such as sinks and windows as examples, and so far I have also not found a concrete scenario that would break if records are processed by a task that was assigned state snapshotted after finish() had been called. For future cases, users should be able to implement their target logic by explicitly adding a "finished" flag to the state, and perhaps applying different logic when that part of the state is referenced. Besides, this case only happens on rescaling or a topology change, both of which are deliberate actions that carry some user knowledge. Thus it looks acceptable to keep the operator state decoupled from the task lifecycle, and not to treat checkpoints taken after finish() differently.

*Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint*

For this issue, perhaps we could explicitly require the task to wait for a checkpoint that was triggered after the finish() method has been called for all the operators? We should be able to achieve this by maintaining some state inside the task.

*Checkpointing from a single subtask / UnionListState case*

This should indeed cause problems, and I also agree that we could continue this thread in https://issues.apache.org/jira/browse/FLINK-21080.

Best,
Yun

------------------------------------------------------------------
From:Piotr Nowojski <pnowoj...@apache.org>
Send Time:2021 Jul. 22 (Thu.) 02:46
To:dev <dev@flink.apache.org>
Cc:Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>; Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished

Hi Steven,

> I probably missed sth here. isn't this the case today already? Why is it a
> concern for the proposed change?

The problem is with the newly added `finish()` method and the already existing `endInput()` call.
Currently on master there are no issues, because we are not checkpointing any operators after some operators have finished. The purpose of FLIP-147 is exactly to enable this, and this opens a new problem described by Dawid. To paraphrase and give a concrete example: assume we have an operator with a parallelism of two, subtask 0 and subtask 1.

1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1 hasn't (yet).
2. Checkpoint 42 is triggered, and it completes.
3. The job fails and is restarted, but at the same time it's rescaled. The user has chosen to scale the operator down to a parallelism of 1.

Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has been processed or not, so while recovering to checkpoint 42 we have to recover both the state of the finished subtask 0 (#1) and the state of the not yet finished subtask 1 (#1). But at the same time they are scaled down, so we only have a single subtask 0 (#2) that has the combined state of both previous instances. The potentially confusing issue is that the state from subtask 0 (#1) was checkpointed AFTER the `endInput()` and `finish()` calls, but it's recovered to an operator that still has some records to process. In step 1, a user could for example store in the operator's state a bit of information "end input has already been called!", which would no longer be true after recovery.

Hence the question about `finish()` and `endInput()` semantics. Should they be tied to state, or just to an operator instance/execution attempt?

Piotrek

Wed, 21 Jul 2021 at 19:00 Steven Wu <stevenz...@gmail.com> wrote:

> if a failure happens after sequence of finish() -> snapshotState(), but before notifyCheckpointComplete(), we will restore such a state and we might end up sending some more records to such an operator.

I probably missed sth here. isn't this the case today already? Why is it a concern for the proposed change?

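The scale-down scenario from Piotr's example above can be illustrated with a toy model. This is only a sketch under stated assumptions: `RescaleSketch`, `SubtaskState`, and `merge` are hypothetical names invented for illustration, not Flink classes, and the "OR the flags" merge is just one naive way a user-defined flag might be combined. It shows how a user-stored "end input seen" flag from the finished subtask ends up next to records that still have to be processed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: none of these types exist in Flink. */
class RescaleSketch {

    /** Hypothetical per-subtask snapshot: pending records plus a user-stored flag. */
    static final class SubtaskState {
        final List<String> pending;
        final boolean endInputSeen;

        SubtaskState(List<String> pending, boolean endInputSeen) {
            this.pending = pending;
            this.endInputSeen = endInputSeen;
        }
    }

    /** Naive scale-down merge (an assumption): union the records, OR the flags. */
    static SubtaskState merge(SubtaskState a, SubtaskState b) {
        List<String> all = new ArrayList<>(a.pending);
        all.addAll(b.pending);
        return new SubtaskState(all, a.endInputSeen || b.endInputSeen);
    }

    public static void main(String[] args) {
        // Subtask 0 (#1) was snapshotted after endInput()/finish(): nothing pending, flag set.
        SubtaskState finished = new SubtaskState(new ArrayList<>(), true);
        // Subtask 1 (#1) was still running when checkpoint 42 was taken.
        SubtaskState running = new SubtaskState(Arrays.asList("r1", "r2"), false);

        // After scaling down to parallelism 1, subtask 0 (#2) sees the combined state.
        SubtaskState merged = merge(finished, running);
        System.out.println("endInputSeen=" + merged.endInputSeen
                + ", pending=" + merged.pending.size());
    }
}
```

In this model the merged state claims that the end of input was seen while two records are still pending, which is the inconsistency the question about `finish()` and `endInput()` semantics is about.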
On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Dawid,
>
> Thanks for writing down those concerns.
>
> I think the first issue boils down to what the contract of lifecycle methods like open(), close(), initializeState() etc. should be, and especially of the new additions like finish() and endInput(). And what should be their relation with the operator state (regardless of its type: keyed, non-keyed, union, ...). Should those methods be tied to state or not? After thinking about it for a while (and discussing it offline with Dawid), I think the answer might be no, they shouldn't. I mean maybe we should just openly say that all of those methods relate to this single particular instance and execution of the operator. And if a job is recovered/rescaled, we would be allowed to freely resume consumption, ignoring the fact that maybe some parts of the state have previously seen `endInput()`. Why?
>
> 0. Yes, it might be confusing. Especially with `endInput()`. We call `endInput()`, we store something in a state and later, after recovery combined with rescaling, that state can see more records? Indeed weird.
> 1. I haven't yet come up with a counterexample that would break and make it impossible to implement a real life use case. Theoretically yes, the user can store `endInput()` on state, and after rescaling this state would be inconsistent with what is actually happening with the operator, but I haven't found a use case that would break because of that.
> 2. Otherwise, implementation would be very difficult.
> 3. It's difficult to access keyed state from within `endInput()`/`finish()` calls, as they do not have key context.
> 4. After all, openly defining `endInput()` and `finish()` to be tied to the lifecycle of the operator execution instance is not that strange and quite simple to explain.
> Sure, it can lead to a bit of confusion (0.), but that doesn't sound that bad in comparison with the alternatives that I'm aware of. Also, methods like `open()` and `close()` are currently also tied to the operator execution instance, not to the state. Operators can be opened and closed multiple times; it doesn't mean that the state is lost after closing an operator.
>
> For the UnionListState problem I have posted my proposal in the ticket [1], so maybe let's move that particular discussion there?
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-21080
>
> Wed, 21 Jul 2021 at 12:39 Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> > Hey all,
> >
> > To make the issues that were found transparent to the community, I want to post an update:
> >
> > *1. Committing side-effects*
> > We do want to make sure that all side effects are committed before bringing tasks down. Side effects are committed when calling notifyCheckpointComplete. For the final checkpoint we introduced the method finish(). This notifies the operator that we have consumed all incoming records and we are preparing to close the Task. In turn we should flush any pending buffered records and prepare to commit last transactions. The goal is that after a successful sequence of finish() -> snapshotState() -> notifyCheckpointComplete(), the remaining state can be considered empty/finished and may be discarded.
> >
> > *Failure before notifyCheckpointComplete()*
> >
> > The question is what is the contract of the endInput()/finish() methods and how does calling these methods affect the operator's keyed state, non-keyed state and external state. Is it allowed to restore a state snapshot taken after calling endInput()/finish() and process more records?
> > Or do we assume that after a restore from such a state taken after finish() we should not call any of the lifecycle methods or at least make sure those methods do not emit records/interact with mailbox etc.
> >
> > Currently it is possible that if a failure happens after sequence of finish() -> snapshotState(), but before notifyCheckpointComplete(), we will restore such a state and we might end up sending some more records to such an operator. It is possible if we rescale and this state is merged with a state of a subtask that has not called finish() yet. It can also happen if we rescale the upstream operator and the subtask of interest becomes connected to a newly added non finished subtask.
> >
> > *Snapshotting StreamTasks that finish() has been called*
> >
> > We thought about putting a flag into the snapshot of a subtask produced after the finish() method. This would make it possible to skip execution of certain lifecycle methods. Unfortunately this creates problems for rescaling. How do we deal with a situation that subtask states with both the feature flag set and unset end up in a single StreamTask. Additional problem is that we merge those states into a single OperatorSubtaskState on CheckpointCoordinator.
> >
> > *Finishing upon receiving notifyCheckpointComplete() of not the latest checkpoint*
> >
> > We need to wait for a checkpoint to complete, that started after the finish() method. However, we support concurrent checkpoints therefore, there might be later checkpoints that completed, but the notification has not arrived. We must make sure those checkpoints do not leave lingering external resources.
> >
> > *Checkpointing from a single subtask / UnionListState case*
> > There are operators that checkpoint from a single subtask only. Usually from the subtask index=0. If we let those subtasks finish, subsequent checkpoints will miss this information.
> > Esp. Legacy sources problem:
> > https://issues.apache.org/jira/browse/FLINK-21080
> >
> > Best,
> >
> > Dawid
> >
> > On 19/07/2021 15:10, Yun Gao wrote:
> >
> > Hi Till, Dawid,
> >
> > Many thanks for the comments and discussion! Glad that we seem to have converged, and I also agree that we could leave the optimization out of the first version.
> >
> > Best
> > Yun
> >
> > ------------------------------------------------------------------
> > From:Dawid Wysakowicz <dwysakow...@apache.org>
> > Send Time:2021 Jul. 19 (Mon.) 17:51
> > To:dev <dev@flink.apache.org>; Till Rohrmann <trohrm...@apache.org>
> > Cc:Yun Gao <yungao...@aliyun.com>; Yun Gao <yungao...@aliyun.com.invalid>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> >
> > Small correction. I meant we need to adjust the EndOfInputEvent of course.
> >
> > Best,
> >
> > Dawid
> >
> > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> > > Hey Till,
> > >
> > > Yes, you're right we will have to adjust the current state of EndOfPartitionEvent and move the moment when we emit it to have what we're discussing here. We are aware of that.
> > >
> > > As for the MAX_WATERMARK vs finish(). My take is that we should always emit MAX_WATERMARK before calling finish() on an operator. At the same time finish() should not leave behind anything in state, as the intention is that we never restore from the taken savepoint/checkpoint (savepoint w drain or bounded data consumed).
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 19/07/2021 11:33, Till Rohrmann wrote:
> > >> Hi Yun and Dawid,
> > >>
> > >> Thanks for your comments. I do agree with your comments that finish() can do more than MAX_WATERMARK.
> > >> I guess we should then explain how MAX_WATERMARK and finish() play together and what kind of order guarantees we provide.
> > >>
> > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether it would work in its current state because we send this event when the Task is about to shut down, if I am not mistaken. What we want to have is to bring the StreamTasks into a state so that they shut down on the next checkpoint. For this we need to keep the StreamTask running. In general, I am a fan of making things explicit if possible. I think this helps maintenance and evolvability of code. That's why I think sending an EndOfInputEvent, which is a StreamTask level event and which says that there won't be any other records coming, only control events, could make sense.
> > >>
> > >> I would leave the proposed optimization out of the first version. We can still add it at a later point in time.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > >>
> > >>> Personally I don't find this optimization important and I'd rather leave it out not to complicate the codebase further. I doubt we save much there. I don't have a strong opinion though.
> > >>>
> > >>> Best,
> > >>>
> > >>> Dawid
> > >>>
> > >>> On 19/07/2021 10:31, Yun Gao wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> Many thanks Dawid for the thoughts!
> > >>>
> > >>> Currently I also do not have a different opinion regarding this part. But I have one more issue to confirm: during the previous discussion we discussed that for the final checkpoint case, we might have an optimization that if a task does not have operators using 2-pc, we might skip waiting for the final checkpoint (but we could not skip the savepoint).
> > >>> To allow users to express the logic, we have proposed to add one more method to StreamOperator & CheckpointListener:
> > >>>
> > >>> interface StreamOperator {
> > >>>     default boolean requiresFinalCheckpoint() {
> > >>>         return true;
> > >>>     }
> > >>> }
> > >>>
> > >>> interface CheckpointListener {
> > >>>     default boolean requiresFinalCheckpoint() {
> > >>>         return true;
> > >>>     }
> > >>> }
> > >>>
> > >>> class AbstractUdfStreamOperator {
> > >>>     @Override
> > >>>     boolean requiresFinalCheckpoint() {
> > >>>         return userFunction instanceof CheckpointListener &&
> > >>>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
> > >>>     }
> > >>> }
> > >>>
> > >>> I think we should still keep the change?
> > >>>
> > >>> Best,
> > >>> Yun
> > >>>
> > >>> ------------------Original Mail ------------------
> > >>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> > >>> *Send Date:*Sun Jul 18 18:44:50 2021
> > >>> *Recipients:*Flink Dev <dev@flink.apache.org>, Yun Gao <yungao...@aliyun.com.INVALID>
> > >>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > >>>
> > >>>> I think we're all really close to the same solution.
> > >>>>
> > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time based buffering, but it does not solve flushing other operations such as e.g. count windows or batching requests in Sinks. I'd prefer to treat the finish() as a message for Operator to "flush all records". The MAX_WATERMARK in my opinion is mostly for backwards compatibility.
> > >>>> I don't think operators need to get a signal "stop-processing" if they don't need to flush records. The "WHEN" records are emitted, should be in control of the StreamTask, by firing timers or by processing a next record from upstream.
> > >>>>
> > >>>> The only difference of my previous proposal compared to Yun's is that I did not want to send the EndOfUserRecords event in case of stop w/o drain. My thinking was that we could directly go from RUNNING to WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit EndOfUserRecordsEvent with an additional flag and e.g. stop firing timers and processing events (without calling finish() on Operator). In my initial suggestion I thought we don't care about some events potentially being emitted after the savepoint was taken, as they would anyway belong to the next after FINAL, which would be discarded. I think though the proposal to suspend records processing and timers is a sensible thing to do and would go with the version that Yun put into the FLIP Wiki.
> > >>>>
> > >>>> What do you think Till?
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Dawid
> > >>>>
> > >>>> On 16/07/2021 10:03, Yun Gao wrote:
> > >>>>
> > >>>>> Hi Till, Piotr
> > >>>>> Many thanks for the comments!
> > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>> I also agree with Piotr that currently they are independent mechanisms, and they are basically the same for the event time.
> > >>>>> For more details, first there are some differences among the three scenarios regarding the finish:
> > >>>>> For normal finish and stop-with-savepoint --drain, the job would not be expected to be restarted, and for stop-with-savepoint the job would be expected to restart later.
> > >>>>> Then for finish / stop-with-savepoint --drain, currently Flink would emit MAX_WATERMARK before the EndOfPartition. Besides, as we have discussed before [1], endOfInput / finish() should also only be called for finish / stop-with-savepoint --drain. Thus currently they always occur at the same time. After the change, we could emit MAX_WATERMARK before the endOfInput event for the finish / stop-with-savepoint --drain cases.
> > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > >>>>>> WindowOperator close all windows and emit the results upon calling
> > >>>>>> finish, for example?
> > >>>>> As discussed above for stop-with-savepoint, we would always keep the windows as is, and restore them after restart.
> > >>>>> Then for the finish / stop-with-savepoint --drain, I think perhaps it depends on the Triggers. For event-time triggers / processing-time triggers, it would be reasonable to flush all the windows since logically the time would always elapse and the window would always get triggered in a logical future. But for triggers like CountTrigger, no matter how much time passes logically, the windows would not trigger, thus we may not flush these windows. If there are requirements we may provide additional triggers.
> > >>>>>> It's a bit messy and I'm not sure if this should be straightened out?
> > >>>>>> Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen at the same time, and for single input operators `endInput()` and `finish()` are indeed the same thing. During the last discussion we mentioned this issue, and at that time we thought that we might deprecate `endInput()` in the future; then we would only have endInput(int input) and finish().
> > >>>>> Best,
> > >>>>> Yun
> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
> > >>>>> ------------------------------------------------------------------
> > >>>>> From:Piotr Nowojski
> > >>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
> > >>>>> To:dev
> > >>>>> Cc:Yun Gao
> > >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > >>>>>
> > >>>>> Hi Till,
> > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>>> 2) StreamOperator.finish says to flush all buffered events. Would a
> > >>>>>> WindowOperator close all windows and emit the results upon calling
> > >>>>>> finish, for example?
> > >>>>> 1) currently they are independent but parallel mechanisms. With event time, they are basically the same.
> > >>>>> 2) it probably should for the sake of processing time windows.
> > >>>>> Here you are touching the bit of the current design that I like the least. We basically have now three different ways of conveying very similar things:
> > >>>>> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about processing time?)
> > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush its internal state
> > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
> > >>>>> It's a bit messy and I'm not sure if this should be straightened out? Each one of those has a little bit different semantic/meaning, but at the same time they are very similar. For single input operators `endInput()` and `finish()` are actually the very same thing.
> > >>>>> Piotrek
> > >>>>>
> > >>>>> Thu, 15 Jul 2021 at 16:47 Till Rohrmann wrote:
> > >>>>> Thanks for updating the FLIP. Based on the new section about stop-with-savepoint [--drain] I got two other questions:
> > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> > >>>>> 2) StreamOperator.finish says to flush all buffered events. Would a WindowOperator close all windows and emit the results upon calling finish, for example?
> > >>>>> Cheers,
> > >>>>> Till
> > >>>>>
> > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> > >>>>>> Thanks a lot for your answers and clarifications Yun.
> > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a problem.
> > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
> > >>>>>> Cheers,
> > >>>>>> Till
> > >>>>>>
> > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> > >>>>>>> Hi Till,
> > >>>>>>> Many thanks for the review and comments!
> > >>>>>>> 1) First, I think we could in fact do the computation outside of the main thread; the current implementation is mainly due to the computation being fast in general and us initially wanting a simplified first version.
> > >>>>>>> The main requirement here is to have a consistent view of the state of the tasks. Otherwise, for example if we have A -> B, if A is running when we check whether we need to trigger A, we will mark A as to-be-triggered, but if A becomes finished by the time we check B, we will also mark B as to-be-triggered. Then B will receive both the rpc trigger and the checkpoint barrier, which would break some assumptions on the task side and complicate the implementation.
> > >>>>>>> But to cope with this issue, we could in fact first take a snapshot of the tasks' state and then do the computation; neither of the two steps needs to be in the main thread.
> > >>>>>>> 2) For the computation logic, in fact we currently benefit a lot from some shortcuts on all-to-all edges and job vertices with all tasks running. These shortcuts can do checks on the job vertex level first and skip some job vertices as a whole. With this optimization we have an O(V) algorithm, and the current running time of the worst case for a job with 320,000 tasks is less than 100ms. For daily graph sizes the time would be further reduced linearly.
> > >>>>>>> If we do the computation based on the last triggered tasks, we may not easily encode this information into the shortcuts on the job vertex level. And since the time seems to be short, perhaps it is enough to do the re-computation from scratch, considering the tradeoff between the performance and the complexity?
> > >>>>>>> 3) We are going to emit the EndOfInput event exactly after the finish() method and before the last snapshotState() method so that we could shut down the whole topology with a single final checkpoint.
> > >>>>>>> Very sorry for not including enough details for this part; I'll complement the FLIP with the details on the process of the final checkpoint / savepoint.
> > >>>>>>> Best,
> > >>>>>>> Yun
> > >>>>>>> ------------------------------------------------------------------
> > >>>>>>> From:Till Rohrmann
> > >>>>>>> Send Time:2021 Jul. 14 (Wed.) 22:05
> > >>>>>>> To:dev
> > >>>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
> > >>>>>>>
> > >>>>>>> Hi everyone,
> > >>>>>>> I am a bit late to the voting party but let me ask three questions:
> > >>>>>>> 1) Why do we execute the trigger plan computation in the main thread if we cannot guarantee that all tasks are still running when triggering the checkpoint? Couldn't we do the computation in a different thread in order to relieve the main thread a bit?
> > >>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator seems to go over the whole topology for every calculation. Wouldn't it be more efficient to maintain the set of current tasks to trigger and check whether anything has changed and if so check the succeeding tasks until we have found the current checkpoint trigger frontier?
> > >>>>>>> 3) When are we going to send the endOfInput events to a downstream task?
> > >>>>>>> If this happens after we call finish on the upstream operator but before snapshotState then it would be possible to shut down the whole topology with a single final checkpoint. I think this part could benefit from a bit more detailed description in the FLIP.
> > >>>>>>> Cheers,
> > >>>>>>> Till
> > >>>>>>>
> > >>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
> > >>>>>>>> Hi there,
> > >>>>>>>> Since the voting time of FLIP-147 [1] has passed, I'm closing the vote now.
> > >>>>>>>> There were seven +1 votes (6 of 7 binding) and no -1 votes:
> > >>>>>>>> - Dawid Wysakowicz (binding)
> > >>>>>>>> - Piotr Nowojski (binding)
> > >>>>>>>> - Jiangang Liu (binding)
> > >>>>>>>> - Arvid Heise (binding)
> > >>>>>>>> - Jing Zhang (binding)
> > >>>>>>>> - Leonard Xu (non-binding)
> > >>>>>>>> - Guowei Ma (binding)
> > >>>>>>>> Thus I'm happy to announce that the update to the FLIP-147 is accepted.
> > >>>>>>>> Many thanks everyone!
> > >>>>>>>> Best,
> > >>>>>>>> Yun
> > >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
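
A note on the suggestion at the top of this thread, that a task should only finish on a checkpoint triggered after finish() has been called for all its operators, tracked by "some state inside the task". One possible shape of that state is sketched below. Everything here is an assumption made for illustration: `FinalCheckpointTracker` and its methods are hypothetical names, not Flink APIs, and the sketch assumes checkpoint ids increase monotonically with trigger order. It does not address the separate concern about lingering external resources from later completed checkpoints.

```java
/** Toy model only: a hypothetical helper, not a Flink class. */
class FinalCheckpointTracker {

    // Highest checkpoint id this task has seen triggered so far.
    private long lastTriggeredId = -1;
    // Highest checkpoint id that had been triggered at the moment finish() completed.
    private long lastIdBeforeFinish = -1;
    private boolean finished = false;

    /** Called whenever a checkpoint with the given id is triggered on this task. */
    void onCheckpointTriggered(long checkpointId) {
        lastTriggeredId = Math.max(lastTriggeredId, checkpointId);
    }

    /** Called once finish() has returned for all operators of the task. */
    void onAllOperatorsFinished() {
        finished = true;
        lastIdBeforeFinish = lastTriggeredId;
    }

    /**
     * Returns true only for a completion notification of a checkpoint that was
     * triggered after finish(); earlier concurrent checkpoints are ignored.
     */
    boolean mayShutDownOn(long completedCheckpointId) {
        return finished && completedCheckpointId > lastIdBeforeFinish;
    }

    public static void main(String[] args) {
        FinalCheckpointTracker tracker = new FinalCheckpointTracker();
        tracker.onCheckpointTriggered(41);   // still in flight when the task finishes
        tracker.onAllOperatorsFinished();
        tracker.onCheckpointTriggered(42);   // first checkpoint that covers finish()

        System.out.println("shut down on 41: " + tracker.mayShutDownOn(41));
        System.out.println("shut down on 42: " + tracker.mayShutDownOn(42));
    }
}
```

With concurrent checkpoints, the notification for checkpoint 41 can arrive after finish(), so comparing against the id recorded at finish time is what keeps the task alive until checkpoint 42 completes.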