Hi Guowei,

> Thank Dawid and Piotr for sharing the problem. +1 to EndInput/Finish can
be
called repeatedly.

Just to clarify. It's not about calling `finish()` and `endInput()`
repeatedly, but about (from the perspective of operator's state)
1. seeing `finish()`
2. checkpoint X triggered and completed
3. failure + recovery from X
4. potentially processing more records
5. another `finish()`

But from the context of the remaining part of your message Guowei I presume
that you have already got that point :)

Yun:

> For this issue perhaps we could explicitly requires the task to wait for
a checkpoint triggered after finish()
> method is called for all the operators ? We could be able to achieve this
target by maintaining
> some state inside the task.

Isn't this exactly the "WAITING_FOR_FINAL_CP" from the FLIP document? That
we always need to wait for a checkpoint triggered after `finish()` to
complete, before shutting down a task?

What Dawid was describing is a scenario where:
1. task/operator received `finish()`
2. checkpoint 42 triggered (not yet completed)
3. checkpoint 43 triggered (not yet completed)
4. checkpoint 44 triggered (not yet completed)
5. notifyCheckpointComplete(43)

And what should we do now? We can of course commit all transactions until
checkpoint 43. But should we keep waiting for
`notyifyCheckpointComplete(44)`? What if in the meantime another checkpoint
is triggered? We could end up waiting indefinitely.

Our proposal is to shutdown the task immediately after seeing first
`notifyCheckpointComplete(X)`, where X is any triggered checkpoint AFTER
`finish()`. This should be fine, as:
a) ideally there should be no new pending transactions opened after
checkpoint 42
b) even if operator/function is opening some transactions for checkpoint 43
and checkpoint 44 (`FlinkKafkaProducer`), those transactions after
checkpoint 42 should be empty

Hence comment from Dawid
> We must make sure those checkpoints do not leave lingering external
resources.

After seeing 5. (notifyCheckpointComplete(43)) It should be good enough to:
- commit transactions from checkpoint 42, (and 43 if they were created,
depends on the user code)
- close operator, aborting any pending transactions (for checkpoint 44 if
they were opened, depends on the user code)

If checkpoint 44 completes afterwards, it will still be valid. Ideally we
would recommend that after seeing `finish()` operators/functions should not
be opening any new transactions, but that shouldn't be required.

Best,
Piotrek

czw., 22 lip 2021 o 09:00 Yun Gao <yungao...@aliyun.com.invalid> napisał(a):

