Thanks for your responses Dawid and Yun,

I agree with you that the whole lifecycle section of the
StreamOperator/StreamTask/Task in the FLIP needs to be better fleshed out.
I have to admit that I am a bit confused by the new events you are
mentioning because I couldn't find them in the FLIP. I assume that
EndOfUserRecordsEvent has been introduced to work around the limitations of
unaligned checkpoints and that EndofInputEvent is something that does not
exist yet. Maybe properly writing things down and agreeing on the same
terminology could help our discussion.

I also agree with you that we need some kind of new event (one that is
conceptually bound to the StreamTask) that says that we have reached the
end of input. Let's name this event for the further discussion
EndOfInputEvent.

The way I've thought about the problem is the following: First let us
define two simple state machines for the StreamOperator and the StreamTask:

StreamOperator FSM: RUNNING -> FINISHED -> CLOSED

RUNNING: We process input records and produce output records.
RUNNING -> FINISHED: Happens upon calling StreamOperator.finish() or if the
source reaches the end of data (does not mean that we flush anything). This
also emits EndOfInputEvent to downstream operators
FINISHED: The operator must not output events.
FINISHED -> CLOSED: Happens upon calling StreamOperator.close() (close all
resources)

StreamTask FSM: RUNNING -> WAITING_FOR_FINAL_CHECKPOINT -> CLOSED

RUNNING: The StreamOperator is running.
RUNNING -> WAITING_FOR_FINAL_CHECKPOINT: Upon receiving EndOfInputEvent on
all inputs. This will also call StreamOperator.finish() and send
EndOfInputEvent to its downstream tasks. (Alternatively if the
EndOfInputEvent processing happens in the StreamOperator, then we can
listen to the state transition RUNNING -> FINISHED of the StreamOperator).
WAITING_FOR_FINAL_CHECKPOINT: As the name says we wait for the next
successful checkpoint to complete.
WAITING_FOR_FINAL_CHECKPOINT -> CLOSED: Once the next checkpoint completes
successfully (e.g. StreamTask.notifyCheckpointComplete is called).

So the first thing to note is that StreamOperator.finish() does not flush
anything but simply tells the StreamOperator that it should no longer
produce output. The basic idea would be that whenever we need to flush
our buffered data, then we send a MAX_WATERMARK before sending an
EndOfInputEvent. Of course, we could also integrate this property in the
EndOfInputEvent(boolean flush_buffered_data) but we already have the
watermark mechanism. Of course, we could also change
StreamOperator.finish() into .finish(boolean flush) and translate a
MAX_WATERMARK into .finish(true).

What would it look like if the source reaches its end?

Upon reaching the end, the source would send a MAX_WATERMARK followed by an
EndOfInputEvent and then go into the FINISHED state. This will cause the
StreamTask to go into the WAITING_FOR_FINAL_CHECKPOINT state. The
downstream tasks upon receiving MAX_WATERMARK and EndOfInputEvent on all
channels, would first flush all their data (because of MAX_WATERMARK), then
forward MAX_WATERMARK to its downstream operator, go into the FINISHED
state (because of EndOfInputEvent) and finally also sent an EndOfInputEvent
to its downstream operators. The owning StreamTask would also go into the
WAITING_FOR_FINAL_CHECKPOINT state. And all StreamOperators and StreamTasks
would terminate after the next successful checkpoint. That would then
trigger the EndOfPartitionEvent that will terminate the TCP connections.

What would it look like if we trigger stop-with-savepoint w/ drain?

We would send our sources a signal that they should stop processing and
advance the time. This means that they emit a MAX_WATERMARK and then go
into the FINISHED state (meaning that they send an EndOfInputEvent). Then
everything is the same as with the first case only that we will trigger a
savepoint just after sending the stop processing signal.

What would it look like if we trigger stop-with-savepoint w/o drain?

We would send our sources a signal that they should stop processing. This
means that the sources go into FINISHED which means that we are sending an
EndOfInputEvent. Then everything is the same as with the first case only
that we will trigger a savepoint just after sending the stop processing
signal.

One thing I would like to emphasize here is that the
StreamOperators/StreamTasks don't have to know anything about a specific
savepoint/checkpoint. They simply follow their state machine that says that
you have to wait for a checkpoint if you want to stop. It just happens that
in the case of stop-with-savepoint the next checkpoint will be a savepoint.
That way we decouple the lifecycle management and the stop-with-savepoint
operations a bit better.

Cheers,
Till

