I'd be fine to continue w/o a vote as long as the FLIP will contain all the
clarifications we talked about in this thread and in FLINK-21080.

Cheers,
Till

On Fri, Jul 23, 2021 at 8:34 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> How do you feel about proceeding without an additional vote?
>
> I agree, we added quite a bit in there, but none of the added content
> changes what has been voted so far. A big part of the added content can
> be considered as clarifying the implementation. I'd be inclined to
> proceed without a vote. WDYT?
>
> Best,
>
> Dawid
>
>
> On 22/07/2021 14:55, Yun Gao wrote:
> > Hi Till,
> >
> > Very thanks for the comments and tips! We will update the FLIP with the
> new details and we
> > should need a new vote since the FLIP is updated a large part, I think
> we would start the
> > vote right after we also solve the remaining UnionListState problem and
> have a double
> > check on that we have no other missing points for the design and FLIP~
> >
> > Best,
> > Yun
> >
> >
> > ------------------------------------------------------------------
> > From:Till Rohrmann <trohrm...@apache.org>
> > Send Time:2021 Jul. 22 (Thu.) 17:57
> > To:Yun Gao <yungao...@aliyun.com>
> > Cc:Piotr Nowojski <pnowoj...@apache.org>; dev <dev@flink.apache.org>;
> Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <
> pi...@ververica.com>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
> >
> > Thanks everyone for this discussion. I think this is very helpful.
> >
> > I do agree with Piotr's proposal to separate state and the lifecycle of
> a StreamOperator. That way the finished state can be used to recover a
> StreamOperator or to rescale the topology. I also believe that this will
> make the implementation a lot easier since we don't have to remember which
> StreamOperator has actually finished. Moreover, it solves the problem of
> rescaling.
> >
> > I also agree with the proposed solution for checkpoints after finish()
> has been called. I guess the first checkpoint that was triggered after
> finish() and sends notifyCheckpointComplete should be good enough to close
> the StreamTask.
> >
> > Shall we update the FLIP with these details? Do we need another vote for
> it or shall we continue w/o it?
> >
> > Cheers,
> > Till
> > On Thu, Jul 22, 2021 at 10:45 AM Yun Gao <yungao...@aliyun.com> wrote:
> >
> > Hi Piotr,
> >
> > Very thanks for the explanation! and very sorry that initially I should
> wrongly understand the problem Dawid proposed.
> >
> >> And what should we do now? We can of course commit all transactions
> until checkpoint 43.
> >> But should we keep waiting for `notyifyCheckpointComplete(44)`?
> >> hat if in the meantime another checkpoint is triggered? We could end up
> waiting indefinitely.
> >> Our proposal is to shutdown the task immediately after seeing first
> `notifyCheckpointComplete(X)`,
> >> where X is any triggered checkpoint AFTER `finish()`. This should be
> fine, as:
> >> a) ideally there should be no new pending transactions opened after
> checkpoint 42
> >> b) even if operator/function is opening some transactions for
> checkpoint 43 and
> >> checkpoint 44 (`FlinkKafkaProducer`), those transactions after
> checkpoint 42 should be empty
> > Now I understand more on this issue: currently operators like the
> TwoPhaseCommitSinkFunctions would create
> > a new transaction when snapshotting state, thus for each checkpoint
> there would be one corresponding
> > transaction, thus after finish() and before notifyCheckpointComplete(),
> there might be some empty
> > transactions.
> >
> > I also agree with the proposal: we could finish the task after we
> received the first checkpoint complete
> > notification, and the checkpoint with the larger checkpoint id should be
> allowed to be finished. The
> > checkpoint complete notification should be able to commit all the
> non-empty transactions. A part of
> > the empty transactions would also get committed, but it should cause no
> harm. The other empty
> > transactions would be aborted in close(), which should also cause no
> harm.
> >
> >> If checkpoint 44 completes afterwards, it will still be valid. Ideally
> we would recommend that after
> > seeing `finish()` operators/functions should not be opening any new
> transactions, but that shouldn't be required.
> >
> > And from another of view, I think perhaps it might also be acceptable
> that we have some requirements for
> > the pattern of the operators, like the sink would be able to skip
> creating new transactions after finish() is called.
> > Perhaps we may treat the process as a protocol between the framework and
> the operators, and the operators
> > might need to follow the protocol. Of course it would be better that the
> framework could handle all the cases
> > elegantly, and put less implicit limitation to the operators, and at
> least we should guarantee not cause compatibility
> > problem after changing the process.
> >
> > Very thanks for the careful checks on the whole process.
> >
> > Best,
> > Yun
> >
> >
> > ------------------------------------------------------------------
> > From:Piotr Nowojski <pnowoj...@apache.org>
> > Send Time:2021 Jul. 22 (Thu.) 15:33
> > To:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> > Cc:Till Rohrmann <trohrm...@apache.org>; Yun Gao
> <yungao...@aliyun.com.invalid>; Piotr Nowojski <pi...@ververica.com>
> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
> >
> > Hi Guowei,
> >
> >> Thank Dawid and Piotr for sharing the problem. +1 to EndInput/Finish
> can becalled repeatedly.
> > Just to clarify. It's not about calling `finish()` and `endInput()`
> repeatedly, but about (from the perspective of operator's state)
> > 1. seeing `finish()`
> > 2. checkpoint X triggered and completed
> > 3. failure + recovery from X
> > 4. potentially processing more records
> > 5. another `finish()`
> >
> > But from the context of the remaining part of your message Guowei I
> presume that you have already got that point :)
> >
> > Yun:
> >
> >> For this issue perhaps we could explicitly requires the task to wait
> for a checkpoint triggered after finish()> method is called for all the
> operators ? We could be able to achieve this target by maintaining
> >> some state inside the task.
> > Isn't this exactly the "WAITING_FOR_FINAL_CP" from the FLIP document?
> That we always need to wait for a checkpoint triggered after `finish()` to
> complete, before shutting down a task?
> >
> > What Dawid was describing is a scenario where:
> > 1. task/operator received `finish()`
> > 2. checkpoint 42 triggered (not yet completed)
> > 3. checkpoint 43 triggered (not yet completed)
> > 4. checkpoint 44 triggered (not yet completed)
> > 5. notifyCheckpointComplete(43)
> >
> > And what should we do now? We can of course commit all transactions
> until checkpoint 43. But should we keep waiting for
> `notyifyCheckpointComplete(44)`? What if in the meantime another checkpoint
> is triggered? We could end up waiting indefinitely.
> >
> > Our proposal is to shutdown the task immediately after seeing first
> `notifyCheckpointComplete(X)`, where X is any triggered checkpoint AFTER
> `finish()`. This should be fine, as:
> > a) ideally there should be no new pending transactions opened after
> checkpoint 42
> > b) even if operator/function is opening some transactions for checkpoint
> 43 and checkpoint 44 (`FlinkKafkaProducer`), those transactions after
> checkpoint 42 should be empty
> >
> > Hence comment from Dawid
> >> We must make sure those checkpoints do not leave lingering external
> resources.
> > After seeing 5. (notifyCheckpointComplete(43)) It should be good enough
> to:
> > - commit transactions from checkpoint 42, (and 43 if they were created,
> depends on the user code)
> > - close operator, aborting any pending transactions (for checkpoint 44
> if they were opened, depends on the user code)
> >
> > If checkpoint 44 completes afterwards, it will still be valid. Ideally
> we would recommend that after seeing `finish()` operators/functions should
> not be opening any new transactions, but that shouldn't be required.
> >
> > Best,
> > Piotrek
> > czw., 22 lip 2021 o 09:00 Yun Gao <yungao...@aliyun.com.invalid>
> napisał(a):
> > Hi Dawid, Piotr, Steven,
> >
> >  Very thanks for pointing out these issues and very thanks for the
> discussion !
> >
> >  Failure before notifyCheckpointComplete()
> >
> >  For this issue I would agree with what Piotr has proposed. I tried to
> use some
> >  operators like sink / window as example and currently I also do not
> found
> >  explicit scenarios that might cause problems if records are processed
> by a
> >  task that is assigned with states snapshotted after calling finish()
> before.
> >  For the future cases it seems users should be able to implement their
> >  target logic by explicitly add a flag regarding finished, and perhaps
> have
> >  different logic if this part of states are referred to. Besides, this
> case would
> >  only happen on rescaling or topology change, which embedded some kind
> of user
> >  knowledge inside the action. Thus it looks acceptable that we still
> split the operators
> >  state from the task lifecycle, and do not treat checkpoint after
> finish() differently.
> >
> >  Finishing upon receiving notifyCheckpointComplete() of not the latest
> checkpoint
> >
> >  For this issue perhaps we could explicitly requires the task to wait
> for a checkpoint triggered after finish()
> >  method is called for all the operators ? We could be able to achieve
> this target by maintaining
> >  some state inside the task.
> >
> >  Checkpointing from a single subtask / UnionListState case
> >
> >  This should indeed cause problems, and I also agree with that we could
> focus on this thread in the
> > https://issues.apache.org/jira/browse/FLINK-21080.
> >
> >  Best,
> >  Yun
> >
> >
> >  ------------------------------------------------------------------
> >  From:Piotr Nowojski <pnowoj...@apache.org>
> >  Send Time:2021 Jul. 22 (Thu.) 02:46
> >  To:dev <dev@flink.apache.org>
> >  Cc:Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>;
> Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <
> pi...@ververica.com>
> >  Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
> >
> >  Hi Steven,
> >
> >  > I probably missed sth here. isn't this the case today already? Why is
> it a concern for the proposed change?
> >
> >  The problem is with the newly added `finish()` method and the already
> existing `endInput()` call. Currently on master there are no issues,
> because we are not checkpointing any operators after some operators have
> finished. The purpose of this FLIP-147 is to exactly enable this and this
> opens a new problem described by Dawid.
> >
> >  To paraphrase and to give a concrete example.  Assume we have an
> operator with parallelism of two. Subtask 0 and subtask 1.
> >
> >  1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1
> hasn't (yet).
> >  2. Checkpoint 42 is triggered, and it completes.
> >  3. Job fails and is restarted, but at the same time it's rescaled. User
> has chosen to scale down his operator down to 1.
> >
> >  Now we have a pickle. We don't know if `notifyCheckpointComplete(42)`
> has been processed or not, so while recovering to checkpoint 42, we have to
> recover both finished subtask 0 (#1) state and not yet finished subtask 1's
> (#1). But at the same time they are scaled down, so we only have a single
> subtask 0 (#2) that has a combined state from both of the previous
> instances. The potentially confusing issue is that the state from subtask 0
> (#1) was checkpointed AFTER `endInput()` and `finish()` calls, but it's
> recovered to an operator that has still some records to process. In step 1.
> an user for example could store on the operator's state a bit of
> information "end input has been already called!", that after recovery would
> no longer be true.
> >
> >  Hence the question about `finish()` and `endInput()` semantics. Should
> it be tied down to state, or just to an operator instance/execution attempt?
> >
> >  Piotrek
> >  śr., 21 lip 2021 o 19:00 Steven Wu <stevenz...@gmail.com> napisał(a):
> >  > if a failure happens after sequence of finish() -> snapshotState(),
> but
> >   before notifyCheckpointComplete(), we will restore such a state and we
> >   might end up sending some more records to such an operator.
> >
> >   I probably missed sth here. isn't this the case today already? Why is
> it a
> >   concern for the proposed change?
> >
> >   On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
> >
> >   > Hi Dawid,
> >   >
> >   > Thanks for writing down those concerns.
> >   >
> >   > I think the first issue boils down what should be the contract of
> lifecycle
> >   > methods like open(), close(), initializeState() etc and especially
> the new
> >   > additions like finish() and endInput(). And what should be their
> relation
> >   > with the operator state (regardless of it's type keyed, non-keyed,
> union,
> >   > ...). Should those methods be tied to state or not? After thinking
> about it
> >   > for a while (and discussing it offline with Dawid), I think the
> answer
> >   > might be no, they shouldn't. I mean maybe we should just openly say
> that
> >   > all of those methods relate to this single particular instance and
> >   > execution of the operator. And if a job is recovered/rescaled, we
> would be
> >   > allowed to freely resume consumption, ignoring a fact that maybe
> some parts
> >   > of the state have previously seen `endInput()`. Why?
> >   >
> >   > 0. Yes, it might be confusing. Especially with `endInput()`. We call
> >   > `endInput()`, we store something in a state and later after recovery
> >   > combined with rescaling that state can see more records? Indeed
> weird,
> >   > 1. I haven't come up yet with a counterexample that would break and
> make
> >   > impossible to implement a real life use case. Theoretically yes, the
> user
> >   > can store `endInput()` on state, and after rescaling this state
> would be
> >   > inconsistent with what is actually happening with the operator, but I
> >   > haven't found a use case that would break because of that.
> >   > 2. Otherwise, implementation would be very difficult.
> >   > 3. It's difficult to access keyed state from within
> `endInput()`/`finish()`
> >   > calls, as they do not have key context.
> >   > 4. After all, openly defining `endInput()` and `finish()` to be tied
> with
> >   > it's operator execution instance lifecycle is not that strange and
> quite
> >   > simple to explain. Sure, it can lead to a bit of confusion (0.), but
> that
> >   > doesn't sound that bad in comparison with the alternatives that I'm
> aware
> >   > of. Also currently methods like `open()` and `close()` are also tied
> to the
> >   > operator execution instance, not to the state. Operators can be
> opened and
> >   > closed multiple times, it doesn't mean that the state is lost after
> closing
> >   > an operator.
> >   >
> >   > For the UnionListState problem I have posted my proposal in the
> ticket [1],
> >   > so maybe let's move that particular discussion there?
> >   >
> >   > Piotrek
> >   >
> >   > [1] https://issues.apache.org/jira/browse/FLINK-21080
> >   >
> >   > śr., 21 lip 2021 o 12:39 Dawid Wysakowicz <dwysakow...@apache.org>
> >   > napisał(a):
> >   >
> >   > > Hey all,
> >   > >
> >   > > To make the issues that were found transparent to the community, I
> want
> >   > to
> >   > > post an update:
> >   > >
> >   > > *1. Committing side-effects*
> >   > > We do want to make sure that all side effects are committed before
> >   > > bringing tasks down. Side effects are committed when calling
> >   > > notifyCheckpointComplete. For the final checkpoint we introduced
> the
> >   > method
> >   > > finish(). This notifies the operator that we have consumed all
> incoming
> >   > > records and we are preparing to close the Task. In turn we should
> flush
> >   > any
> >   > > pending buffered records and prepare to commit last transactions.
> The
> >   > goal
> >   > > is that after a successful sequence of finish() -> snapshotState()
> ->
> >   > > notifyCheckpointComplete(), the remaining state can be considered
> >   > > empty/finished and may be discarded.
> >   > >
> >   > > *Failure before notifyCheckpointComplete()*
> >   > >
> >   > > The question is what is the contract of the endInput()/finish()
> methods
> >   > > and how do calling these methods affect the operators keyed,
> non-keyed
> >   > > state and external state. Is it allowed to restore state snapshot
> taken
> >   > > after calling endInput()/finish() and process more records? Or do
> we
> >   > assume
> >   > > that after a restore from such a state taken after finish() we
> should not
> >   > > call any of the lifecycle methods or at least make sure those
> methods do
> >   > > not emit records/interact with mailbox etc.
> >   > >
> >   > > Currently it is possible that if a failure happens after sequence
> of
> >   > > finish() -> snapshotState(), but before
> notifyCheckpointComplete(), we
> >   > will
> >   > > restore such a state and we might end up sending some more records
> to
> >   > such
> >   > > an operator. It is possible if we rescale and this state is merged
> with a
> >   > > state of a subtask that has not called finish() yet. It can also
> happen
> >   > if
> >   > > we rescale the upstream operator and the subtask of interest
> becomes
> >   > > connected to a newly added non finished subtask.
> >   > >
> >   > > *Snapshotting StreamTasks that finish() has been called*
> >   > >
> >   > >
> >   > > We thought about putting a flag into the snapshot of a subtask
> produced
> >   > > after the finish() method. This would make it possible to skip
> execution
> >   > of
> >   > > certain lifecycle methods. Unfortunately this creates problems for
> >   > > rescaling. How do we deal with a situation that subtask states
> with both
> >   > > the feature flag set and unset end up in a single StreamTask.
> Additional
> >   > > problem is that we merge those states into a single
> OperatorSubtaskState
> >   > on
> >   > > CheckpointCoordinator.
> >   > >
> >   > > *Finishing upon receiving notifyCheckpointComplete() of not the
> latest
> >   > > checkpoint*
> >   > >
> >   > > We need to wait for a checkpoint to complete, that started after
> the
> >   > > finish() method. However, we support concurrent checkpoints
> therefore,
> >   > > there might be later checkpoints that completed, but the
> notification has
> >   > > not arrived. We must make sure those checkpoints do not leave
> lingering
> >   > > external resources.
> >   > >
> >   > > *Checkpointing from a single subtask / UnionListState case*
> >   > > There are operators that checkpoint from a single subtask only.
> Usually
> >   > > from the subtask index=0. If we let those subtasks finish,
> subsequent
> >   > > checkpoints will miss this information.
> >   > > Esp. Legacy sources problem:
> >   > > https://issues.apache.org/jira/browse/FLINK-21080
> >   > >
> >   > > Best,
> >   > >
> >   > > Dawid
> >   > > On 19/07/2021 15:10, Yun Gao wrote:
> >   > >
> >   > > Hi Till, Dawid,
> >   > >
> >   > > Very thanks for the comments and discussion! Glad that it seems we
> have
> >   > > come to a convergence, and I also agree with that we could not
> include
> >   > the
> >   > > optimization in the first version.
> >   > >
> >   > > Best
> >   > > Yun
> >   > >
> >   > > ------------------------------------------------------------------
> >   > > From:Dawid Wysakowicz <dwysakow...@apache.org> <
> dwysakow...@apache.org>
> >   > > Send Time:2021 Jul. 19 (Mon.) 17:51
> >   > > To:dev <dev@flink.apache.org> <dev@flink.apache.org>; Till
> Rohrmann
> >   > > <trohrm...@apache.org> <trohrm...@apache.org>
> >   > > Cc:Yun Gao <yungao...@aliyun.com> <yungao...@aliyun.com>; Yun Gao
> >   > > <yungao...@aliyun.com.invalid> <yungao...@aliyun.com.invalid>
> >   > > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> >   > > Finished
> >   > >
> >   > > Small correction. I meant we need to adjust the EndOfInputEvent of
> >   > course.
> >   > >
> >   > > Best,
> >   > >
> >   > > Dawid
> >   > >
> >   > > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
> >   > > > Hey Till,
> >   > > >
> >   > > > Yes, you're right we will have to adjust the current state of
> >   > > > EndOfPartitionEvent and move the moment when we emit it to have
> what
> >   > > > we're discussing here. We are aware of that.
> >   > > >
> >   > > > As for the MAX_WATERMARK vs finish(). My take is that we should
> always
> >   > > > emit MAX_WATERMARK before calling finish() on an operator. At
> the same
> >   > > > time finish() should not leave behind anything in state, as the
> >   > > > intention is that we never restore from the taken
> savepoint/checkpoint
> >   > > > (savepoint w drain or bounded data consumed).
> >   > > >
> >   > > > Best,
> >   > > >
> >   > > > Dawid
> >   > > >
> >   > > > On 19/07/2021 11:33, Till Rohrmann wrote:
> >   > > >> Hi Yun and Dawid,
> >   > > >>
> >   > >
> >   > > >> Thanks for your comments. I do agree with your comments that
> finish()
> >   > can
> >   > > >> do more than MAX_WATERMARK. I guess we should then explain how
> >   > > >> MAX_WATERMARK and finish() play together and what kind of
> >   > > >> order guarantees we provide.
> >   > > >>
> >   > >
> >   > > >> Concerning the EndOfPartitionEvent, I am not entirely sure
> whether it
> >   > would
> >   > >
> >   > > >> work in its current state because we send this event when the
> Task is
> >   > about
> >   > > >> to shut down if I am not mistaken. What we want to have is to
> bring
> >   > the
> >   > >
> >   > > >> StreamTasks into a state so that they shut down on the next
> >   > checkpoint. For
> >   > > >> this we need to keep the StreamTask running. In general, I am a
> fan of
> >   > > >> making things explicit if possible. I think this helps
> maintenance and
> >   > >
> >   > > >> evolvability of code. That's why I think sending an
> EndOfInputEvent
> >   > which
> >   > > >> is a StreamTask level event and which says that there won't be
> any
> >   > other
> >   > > >> records coming only control events could make sense.
> >   > > >>
> >   > > >> I would leave the proposed optimization out of the first
> version. We
> >   > can
> >   > > >> still add it at a later point in time.
> >   > > >>
> >   > > >> Cheers,
> >   > > >> Till
> >   > > >>
> >   > > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz
> >   > > <dwysakow...@apache.org> <dwysakow...@apache.org>
> >   > > >> wrote:
> >   > > >>
> >   > >
> >   > > >>> Personally I don't find this optimization important and I'd
> rather
> >   > leave
> >   > >
> >   > > >>> it out not to complicate the codebase further. I doubt we save
> much
> >   > there.
> >   > > >>> I don't have a strong opinion though.
> >   > > >>>
> >   > > >>> Best,
> >   > > >>>
> >   > > >>> Dawid
> >   > > >>> On 19/07/2021 10:31, Yun Gao wrote:
> >   > > >>>
> >   > > >>> Hi,
> >   > > >>>
> >   > > >>> Very thanks Dawid for the thoughts!
> >   > > >>>
> >   > > >>> Currently I also do not have different opinions regarding this
> part.
> >   > > >>> But I have one more issue to confirm: during the previous
> discussion
> >   > we
> >   > > >>> have discussed that for the final checkpoint case, we might
> have an
> >   > > >>> optmization
> >   > >
> >   > > >>> that if a task do not have operators using 2-pc, we might skip
> >   > waiting for
> >   > > >>> the
> >   > >
> >   > > >>> final checkpoint (but we could not skip the savepoint). To
> allow
> >   > users to
> >   > > >>> express
> >   > > >>> the logic, we have proposed to add one more method to
> StreamOperator
> >   > &
> >   > > >>> CheckpointListener:
> >   > > >>>
> >   > > >>> interface StreamOperator {
> >   > > >>>     default boolean requiresFinalCheckpoint() {
> >   > > >>>         return true;
> >   > > >>>     }
> >   > > >>> }
> >   > > >>>
> >   > > >>> interface CheckpointListener {
> >   > > >>>
> >   > > >>>     default boolean requiresFinalCheckpoint() {
> >   > > >>>         return true;
> >   > > >>>     }
> >   > > >>> }
> >   > > >>>
> >   > > >>> class AbstractUdfStreamOperator {
> >   > > >>>
> >   > > >>>     @Override
> >   > > >>>     boolean requiresFinalCheckpoint() {
> >   > > >>>         return userFunction instanceof CheckpointListener &&
> >   > >
> >   > > >>>             ((CheckpointListener)
> >   > userFunction).requiresFinalCheckpoint();
> >   > > >>>     }
> >   > > >>> }
> >   > > >>>
> >   > > >>>
> >   > > >>> I think we should still keep the change ?
> >   > > >>>
> >   > > >>> Best,
> >   > > >>> Yun
> >   > > >>>
> >   > > >>> ------------------Original Mail ------------------
> >   > > >>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> >   > > <dwysakow...@apache.org>
> >   > > >>> <dwysakow...@apache.org> <dwysakow...@apache.org>
> >   > > >>> *Send Date:*Sun Jul 18 18:44:50 2021
> >   > > >>> *Recipients:*Flink Dev <dev@flink.apache.org> <
> dev@flink.apache.org>
> >   > > <dev@flink.apache.org> <dev@flink.apache.org>, Yun
> >   > > >>> Gao <yungao...@aliyun.com.INVALID>
> <yungao...@aliyun.com.INVALID>
> >   > > <yungao...@aliyun.com.INVALID> <yungao...@aliyun.com.INVALID>
> >   > > >>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
> After Tasks
> >   > > >>> Finished
> >   > > >>>
> >   > > >>>> I think we're all really close to the same solution.
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>> I second Yun's thoughts that MAX_WATERMARK works well for
> time based
> >   > > >>>>
> >   > >
> >   > > >>>> buffering, but it does not solve flushing other operations
> such as
> >   > e.g.
> >   > > >>>>
> >   > > >>>> count windows or batching requests in Sinks. I'd prefer to
> treat the
> >   > > >>>>
> >   > > >>>> finish() as a message for Operator to "flush all records". The
> >   > > >>>>
> >   > >
> >   > > >>>> MAX_WATERMARK in my opinion is mostly for backwards
> compatibility
> >   > imo. I
> >   > > >>>>
> >   > > >>>> don't think operators need to get a signal "stop-processing"
> if they
> >   > > >>>>
> >   > > >>>> don't need to flush records. The "WHEN" records are emitted,
> should
> >   > be
> >   > > >>>>
> >   > > >>>> in control of the StreamTask, by firing timers or by
> processing a
> >   > next
> >   > > >>>>
> >   > > >>>> record from upstream.
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > >
> >   > > >>>> The only difference of my previous proposal compared to Yun's
> is
> >   > that I
> >   > > >>>>
> >   > > >>>> did not want to send the EndOfUserRecords event in case of
> stop w/o
> >   > > >>>>
> >   > > >>>> drain. My thinking was that we could directly go from RUNNING
> to
> >   > > >>>>
> >   > > >>>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could
> emit
> >   > > >>>>
> >   > > >>>> EndOfUserRecordsEvent with an additional flag and e.g. stop
> firing
> >   > > >>>>
> >   > >
> >   > > >>>> timers and processing events (without calling finish() on
> >   > Operator). In
> >   > > >>>>
> >   > > >>>> my initial suggestion I though we don't care about some events
> >   > > >>>>
> >   > > >>>> potentially being emitted after the savepoint was taken, as
> they
> >   > would
> >   > > >>>>
> >   > >
> >   > > >>>> anyway belong to the next after FINAL, which would be
> discarded. I
> >   > think
> >   > > >>>>
> >   > > >>>> though the proposal to suspend records processing and timers
> is a
> >   > > >>>>
> >   > >
> >   > > >>>> sensible thing to do and would go with the version that Yun
> put
> >   > into the
> >   > > >>>>
> >   > > >>>> FLIP Wiki.
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>> What do you think Till?
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>> Best,
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>> Dawid
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>>
> >   > > >>>> On 16/07/2021 10:03, Yun Gao wrote:
> >   > > >>>>
> >   > > >>>>> Hi Till, Piotr
> >   > > >>>>> Very thanks for the comments!
> >   > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >   > >
> >   > > >>>>> I also agree with Piotr that currently they are independent
> >   > mechanisms,
> >   > > >>>> and they are basically the same
> >   > > >>>>
> >   > > >>>>> for the event time.
> >   > > >>>>> For more details, first there are some difference among the
> three
> >   > > >>>> scenarios regarding the finish:
> >   > > >>>>
> >   > >
> >   > > >>>>> For normal finish and stop-with-savepoint --drain, the job
> would
> >   > not be
> >   > > >>>> expected to be restarted,
> >   > > >>>>
> >   > > >>>>> and for stop-with-savepoint the job would be expected restart
> >   > later.
> >   > > >>>>> Then for finish / stop-with-savepoint --drain, currently
> Flink
> >   > would
> >   > > >>>> emit MAX_WATERMARK before the
> >   > > >>>>
> >   > >
> >   > > >>>>> EndOfPartition. Besides, as we have discussed before [1],
> >   > endOfInput /
> >   > > >>>> finish() should also only be called
> >   > > >>>>
> >   > > >>>>> for finish / stop-with-savepoint --drain. Thus currently they
> >   > always
> >   > > >>>> occurs at the same time. After the change,
> >   > > >>>>
> >   > > >>>>> we could emit MAX_WATERMARK before endOfInput event for the
> finish
> >   > /
> >   > > >>>> stop-with-savepoint --drain cases.
> >   > > >>>>
> >   > > >>>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would
> >   > a
> >   > > >>>>>> WindowOperator close all windows and emit the results upon
> calling
> >   > > >>>>>> finish, for example?
> >   > > >>>>> As discussed above for stop-with-savepoint, we would always
> keep
> >   > the
> >   > > >>>> window as is, and restore them after restart.
> >   > > >>>>
> >   > > >>>>> Then for the finish / stop-with-savepoint --drain, I think
> perhaps
> >   > it
> >   > > >>>> depends on the Triggers. For
> >   > > >>>>
> >   > >
> >   > > >>>>> event-time triggers / process time triggers, it would be
> >   > reasonable to
> >   > > >>>> flush all the windows since logically
> >   > > >>>>
> >   > >
> >   > > >>>>> the time would always elapse and the window would always get
> >   > triggered
> >   > > >>>> in a logical future. But for triggers
> >   > > >>>>
> >   > >
> >   > > >>>>> like CountTrigger, no matter how much time pass logically,
> the
> >   > windows
> >   > > >>>> would not trigger, thus we may not
> >   > > >>>>
> >   > > >>>>> flush these windows. If there are requirements we may provide
> >   > > >>>> additional triggers.
> >   > > >>>>
> >   > >
> >   > > >>>>>> It's a bit messy and I'm not sure if this should be
> strengthened
> >   > out?
> >   > > >>>> Each one of those has a little bit different semantic/meaning,
> >   > > >>>>
> >   > >
> >   > > >>>>>> but at the same time they are very similar. For single input
> >   > operators
> >   > > >>>> `endInput()` and `finish()` are actually the very same thing.
> >   > > >>>>
> >   > >
> >   > > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always
> happen
> >   > at the
> >   > > >>>> same time, and for single input operators `endInput()` and
> >   > `finish()`
> >   > > >>>>
> >   > >
> >   > > >>>>> are indeed the same thing. During the last discussion we ever
> >   > mentioned
> >   > > >>>> this issue and at then we thought that we might deprecate
> >   > `endInput()`
> >   > > >>>>
> >   > > >>>>> in the future, then we would only have endInput(int input)
> and
> >   > > >>>> finish().
> >   > > >>>>
> >   > > >>>>> Best,
> >   > > >>>>> Yun
> >   > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
> >   > > >>>>>
> ------------------------------------------------------------------
> >   > > >>>>> From:Piotr Nowojski
> >   > > >>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
> >   > > >>>>> To:dev
> >   > > >>>>> Cc:Yun Gao
> >   > > >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
> After Tasks
> >   > > >>>> Finished
> >   > > >>>>
> >   > > >>>>> Hi Till,
> >   > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >   > > >>>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would
> >   > a>
> >   > > >>>> WindowOperator close all windows and emit the results upon
> calling
> >   > > >>>>
> >   > > >>>>>> finish, for example?
> >   > > >>>>> 1) currently they are independent but parallel mechanisms.
> With
> >   > event
> >   > > >>>> time, they are basically the same.
> >   > > >>>>
> >   > > >>>>> 2) it probably should for the sake of processing time
> windows.
> >   > > >>>>> Here you are touching the bit of the current design that I
> like the
> >   > >
> >   > > >>>> least. We basically have now three different ways of
> conveying very
> >   > similar
> >   > > >>>> things:
> >   > > >>>>
> >   > > >>>>> a) sending `MAX_WATERMARK`, used by event time
> WindowOperator (what
> >   > > >>>> about processing time?)
> >   > > >>>>
> >   > > >>>>> b) endInput(), used for example by AsyncWaitOperator to
> flush it's
> >   > > >>>> internal state
> >   > > >>>>
> >   > > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
> >   > > >>>>> It's a bit messy and I'm not sure if this should be
> strengthened
> >   > out?
> >   > >
> >   > > >>>> Each one of those has a little bit different
> semantic/meaning, but
> >   > at the
> >   > >
> >   > > >>>> same time they are very similar. For single input operators
> >   > `endInput()`
> >   > > >>>> and `finish()` are actually the very same thing.
> >   > > >>>>
> >   > > >>>>> Piotrek
> >   > > >>>>> czw., 15 lip 2021 o 16:47 Till Rohrmann napisał(a):
> >   > > >>>>> Thanks for updating the FLIP. Based on the new section about
> >   > > >>>>> stop-with-savepoint [--drain] I got two other questions:
> >   > > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >   > > >>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would a
> >   > > >>>>> WindowOperator close all windows and emit the results upon
> calling
> >   > > >>>>> finish, for example?
> >   > > >>>>> Cheers,
> >   > > >>>>> Till
> >   > > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> >   > > >>>>>> Thanks a lot for your answers and clarifications Yun.
> >   > > >>>>>> 1+2) Agreed, this can be a future improvement if this
> becomes a
> >   > > >>>> problem.
> >   > > >>>>
> >   > > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
> >   > > >>>>>> Cheers,
> >   > > >>>>>> Till
> >   > > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
> >   > > >>>>>> wrote:
> >   > > >>>>>>> Hi Till,
> >   > > >>>>>>> Very thanks for the review and comments!
> >   > > >>>>>>> 1) First I think in fact we could be able to do the
> computation
> >   > > >>>> outside
> >   > > >>>>
> >   > > >>>>>>> of the main thread,
> >   > > >>>>>>> and the current implementation mainly due to the
> computation is
> >   > in
> >   > > >>>>>>> general fast and we
> >   > > >>>>>>> initially want to have a simplified first version.
> >   > >
> >   > > >>>>>>> The main requirement here is to have a constant view of the
> >   > state of
> >   > > >>>> the
> >   > > >>>>
> >   > > >>>>>>> tasks, otherwise
> >   > > >>>>>>> for example if we have A -> B, if A is running when we
> check if
> >   > we
> >   > > >>>> need
> >   > > >>>>
> >   > > >>>>>>> to trigger A, we will
> >   > > >>>>>>> mark A as have to trigger, but if A gets to finished when
> we
> >   > check
> >   > > >>>> B, we
> >   > > >>>>
> >   > > >>>>>>> will also mark B as
> >   > >
> >   > > >>>>>>> have to trigger, then B will receive both rpc trigger and
> >   > checkpoint
> >   > > >>>>>>> barrier, which would break
> >   > > >>>>>>> some assumption on the task side and complicate the
> >   > implementation.
> >   > >
> >   > > >>>>>>> But to cope this issue, we in fact could first have a
> snapshot
> >   > of the
> >   > > >>>>>>> tasks' state and then do the
> >   > >
> >   > > >>>>>>> computation, both the two step do not need to be in the
> main
> >   > thread.
> >   > >
> >   > > >>>>>>> 2) For the computation logic, in fact currently we benefit
> a lot
> >   > from
> >   > > >>>>>>> some shortcuts on all-to-all
> >   > >
> >   > > >>>>>>> edges and job vertex with all tasks running, these
> shortcuts
> >   > could do
> >   > > >>>>>>> checks on the job vertex level
> >   > > >>>>>>> first and skip some job vertices as a whole. With this
> >   > optimization
> >   > > >>>> we
> >   > > >>>>
> >   > > >>>>>>> have a O(V) algorithm, and the
> >   > > >>>>>>> current running time of the worst case for a job with
> 320,000
> >   > tasks
> >   > > >>>> is
> >   > > >>>>
> >   > > >>>>>>> less than 100ms. For
> >   > > >>>>>>> daily graph sizes the time would be further reduced
> linearly.
> >   > > >>>>>>> If we do the computation based on the last triggered
> tasks, we
> >   > may
> >   > > >>>> not
> >   > > >>>>
> >   > > >>>>>>> easily encode this information
> >   > >
> >   > > >>>>>>> into the shortcuts on the job vertex level. And since the
> time
> >   > seems
> >   > > >>>> to
> >   > > >>>>
> >   > > >>>>>>> be short, perhaps it is enough
> >   > > >>>>>>> to do re-computation from the scratch in consideration of
> the
> >   > > >>>> tradeoff
> >   > > >>>>
> >   > > >>>>>>> between the performance and
> >   > > >>>>>>> the complexity ?
> >   > > >>>>>>> 3) We are going to emit the EndOfInput event exactly after
> the
> >   > > >>>> finish()
> >   > > >>>>
> >   > > >>>>>>> method and before the last
> >   > >
> >   > > >>>>>>> snapshotState() method so that we could shut down the whole
> >   > topology
> >   > > >>>> with
> >   > > >>>>
> >   > > >>>>>>> a single final checkpoint.
> >   > > >>>>>>> Very sorry for not include enough details for this part
> and I'll
> >   > > >>>>>>> complement the FLIP with the details on
> >   > > >>>>>>> the process of the final checkpoint / savepoint.
> >   > > >>>>>>> Best,
> >   > > >>>>>>> Yun
> >   > > >>>>>>>
> >   > ------------------------------------------------------------------
> >   > > >>>>>>> From:Till Rohrmann
> >   > > >>>>>>> Send Time:2021 Jul. 14 (Wed.) 22:05
> >   > > >>>>>>> To:dev
> >   > > >>>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
> After
> >   > Tasks
> >   > > >>>>>>> Finished
> >   > > >>>>>>> Hi everyone,
> >   > > >>>>>>> I am a bit late to the voting party but let me ask three
> >   > questions:
> >   > >
> >   > > >>>>>>> 1) Why do we execute the trigger plan computation in the
> main
> >   > thread
> >   > > >>>> if we
> >   > > >>>>
> >   > >
> >   > > >>>>>>> cannot guarantee that all tasks are still running when
> >   > triggering the
> >   > > >>>>>>> checkpoint? Couldn't we do the computation in a different
> thread
> >   > in
> >   > > >>>> order
> >   > > >>>>
> >   > > >>>>>>> to relieve the main thread a bit.
> >   > > >>>>>>> 2) The implementation of the
> DefaultCheckpointPlanCalculator
> >   > seems
> >   > > >>>> to go
> >   > > >>>>
> >   > > >>>>>>> over the whole topology for every calculation. Wouldn't it
> be
> >   > more
> >   > > >>>>>>> efficient to maintain the set of current tasks to trigger
> and
> >   > check
> >   > > >>>>>>> whether
> >   > > >>>>>>> anything has changed and if so check the succeeding tasks
> until
> >   > we
> >   > > >>>> have
> >   > > >>>>
> >   > > >>>>>>> found the current checkpoint trigger frontier?
> >   > > >>>>>>> 3) When are we going to send the endOfInput events to a
> >   > downstream
> >   > > >>>> task?
> >   > > >>>>
> >   > > >>>>>>> If
> >   > >
> >   > > >>>>>>> this happens after we call finish on the upstream operator
> but
> >   > before
> >   > > >>>>>>> snapshotState then it would be possible to shut down the
> whole
> >   > > >>>> topology
> >   > > >>>>
> >   > >
> >   > > >>>>>>> with a single final checkpoint. I think this part could
> benefit
> >   > from
> >   > > >>>> a bit
> >   > > >>>>
> >   > > >>>>>>> more detailed description in the FLIP.
> >   > > >>>>>>> Cheers,
> >   > > >>>>>>> Till
> >   > > >>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
> >   > > >>>>>>> wrote:
> >   > > >>>>>>>> Hi there,
> >   > > >>>>>>>> Since the voting time of FLIP-147[1] has passed, I'm
> closing the
> >   > > >>>> vote
> >   > > >>>>
> >   > > >>>>>>> now.
> >   > > >>>>>>>> There were seven +1 votes ( 6 / 7 are bindings) and no -1
> votes:
> >   > > >>>>>>>> - Dawid Wysakowicz (binding)
> >   > > >>>>>>>> - Piotr Nowojski(binding)
> >   > > >>>>>>>> - Jiangang Liu (binding)
> >   > > >>>>>>>> - Arvid Heise (binding)
> >   > > >>>>>>>> - Jing Zhang (binding)
> >   > > >>>>>>>> - Leonard Xu (non-binding)
> >   > > >>>>>>>> - Guowei Ma (binding)
> >   > > >>>>>>>> Thus I'm happy to announce that the update to the
> FLIP-147 is
> >   > > >>>> accepted.
> >   > > >>>>
> >   > > >>>>>>>> Very thanks everyone!
> >   > > >>>>>>>> Best,
> >   > > >>>>>>>> Yun
> >   > > >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
> >   > > >>>>
> >   > > >>>>
> >   > >
> >   > >
> >   >
> >
> >
> >
>
>

Reply via email to