> Hi Dawid, Piotr, Steven,
>
> Very thanks for pointing out these issues and very thanks for the
> discussion !
>
> Failure before notifyCheckpointComplete()
>
> For this issue I would agree with what Piotr has proposed. I tried to use
> some
> operators like sink / window as example and currently I also do not found
> explicit scenarios that might cause problems if records are processed by a
> task that is assigned with states snapshotted after calling finish()
> before.
> For the future cases it seems users should be able to implement their
> target logic by explicitly add a flag regarding finished, and perhaps have
> different logic if this part of states are referred to. Besides, this case
> would
> only happen on rescaling or topology change, which embedded some kind of
> user
> knowledge inside the action. Thus it looks acceptable that we still split
> the operators
> state from the task lifecycle, and do not treat checkpoint after finish()
> differently.
>
> Finishing upon receiving notifyCheckpointComplete() of not the latest
> checkpoint
>
> For this issue perhaps we could explicitly requires the task to wait for a
> checkpoint triggered after finish()
> method is called for all the operators ? We could be able to achieve this
> target by maintaining
> some state inside the task.
>
> Checkpointing from a single subtask / UnionListState case
>
> This should indeed cause problems, and I also agree with that we could
> focus on this thread in the
> https://issues.apache.org/jira/browse/FLINK-21080.
>
> Best,
> Yun
>
>
> ------------------------------------------------------------------
> From:Piotr Nowojski <pnowoj...@apache.org>
> Send Time:2021 Jul. 22 (Thu.) 02:46
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>; Till Rohrmann <trohrm...@apache.org>;
> Yun Gao <yungao...@aliyun.com.invalid>; Piotr Nowojski <
> pi...@ververica.com>
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
>
> Hi Steven,
>
> > I probably missed sth here. isn't this the case today already? Why is it
> a concern for the proposed change?
>
> The problem is with the newly added `finish()` method and the already
> existing `endInput()` call. Currently on master there are no issues,
> because we are not checkpointing any operators after some operators have
> finished. The purpose of this FLIP-147 is to exactly enable this and this
> opens a new problem described by Dawid.
>
> To paraphrase and to give a concrete example.  Assume we have an operator
> with parallelism of two. Subtask 0 and subtask 1.
>
> 1. Subtask 0 received both `endInput()` and `finish()`, but subtask 1
> hasn't (yet).
> 2. Checkpoint 42 is triggered, and it completes.
> 3. Job fails and is restarted, but at the same time it's rescaled. User
> has chosen to scale down his operator down to 1.
>
> Now we have a pickle. We don't know if `notifyCheckpointComplete(42)` has
> been processed or not, so while recovering to checkpoint 42, we have to
> recover both finished subtask 0 (#1) state and not yet finished subtask 1's
> (#1). But at the same time they are scaled down, so we only have a single
> subtask 0 (#2) that has a combined state from both of the previous
> instances. The potentially confusing issue is that the state from subtask 0
> (#1) was checkpointed AFTER `endInput()` and `finish()` calls, but it's
> recovered to an operator that has still some records to process. In step 1.
> an user for example could store on the operator's state a bit of
> information "end input has been already called!", that after recovery would
> no longer be true.
>
> Hence the question about `finish()` and `endInput()` semantics. Should it
> be tied down to state, or just to an operator instance/execution attempt?
>
> Piotrek
> śr., 21 lip 2021 o 19:00 Steven Wu <stevenz...@gmail.com> napisał(a):
> > if a failure happens after sequence of finish() -> snapshotState(), but
>  before notifyCheckpointComplete(), we will restore such a state and we
>  might end up sending some more records to such an operator.
>
>  I probably missed sth here. isn't this the case today already? Why is it a
>  concern for the proposed change?
>
>  On Wed, Jul 21, 2021 at 4:39 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>  > Hi Dawid,
>  >
>  > Thanks for writing down those concerns.
>  >
>  > I think the first issue boils down what should be the contract of
> lifecycle
>  > methods like open(), close(), initializeState() etc and especially the
> new
>  > additions like finish() and endInput(). And what should be their
> relation
>  > with the operator state (regardless of it's type keyed, non-keyed,
> union,
>  > ...). Should those methods be tied to state or not? After thinking
> about it
>  > for a while (and discussing it offline with Dawid), I think the answer
>  > might be no, they shouldn't. I mean maybe we should just openly say that
>  > all of those methods relate to this single particular instance and
>  > execution of the operator. And if a job is recovered/rescaled, we would
> be
>  > allowed to freely resume consumption, ignoring a fact that maybe some
> parts
>  > of the state have previously seen `endInput()`. Why?
>  >
>  > 0. Yes, it might be confusing. Especially with `endInput()`. We call
>  > `endInput()`, we store something in a state and later after recovery
>  > combined with rescaling that state can see more records? Indeed weird,
>  > 1. I haven't come up yet with a counterexample that would break and make
>  > impossible to implement a real life use case. Theoretically yes, the
> user
>  > can store `endInput()` on state, and after rescaling this state would be
>  > inconsistent with what is actually happening with the operator, but I
>  > haven't found a use case that would break because of that.
>  > 2. Otherwise, implementation would be very difficult.
>  > 3. It's difficult to access keyed state from within
> `endInput()`/`finish()`
>  > calls, as they do not have key context.
>  > 4. After all, openly defining `endInput()` and `finish()` to be tied
> with
>  > it's operator execution instance lifecycle is not that strange and quite
>  > simple to explain. Sure, it can lead to a bit of confusion (0.), but
> that
>  > doesn't sound that bad in comparison with the alternatives that I'm
> aware
>  > of. Also currently methods like `open()` and `close()` are also tied to
> the
>  > operator execution instance, not to the state. Operators can be opened
> and
>  > closed multiple times, it doesn't mean that the state is lost after
> closing
>  > an operator.
>  >
>  > For the UnionListState problem I have posted my proposal in the ticket
> [1],
>  > so maybe let's move that particular discussion there?
>  >
>  > Piotrek
>  >
>  > [1] https://issues.apache.org/jira/browse/FLINK-21080
>  >
>  > śr., 21 lip 2021 o 12:39 Dawid Wysakowicz <dwysakow...@apache.org>
>  > napisał(a):
>  >
>  > > Hey all,
>  > >
>  > > To make the issues that were found transparent to the community, I
> want
>  > to
>  > > post an update:
>  > >
>  > > *1. Committing side-effects*
>  > > We do want to make sure that all side effects are committed before
>  > > bringing tasks down. Side effects are committed when calling
>  > > notifyCheckpointComplete. For the final checkpoint we introduced the
>  > method
>  > > finish(). This notifies the operator that we have consumed all
> incoming
>  > > records and we are preparing to close the Task. In turn we should
> flush
>  > any
>  > > pending buffered records and prepare to commit last transactions. The
>  > goal
>  > > is that after a successful sequence of finish() -> snapshotState() ->
>  > > notifyCheckpointComplete(), the remaining state can be considered
>  > > empty/finished and may be discarded.
>  > >
>  > > *Failure before notifyCheckpointComplete()*
>  > >
>  > > The question is what is the contract of the endInput()/finish()
> methods
>  > > and how do calling these methods affect the operators keyed, non-keyed
>  > > state and external state. Is it allowed to restore state snapshot
> taken
>  > > after calling endInput()/finish() and process more records? Or do we
>  > assume
>  > > that after a restore from such a state taken after finish() we should
> not
>  > > call any of the lifecycle methods or at least make sure those methods
> do
>  > > not emit records/interact with mailbox etc.
>  > >
>  > > Currently it is possible that if a failure happens after sequence of
>  > > finish() -> snapshotState(), but before notifyCheckpointComplete(), we
>  > will
>  > > restore such a state and we might end up sending some more records to
>  > such
>  > > an operator. It is possible if we rescale and this state is merged
> with a
>  > > state of a subtask that has not called finish() yet. It can also
> happen
>  > if
>  > > we rescale the upstream operator and the subtask of interest becomes
>  > > connected to a newly added non finished subtask.
>  > >
>  > > *Snapshotting StreamTasks that finish() has been called*
>  > >
>  > >
>  > > We thought about putting a flag into the snapshot of a subtask
> produced
>  > > after the finish() method. This would make it possible to skip
> execution
>  > of
>  > > certain lifecycle methods. Unfortunately this creates problems for
>  > > rescaling. How do we deal with a situation that subtask states with
> both
>  > > the feature flag set and unset end up in a single StreamTask.
> Additional
>  > > problem is that we merge those states into a single
> OperatorSubtaskState
>  > on
>  > > CheckpointCoordinator.
>  > >
>  > > *Finishing upon receiving notifyCheckpointComplete() of not the latest
>  > > checkpoint*
>  > >
>  > > We need to wait for a checkpoint to complete, that started after the
>  > > finish() method. However, we support concurrent checkpoints therefore,
>  > > there might be later checkpoints that completed, but the notification
> has
>  > > not arrived. We must make sure those checkpoints do not leave
> lingering
>  > > external resources.
>  > >
>  > > *Checkpointing from a single subtask / UnionListState case*
>  > > There are operators that checkpoint from a single subtask only.
> Usually
>  > > from the subtask index=0. If we let those subtasks finish, subsequent
>  > > checkpoints will miss this information.
>  > > Esp. Legacy sources problem:
>  > > https://issues.apache.org/jira/browse/FLINK-21080
>  > >
>  > > Best,
>  > >
>  > > Dawid
>  > > On 19/07/2021 15:10, Yun Gao wrote:
>  > >
>  > > Hi Till, Dawid,
>  > >
>  > > Very thanks for the comments and discussion! Glad that it seems we
> have
>  > > come to a convergence, and I also agree with that we could not include
>  > the
>  > > optimization in the first version.
>  > >
>  > > Best
>  > > Yun
>  > >
>  > > ------------------------------------------------------------------
>  > > From:Dawid Wysakowicz <dwysakow...@apache.org> <
> dwysakow...@apache.org>
>  > > Send Time:2021 Jul. 19 (Mon.) 17:51
>  > > To:dev <dev@flink.apache.org> <dev@flink.apache.org>; Till Rohrmann
>  > > <trohrm...@apache.org> <trohrm...@apache.org>
>  > > Cc:Yun Gao <yungao...@aliyun.com> <yungao...@aliyun.com>; Yun Gao
>  > > <yungao...@aliyun.com.invalid> <yungao...@aliyun.com.invalid>
>  > > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>  > > Finished
>  > >
>  > > Small correction. I meant we need to adjust the EndOfInputEvent of
>  > course.
>  > >
>  > > Best,
>  > >
>  > > Dawid
>  > >
>  > > On 19/07/2021 11:48, Dawid Wysakowicz wrote:
>  > > > Hey Till,
>  > > >
>  > > > Yes, you're right we will have to adjust the current state of
>  > > > EndOfPartitionEvent and move the moment when we emit it to have what
>  > > > we're discussing here. We are aware of that.
>  > > >
>  > > > As for the MAX_WATERMARK vs finish(). My take is that we should
> always
>  > > > emit MAX_WATERMARK before calling finish() on an operator. At the
> same
>  > > > time finish() should not leave behind anything in state, as the
>  > > > intention is that we never restore from the taken
> savepoint/checkpoint
>  > > > (savepoint w drain or bounded data consumed).
>  > > >
>  > > > Best,
>  > > >
>  > > > Dawid
>  > > >
>  > > > On 19/07/2021 11:33, Till Rohrmann wrote:
>  > > >> Hi Yun and Dawid,
>  > > >>
>  > >
>  > > >> Thanks for your comments. I do agree with your comments that
> finish()
>  > can
>  > > >> do more than MAX_WATERMARK. I guess we should then explain how
>  > > >> MAX_WATERMARK and finish() play together and what kind of
>  > > >> order guarantees we provide.
>  > > >>
>  > >
>  > > >> Concerning the EndOfPartitionEvent, I am not entirely sure whether
> it
>  > would
>  > >
>  > > >> work in its current state because we send this event when the Task
> is
>  > about
>  > > >> to shut down if I am not mistaken. What we want to have is to bring
>  > the
>  > >
>  > > >> StreamTasks into a state so that they shut down on the next
>  > checkpoint. For
>  > > >> this we need to keep the StreamTask running. In general, I am a
> fan of
>  > > >> making things explicit if possible. I think this helps maintenance
> and
>  > >
>  > > >> evolvability of code. That's why I think sending an EndOfInputEvent
>  > which
>  > > >> is a StreamTask level event and which says that there won't be any
>  > other
>  > > >> records coming only control events could make sense.
>  > > >>
>  > > >> I would leave the proposed optimization out of the first version.
> We
>  > can
>  > > >> still add it at a later point in time.
>  > > >>
>  > > >> Cheers,
>  > > >> Till
>  > > >>
>  > > >> On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz
>  > > <dwysakow...@apache.org> <dwysakow...@apache.org>
>  > > >> wrote:
>  > > >>
>  > >
>  > > >>> Personally I don't find this optimization important and I'd rather
>  > leave
>  > >
>  > > >>> it out not to complicate the codebase further. I doubt we save
> much
>  > there.
>  > > >>> I don't have a strong opinion though.
>  > > >>>
>  > > >>> Best,
>  > > >>>
>  > > >>> Dawid
>  > > >>> On 19/07/2021 10:31, Yun Gao wrote:
>  > > >>>
>  > > >>> Hi,
>  > > >>>
>  > > >>> Very thanks Dawid for the thoughts!
>  > > >>>
>  > > >>> Currently I also do not have different opinions regarding this
> part.
>  > > >>> But I have one more issue to confirm: during the previous
> discussion
>  > we
>  > > >>> have discussed that for the final checkpoint case, we might have
> an
>  > > >>> optmization
>  > >
>  > > >>> that if a task do not have operators using 2-pc, we might skip
>  > waiting for
>  > > >>> the
>  > >
>  > > >>> final checkpoint (but we could not skip the savepoint). To allow
>  > users to
>  > > >>> express
>  > > >>> the logic, we have proposed to add one more method to
> StreamOperator
>  > &
>  > > >>> CheckpointListener:
>  > > >>>
>  > > >>> interface StreamOperator {
>  > > >>>     default boolean requiresFinalCheckpoint() {
>  > > >>>         return true;
>  > > >>>     }
>  > > >>> }
>  > > >>>
>  > > >>> interface CheckpointListener {
>  > > >>>
>  > > >>>     default boolean requiresFinalCheckpoint() {
>  > > >>>         return true;
>  > > >>>     }
>  > > >>> }
>  > > >>>
>  > > >>> class AbstractUdfStreamOperator {
>  > > >>>
>  > > >>>     @Override
>  > > >>>     boolean requiresFinalCheckpoint() {
>  > > >>>         return userFunction instanceof CheckpointListener &&
>  > >
>  > > >>>             ((CheckpointListener)
>  > userFunction).requiresFinalCheckpoint();
>  > > >>>     }
>  > > >>> }
>  > > >>>
>  > > >>>
>  > > >>> I think we should still keep the change ?
>  > > >>>
>  > > >>> Best,
>  > > >>> Yun
>  > > >>>
>  > > >>> ------------------Original Mail ------------------
>  > > >>> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
>  > > <dwysakow...@apache.org>
>  > > >>> <dwysakow...@apache.org> <dwysakow...@apache.org>
>  > > >>> *Send Date:*Sun Jul 18 18:44:50 2021
>  > > >>> *Recipients:*Flink Dev <dev@flink.apache.org> <
> dev@flink.apache.org>
>  > > <dev@flink.apache.org> <dev@flink.apache.org>, Yun
>  > > >>> Gao <yungao...@aliyun.com.INVALID> <yungao...@aliyun.com.INVALID>
>  > > <yungao...@aliyun.com.INVALID> <yungao...@aliyun.com.INVALID>
>  > > >>> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
> Tasks
>  > > >>> Finished
>  > > >>>
>  > > >>>> I think we're all really close to the same solution.
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>> I second Yun's thoughts that MAX_WATERMARK works well for time
> based
>  > > >>>>
>  > >
>  > > >>>> buffering, but it does not solve flushing other operations such
> as
>  > e.g.
>  > > >>>>
>  > > >>>> count windows or batching requests in Sinks. I'd prefer to treat
> the
>  > > >>>>
>  > > >>>> finish() as a message for Operator to "flush all records". The
>  > > >>>>
>  > >
>  > > >>>> MAX_WATERMARK in my opinion is mostly for backwards compatibility
>  > imo. I
>  > > >>>>
>  > > >>>> don't think operators need to get a signal "stop-processing" if
> they
>  > > >>>>
>  > > >>>> don't need to flush records. The "WHEN" records are emitted,
> should
>  > be
>  > > >>>>
>  > > >>>> in control of the StreamTask, by firing timers or by processing a
>  > next
>  > > >>>>
>  > > >>>> record from upstream.
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > >
>  > > >>>> The only difference of my previous proposal compared to Yun's is
>  > that I
>  > > >>>>
>  > > >>>> did not want to send the EndOfUserRecords event in case of stop
> w/o
>  > > >>>>
>  > > >>>> drain. My thinking was that we could directly go from RUNNING to
>  > > >>>>
>  > > >>>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could
> emit
>  > > >>>>
>  > > >>>> EndOfUserRecordsEvent with an additional flag and e.g. stop
> firing
>  > > >>>>
>  > >
>  > > >>>> timers and processing events (without calling finish() on
>  > Operator). In
>  > > >>>>
>  > > >>>> my initial suggestion I though we don't care about some events
>  > > >>>>
>  > > >>>> potentially being emitted after the savepoint was taken, as they
>  > would
>  > > >>>>
>  > >
>  > > >>>> anyway belong to the next after FINAL, which would be discarded.
> I
>  > think
>  > > >>>>
>  > > >>>> though the proposal to suspend records processing and timers is a
>  > > >>>>
>  > >
>  > > >>>> sensible thing to do and would go with the version that Yun put
>  > into the
>  > > >>>>
>  > > >>>> FLIP Wiki.
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>> What do you think Till?
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>> Best,
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>> Dawid
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>>
>  > > >>>> On 16/07/2021 10:03, Yun Gao wrote:
>  > > >>>>
>  > > >>>>> Hi Till, Piotr
>  > > >>>>> Very thanks for the comments!
>  > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>  > >
>  > > >>>>> I also agree with Piotr that currently they are independent
>  > mechanisms,
>  > > >>>> and they are basically the same
>  > > >>>>
>  > > >>>>> for the event time.
>  > > >>>>> For more details, first there are some difference among the
> three
>  > > >>>> scenarios regarding the finish:
>  > > >>>>
>  > >
>  > > >>>>> For normal finish and stop-with-savepoint --drain, the job would
>  > not be
>  > > >>>> expected to be restarted,
>  > > >>>>
>  > > >>>>> and for stop-with-savepoint the job would be expected restart
>  > later.
>  > > >>>>> Then for finish / stop-with-savepoint --drain, currently Flink
>  > would
>  > > >>>> emit MAX_WATERMARK before the
>  > > >>>>
>  > >
>  > > >>>>> EndOfPartition. Besides, as we have discussed before [1],
>  > endOfInput /
>  > > >>>> finish() should also only be called
>  > > >>>>
>  > > >>>>> for finish / stop-with-savepoint --drain. Thus currently they
>  > always
>  > > >>>> occurs at the same time. After the change,
>  > > >>>>
>  > > >>>>> we could emit MAX_WATERMARK before endOfInput event for the
> finish
>  > /
>  > > >>>> stop-with-savepoint --drain cases.
>  > > >>>>
>  > > >>>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would
>  > a
>  > > >>>>>> WindowOperator close all windows and emit the results upon
> calling
>  > > >>>>>> finish, for example?
>  > > >>>>> As discussed above for stop-with-savepoint, we would always keep
>  > the
>  > > >>>> window as is, and restore them after restart.
>  > > >>>>
>  > > >>>>> Then for the finish / stop-with-savepoint --drain, I think
> perhaps
>  > it
>  > > >>>> depends on the Triggers. For
>  > > >>>>
>  > >
>  > > >>>>> event-time triggers / process time triggers, it would be
>  > reasonable to
>  > > >>>> flush all the windows since logically
>  > > >>>>
>  > >
>  > > >>>>> the time would always elapse and the window would always get
>  > triggered
>  > > >>>> in a logical future. But for triggers
>  > > >>>>
>  > >
>  > > >>>>> like CountTrigger, no matter how much time pass logically, the
>  > windows
>  > > >>>> would not trigger, thus we may not
>  > > >>>>
>  > > >>>>> flush these windows. If there are requirements we may provide
>  > > >>>> additional triggers.
>  > > >>>>
>  > >
>  > > >>>>>> It's a bit messy and I'm not sure if this should be
> strengthened
>  > out?
>  > > >>>> Each one of those has a little bit different semantic/meaning,
>  > > >>>>
>  > >
>  > > >>>>>> but at the same time they are very similar. For single input
>  > operators
>  > > >>>> `endInput()` and `finish()` are actually the very same thing.
>  > > >>>>
>  > >
>  > > >>>>> Currently MAX_WATERMARK / endInput / finish indeed always happen
>  > at the
>  > > >>>> same time, and for single input operators `endInput()` and
>  > `finish()`
>  > > >>>>
>  > >
>  > > >>>>> are indeed the same thing. During the last discussion we ever
>  > mentioned
>  > > >>>> this issue and at then we thought that we might deprecate
>  > `endInput()`
>  > > >>>>
>  > > >>>>> in the future, then we would only have endInput(int input) and
>  > > >>>> finish().
>  > > >>>>
>  > > >>>>> Best,
>  > > >>>>> Yun
>  > > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-21132
>  > > >>>>>
> ------------------------------------------------------------------
>  > > >>>>> From:Piotr Nowojski
>  > > >>>>> Send Time:2021 Jul. 16 (Fri.) 13:48
>  > > >>>>> To:dev
>  > > >>>>> Cc:Yun Gao
>  > > >>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
> Tasks
>  > > >>>> Finished
>  > > >>>>
>  > > >>>>> Hi Till,
>  > > >>>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>  > > >>>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would
>  > a>
>  > > >>>> WindowOperator close all windows and emit the results upon
> calling
>  > > >>>>
>  > > >>>>>> finish, for example?
>  > > >>>>> 1) currently they are independent but parallel mechanisms. With
>  > event
>  > > >>>> time, they are basically the same.
>  > > >>>>
>  > > >>>>> 2) it probably should for the sake of processing time windows.
>  > > >>>>> Here you are touching the bit of the current design that I like
> the
>  > >
>  > > >>>> least. We basically have now three different ways of conveying
> very
>  > similar
>  > > >>>> things:
>  > > >>>>
>  > > >>>>> a) sending `MAX_WATERMARK`, used by event time WindowOperator
> (what
>  > > >>>> about processing time?)
>  > > >>>>
>  > > >>>>> b) endInput(), used for example by AsyncWaitOperator to flush
> it's
>  > > >>>> internal state
>  > > >>>>
>  > > >>>>> c) finish(), used for example by ContinuousFileReaderOperator
>  > > >>>>> It's a bit messy and I'm not sure if this should be strengthened
>  > out?
>  > >
>  > > >>>> Each one of those has a little bit different semantic/meaning,
> but
>  > at the
>  > >
>  > > >>>> same time they are very similar. For single input operators
>  > `endInput()`
>  > > >>>> and `finish()` are actually the very same thing.
>  > > >>>>
>  > > >>>>> Piotrek
>  > > >>>>> czw., 15 lip 2021 o 16:47 Till Rohrmann napisał(a):
>  > > >>>>> Thanks for updating the FLIP. Based on the new section about
>  > > >>>>> stop-with-savepoint [--drain] I got two other questions:
>  > > >>>>> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>  > > >>>>> 2) StreamOperator.finish says to flush all buffered events.
> Would a
>  > > >>>>> WindowOperator close all windows and emit the results upon
> calling
>  > > >>>>> finish, for example?
>  > > >>>>> Cheers,
>  > > >>>>> Till
>  > > >>>>> On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>  > > >>>>>> Thanks a lot for your answers and clarifications Yun.
>  > > >>>>>> 1+2) Agreed, this can be a future improvement if this becomes a
>  > > >>>> problem.
>  > > >>>>
>  > > >>>>>> 3) Great, this will help a lot with understanding the FLIP.
>  > > >>>>>> Cheers,
>  > > >>>>>> Till
>  > > >>>>>> On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
>  > > >>>>>> wrote:
>  > > >>>>>>> Hi Till,
>  > > >>>>>>> Very thanks for the review and comments!
>  > > >>>>>>> 1) First I think in fact we could be able to do the
> computation
>  > > >>>> outside
>  > > >>>>
>  > > >>>>>>> of the main thread,
>  > > >>>>>>> and the current implementation mainly due to the computation
> is
>  > in
>  > > >>>>>>> general fast and we
>  > > >>>>>>> initially want to have a simplified first version.
>  > >
>  > > >>>>>>> The main requirement here is to have a constant view of the
>  > state of
>  > > >>>> the
>  > > >>>>
>  > > >>>>>>> tasks, otherwise
>  > > >>>>>>> for example if we have A -> B, if A is running when we check
> if
>  > we
>  > > >>>> need
>  > > >>>>
>  > > >>>>>>> to trigger A, we will
>  > > >>>>>>> mark A as have to trigger, but if A gets to finished when we
>  > check
>  > > >>>> B, we
>  > > >>>>
>  > > >>>>>>> will also mark B as
>  > >
>  > > >>>>>>> have to trigger, then B will receive both rpc trigger and
>  > checkpoint
>  > > >>>>>>> barrier, which would break
>  > > >>>>>>> some assumption on the task side and complicate the
>  > implementation.
>  > >
>  > > >>>>>>> But to cope this issue, we in fact could first have a snapshot
>  > of the
>  > > >>>>>>> tasks' state and then do the
>  > >
>  > > >>>>>>> computation, both the two step do not need to be in the main
>  > thread.
>  > >
>  > > >>>>>>> 2) For the computation logic, in fact currently we benefit a
> lot
>  > from
>  > > >>>>>>> some shortcuts on all-to-all
>  > >
>  > > >>>>>>> edges and job vertex with all tasks running, these shortcuts
>  > could do
>  > > >>>>>>> checks on the job vertex level
>  > > >>>>>>> first and skip some job vertices as a whole. With this
>  > optimization
>  > > >>>> we
>  > > >>>>
>  > > >>>>>>> have a O(V) algorithm, and the
>  > > >>>>>>> current running time of the worst case for a job with 320,000
>  > tasks
>  > > >>>> is
>  > > >>>>
>  > > >>>>>>> less than 100ms. For
>  > > >>>>>>> daily graph sizes the time would be further reduced linearly.
>  > > >>>>>>> If we do the computation based on the last triggered tasks, we
>  > may
>  > > >>>> not
>  > > >>>>
>  > > >>>>>>> easily encode this information
>  > >
>  > > >>>>>>> into the shortcuts on the job vertex level. And since the time
>  > seems
>  > > >>>> to
>  > > >>>>
>  > > >>>>>>> be short, perhaps it is enough
>  > > >>>>>>> to do re-computation from the scratch in consideration of the
>  > > >>>> tradeoff
>  > > >>>>
>  > > >>>>>>> between the performance and
>  > > >>>>>>> the complexity ?
>  > > >>>>>>> 3) We are going to emit the EndOfInput event exactly after the
>  > > >>>> finish()
>  > > >>>>
>  > > >>>>>>> method and before the last
>  > >
>  > > >>>>>>> snapshotState() method so that we could shut down the whole
>  > topology
>  > > >>>> with
>  > > >>>>
>  > > >>>>>>> a single final checkpoint.
>  > > >>>>>>> Very sorry for not include enough details for this part and
> I'll
>  > > >>>>>>> complement the FLIP with the details on
>  > > >>>>>>> the process of the final checkpoint / savepoint.
>  > > >>>>>>> Best,
>  > > >>>>>>> Yun
>  > > >>>>>>>
>  > ------------------------------------------------------------------
>  > > >>>>>>> From:Till Rohrmann
>  > > >>>>>>> Send Time:2021 Jul. 14 (Wed.) 22:05
>  > > >>>>>>> To:dev
>  > > >>>>>>> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After
>  > Tasks
>  > > >>>>>>> Finished
>  > > >>>>>>> Hi everyone,
>  > > >>>>>>> I am a bit late to the voting party but let me ask three
>  > questions:
>  > >
>  > > >>>>>>> 1) Why do we execute the trigger plan computation in the main
>  > thread
>  > > >>>> if we
>  > > >>>>
>  > >
>  > > >>>>>>> cannot guarantee that all tasks are still running when
>  > triggering the
>  > > >>>>>>> checkpoint? Couldn't we do the computation in a different
> thread
>  > in
>  > > >>>> order
>  > > >>>>
>  > > >>>>>>> to relieve the main thread a bit.
>  > > >>>>>>> 2) The implementation of the DefaultCheckpointPlanCalculator
>  > seems
>  > > >>>> to go
>  > > >>>>
>  > > >>>>>>> over the whole topology for every calculation. Wouldn't it be
>  > more
>  > > >>>>>>> efficient to maintain the set of current tasks to trigger and
>  > check
>  > > >>>>>>> whether
>  > > >>>>>>> anything has changed and if so check the succeeding tasks
> until
>  > we
>  > > >>>> have
>  > > >>>>
>  > > >>>>>>> found the current checkpoint trigger frontier?
>  > > >>>>>>> 3) When are we going to send the endOfInput events to a
>  > downstream
>  > > >>>> task?
>  > > >>>>
>  > > >>>>>>> If
>  > >
>  > > >>>>>>> this happens after we call finish on the upstream operator but
>  > before
>  > > >>>>>>> snapshotState then it would be possible to shut down the whole
>  > > >>>> topology
>  > > >>>>
>  > >
>  > > >>>>>>> with a single final checkpoint. I think this part could
> benefit
>  > from
>  > > >>>> a bit
>  > > >>>>
>  > > >>>>>>> more detailed description in the FLIP.
>  > > >>>>>>> Cheers,
>  > > >>>>>>> Till
>  > > >>>>>>> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
>  > > >>>>>>> wrote:
>  > > >>>>>>>> Hi there,
>  > > >>>>>>>> Since the voting time of FLIP-147[1] has passed, I'm closing
> the
>  > > >>>> vote
>  > > >>>>
>  > > >>>>>>> now.
>  > > >>>>>>>> There were seven +1 votes ( 6 / 7 are bindings) and no -1
> votes:
>  > > >>>>>>>> - Dawid Wysakowicz (binding)
>  > > >>>>>>>> - Piotr Nowojski(binding)
>  > > >>>>>>>> - Jiangang Liu (binding)
>  > > >>>>>>>> - Arvid Heise (binding)
>  > > >>>>>>>> - Jing Zhang (binding)
>  > > >>>>>>>> - Leonard Xu (non-binding)
>  > > >>>>>>>> - Guowei Ma (binding)
>  > > >>>>>>>> Thus I'm happy to announce that the update to the FLIP-147 is
>  > > >>>> accepted.
>  > > >>>>
>  > > >>>>>>>> Very thanks everyone!
>  > > >>>>>>>> Best,
>  > > >>>>>>>> Yun
>  > > >>>>>>>> [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>  > > >>>>
>  > > >>>>
>  > >
>  > >
>  >
>
>

Reply via email to