On Fri, Jul 16, 2021 at 2:22 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Till,
>
> > Ok, so the plan is that finish() will flush all pending events and then
> send the MAX_WATERMARK.
>
> > What I am missing is the connection between the lifecycle of the
> operator and signals the StreamTask and Task might receive (and also their
> life cycles).
> > At the moment we do have the EndOfPartitionEvent that we send once the
> Task reaches its end. Based on what signals will
> > StreamOperator.finish() be called? What kind of other signals for the
> StreamTask do we have to introduce in order to support stop-with-savepoint
> w/o --drain if we don't want to call finish()?
>
> Currently the plan is to introduce a new event (like EndofInputEvent) to
> first notify all the tasks from sources to
> sink to first call finish() (or skip finish() if stop-with-savepoint),
> then the tasks would wait for the last checkpoint and the finally emit
> EndOfPartition. We could
> add a flag to `EndofInputEvent` to indicating if the finish() method
> would be called.
>
> From the view of a single task, the life cycle of a source task would
> looks like
>
> 1. Open & Initialize
> 2. Emits all the records
> 3. Call finish() for all the operators
> 4. Emit MAX_WATERMARK
> 5. Emit EndofInputEvent {finish = true}
> 6. Wait for the checkpoint trigger, emit CheckpointBarrier and wait for
> the notification of the final checkpoint if needed.
> 7. Call close() for all the operators
> 8. Emit EndOfPartitionEvent
>
>
> for normal finish. For stop-with-savepoint [--drain], it would looks like
>
> 1. Open & Initialize
> 2. Received stop-with-savepoint (--drain) request
> 3. Finish the task (e.g. interrupt the legacy source thread or suspend the
> mailbox for the new source).
> 4. Call finish() for all the operators.
> 5. Emit MAX_WATERMARK if --drain
> 6. Emit EndofInputEvent  {finish = is drain}
> 7. Emit CheckpointBarrier for this savepoint and wait for the
> notification of the final savepoint if needed.
> 8. Call close() for all the operators
> 9. Emit EndOfPartitionEvent
>
> Then the lifecycle of a non-source task would be
>
> 1. Open & Initialize
> 2. runMailboxLoop()
> 3. Received MAX_WATERMARK from all the channels. --> trigger all the
> timers.
> 4. Received EndofInputEvent from all the input channels  --> Call
> finish() for all the operators.
> 5. Emit MAX_WATERMARK if --drain (this is nature with the watermark
> alignment mechanism)
> 6. Emit the received EndofInputEvent.
> 7. Wait for the checkpoint barrier aligned, emit CheckpointBarrier and
> wait for the notification of the final checkpoint / savepoint if needed.
> 8. Call close() for all the operators
> 9. Wait for received all EndOfPartitionEvent, release network resources
> and emit EndOfPartitionEvent
>
> > If finish() sends MAX_WATERMARK after flushing all pending events and
> finish() itself is being triggered by some endOfInputEvent from the
> StreamTask, then we might actually send MAX_WATERMARK multiple times.
>
> I think we might not send MAX_WATERMARK multiple times in that for
> non-source tasks, both MAX_WATERMARK and EndOfInputEvent would be
> processed only after they are received from event input channels.
> Then after process the current task would emit MAX_WATERMARK and 
> EndOfInputEvent
> again to the following tasks, thus both the two events would only be sent
> once for each task ?
>
> Best,
> Yun
>
>
>
> ------------------------------------------------------------------
> From:Till Rohrmann <trohrm...@apache.org>
> Send Time:2021 Jul. 16 (Fri.) 17:26
> To:Yun Gao <yungao...@aliyun.com>
> Cc:dev <dev@flink.apache.org>
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
>
> Ok, so the plan is that finish() will flush all pending events and then
> send the MAX_WATERMARK.
>
> What I am missing is the connection between the lifecycle of the operator
> and signals the StreamTask and Task might receive (and also their life
> cycles). At the moment we do have the EndOfPartitionEvent that we send once
> the Task reaches its end. Based on what signals will
> StreamOperator.finish() be called? What kind of other signals for the
> StreamTask do we have to introduce in order to support stop-with-savepoint
> w/o --drain if we don't want to call finish()?
>
> If finish() sends MAX_WATERMARK after flushing all pending events and
> finish() itself is being triggered by some endOfInputEvent from the
> StreamTask, then we might actually send MAX_WATERMARK multiple times.
>
> I think what I am lacking a bit is how the StreamOperator lifecycle will
> fit together with the lifecycle of its owner and the corresponding signals
> we have to send.
>
> Cheers,
> Till
>
> Cheers,
> Till
>
> On Fri, Jul 16, 2021 at 10:40 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Till,
>
> Sorry that I do not see the reply when sending the last mail, I think as a
> whole we are on the same page for the
> 1. For normal stop-with-savepoint, we do not call finish() and do not emit
> MAX_WATERMARK
> 2. For normal finish / stop-with-savepoint --drain, we would call finish()
> and emit MAX_WATERMARK
>
> > But then there is the question, how do we signal the operator that the
> next checkpoint is supposed to stop the operator
> > (how will the operator's lifecycle look in this case)? Maybe we simply
> don't tell the operator and handle this situation on the
> > StreamTask level.
>
> Logically I think in this case UDF seems do not need to know the next
> checkpoint is supposed to stop the operator since the final
> checkpoint in this case have no difference with the ordinary checkpoints.
>
> > So I guess the question is will finish() advance the time to the end or
> is this a separate mechanism (e.g. explicit watermarks).
>
> I tend to have an explicit MAX_WATERMARK since it makes watermark
> processing to be unified with normal cases and make the meanings of
> each event explicit. But this might be a private preference and both
> methods would work.
>
> Very sorry for not making the whole thing clear in the FLIP again, if
> there are no other concerns I'll update the FLIP with the above conclusions
> to make it precise in this part.
>
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Till Rohrmann <trohrm...@apache.org>
> Send Time:2021 Jul. 16 (Fri.) 16:00
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
>
> I think we should try to sort this out because it might affect how and
> when finish() will be called (or in general how the operator lifecycle
> looks like).
>
> To give an example let's take a look at the stop-with-savepoint w/ and w/o
> --drain:
>
> 1) stop-with-savepoint w/o --drain: Conceptually what we would like to do
> is to stop processing w/o completing any windows or waiting for the
> AsyncOperator to finish its operations. All these unfinished operations
> should become part of the final checkpoint so that we can resume from it at
> a later point. Depending on what finish() does (flush unfinished windows or
> not), this method must or must not be called. Assuming that finish()
> flushes unfinished windows/waits for uncompleted async operations, we
> clearly shouldn't call it. But then there is the question, how do we
> signal the operator that the next checkpoint is supposed to stop the
> operator (how will the operator's lifecycle look in this case)? Maybe we
> simply don't tell the operator and handle this situation on the StreamTask
> level. If finish() does not flush unfinished windows, then it shouldn't be
> a problem.
>
> 2) stop-with-savepoint w/ --drain: Here we want to complete all pending
> operations and flush out all results because we don't intend to resume the
> job. Conceptually, we tell the system that we have reached MAX_WATERMARK.
> If finish() is defined so that it implicitly advances the watermark to
> MAX_WATERMARK, then there is no problem. If finish() does not have this
> semantic, then we need to send the MAX_WATERMARK before sending the
> endOfInput event to a downstream task. In fact, stop-with-savepoint /w
> --drain shouldn't be a lot different from a bounded source that reaches its
> end. It would also send MAX_WATERMARK and then signal the endOfInput event
> (note that endOfInput is decoupled from the event time here).
>
> So I guess the question is will finish() advance the time to the end or is
> this a separate mechanism (e.g. explicit watermarks).
>
> Concerning how to handle processing time, I am a bit unsure tbh. I can see
> arguments for completing processing time windows/firing processing time
> timers when calling stop-with-savepoint w/ --drain. On the other hand, I
> could also see that people want to define actions based on the wall clock
> time that are independent of the stream state and, thus, would want to
> ignore them if the Flink application is stopped before reaching this time.
>
> Cheers,
> Till
>
> On Fri, Jul 16, 2021 at 7:48 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
> Hi Till,
>
> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > 2) StreamOperator.finish says to flush all buffered events. Would a
> > WindowOperator close all windows and emit the results upon calling
> > finish, for example?
>
> 1) currently they are independent but parallel mechanisms. With event time,
> they are basically the same.
> 2) it probably should for the sake of processing time windows.
>
> Here you are touching the bit of the current design that I like the least.
> We basically have now three different ways of conveying very similar
> things:
> a) sending `MAX_WATERMARK`, used by event time WindowOperator (what about
> processing time?)
> b) endInput(), used for example by AsyncWaitOperator to flush it's internal
> state
> c) finish(), used for example by ContinuousFileReaderOperator
>
> It's a bit messy and I'm not sure if this should be strengthened out? Each
> one of those has a little bit different semantic/meaning, but at the same
> time they are very similar. For single input operators `endInput()` and
> `finish()` are actually the very same thing.
>
> Piotrek
>
> czw., 15 lip 2021 o 16:47 Till Rohrmann <trohrm...@apache.org> napisaƂ(a):
>
> > Thanks for updating the FLIP. Based on the new section about
> > stop-with-savepoint [--drain] I got two other questions:
> >
> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > 2) StreamOperator.finish says to flush all buffered events. Would a
> > WindowOperator close all windows and emit the results upon calling
> > finish, for example?
> >
> > Cheers,
> > Till
> >
> > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> > > Thanks a lot for your answers and clarifications Yun.
> > >
> > > 1+2) Agreed, this can be a future improvement if this becomes a
> problem.
> > >
> > > 3) Great, this will help a lot with understanding the FLIP.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao <yungao...@aliyun.com.invalid>
> > > wrote:
> > >
> > >> Hi Till,
> > >>
> > >> Very thanks for the review and comments!
> > >>
> > >> 1) First I think in fact we could be able to do the computation
> outside
> > >> of the main thread,
> > >> and the current implementation mainly due to the computation is in
> > >> general fast and we
> > >> initially want to have a simplified first version.
> > >>
> > >> The main requirement here is to have a constant view of the state of
> the
> > >> tasks, otherwise
> > >> for example if we have A -> B, if A is running when we check if we
> need
> > >> to trigger A, we will
> > >> mark A as have to trigger, but if A gets to finished when we check B,
> we
> > >> will also mark B as
> > >> have to trigger, then B will receive both rpc trigger and checkpoint
> > >> barrier, which would break
> > >> some assumption on the task side and complicate the implementation.
> > >>
> > >> But to cope this issue, we in fact could first have a snapshot of the
> > >> tasks' state and then do the
> > >> computation, both the two step do not need to be in the main thread.
> > >>
> > >> 2) For the computation logic, in fact currently we benefit a lot from
> > >> some shortcuts on all-to-all
> > >> edges and job vertex with all tasks running, these shortcuts could do
> > >> checks on the job vertex level
> > >> first and skip some job vertices as a whole. With this optimization we
> > >> have a O(V) algorithm, and the
> > >> current running time of the worst case for a job with 320,000 tasks is
> > >> less than 100ms. For
> > >> daily graph sizes the time would be further reduced linearly.
> > >>
> > >> If we do the computation based on the last triggered tasks, we may not
> > >> easily encode this information
> > >> into the shortcuts on the job vertex level. And since the time seems
> to
> > >> be short, perhaps it is enough
> > >> to do re-computation from the scratch in consideration of the tradeoff
> > >> between the performance and
> > >> the complexity ?
> > >>
> > >> 3) We are going to emit the EndOfInput event exactly after the
> finish()
> > >> method and before the last
> > >> snapshotState() method so that we could shut down the whole topology
> > with
> > >> a single final checkpoint.
> > >> Very sorry for not include enough details for this part and I'll
> > >> complement the FLIP with the details on
> > >> the process of the final checkpoint / savepoint.
> > >>
> > >> Best,
> > >> Yun
> > >>
> > >>
> > >>
> > >> ------------------------------------------------------------------
> > >> From:Till Rohrmann <trohrm...@apache.org>
> > >> Send Time:2021 Jul. 14 (Wed.) 22:05
> > >> To:dev <dev@flink.apache.org>
> > >> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > >> Finished
> > >>
> > >> Hi everyone,
> > >>
> > >> I am a bit late to the voting party but let me ask three questions:
> > >>
> > >> 1) Why do we execute the trigger plan computation in the main thread
> if
> > we
> > >> cannot guarantee that all tasks are still running when triggering the
> > >> checkpoint? Couldn't we do the computation in a different thread in
> > order
> > >> to relieve the main thread a bit.
> > >>
> > >> 2) The implementation of the DefaultCheckpointPlanCalculator seems to
> go
> > >> over the whole topology for every calculation. Wouldn't it be more
> > >> efficient to maintain the set of current tasks to trigger and check
> > >> whether
> > >> anything has changed and if so check the succeeding tasks until we
> have
> > >> found the current checkpoint trigger frontier?
> > >>
> > >> 3) When are we going to send the endOfInput events to a downstream
> task?
> > >> If
> > >> this happens after we call finish on the upstream operator but before
> > >> snapshotState then it would be possible to shut down the whole
> topology
> > >> with a single final checkpoint. I think this part could benefit from a
> > bit
> > >> more detailed description in the FLIP.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao <yungao...@aliyun.com.invalid>
> > >> wrote:
> > >>
> > >> > Hi there,
> > >> >
> > >> > Since the voting time of FLIP-147[1] has passed, I'm closing the
> vote
> > >> now.
> > >> >
> > >> > There were seven +1 votes ( 6 / 7 are bindings) and no -1 votes:
> > >> >
> > >> > - Dawid Wysakowicz (binding)
> > >> > - Piotr Nowojski(binding)
> > >> > - Jiangang Liu (binding)
> > >> > - Arvid Heise (binding)
> > >> > - Jing Zhang (binding)
> > >> > - Leonard Xu (non-binding)
> > >> > - Guowei Ma (binding)
> > >> >
> > >> > Thus I'm happy to announce that the update to the FLIP-147 is
> > accepted.
> > >> >
> > >> > Very thanks everyone!
> > >> >
> > >> > Best,
> > >> > Yun
> > >> >
> > >> > [1]  https://cwiki.apache.org/confluence/x/mw-ZCQ
> > >>
> > >>
> >
>
>
>

Reply via email to