Hi Yun and Dawid,

Thanks for your comments. I do agree with your comments that finish() can
do more than MAX_WATERMARK. I guess we should then explain how
MAX_WATERMARK and finish() play together and what kind of
order guarantees we provide.

Concerning the EndOfPartitionEvent, I am not entirely sure whether it would
work in its current state because we send this event when the Task is about
to shut down if I am not mistaken. What we want to have is to bring the
StreamTasks into a state so that they shut down on the next checkpoint. For
this we need to keep the StreamTask running. In general, I am a fan of
making things explicit if possible. I think this helps maintenance and
evolvability of code. That's why I think sending an EndOfInputEvent which
is a StreamTask level event and which says that there won't be any other
records coming only control events could make sense.

I would leave the proposed optimization out of the first version. We can
still add it at a later point in time.

Cheers,
Till

On Mon, Jul 19, 2021 at 10:35 AM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Personally I don't find this optimization important and I'd rather leave
> it out not to complicate the codebase further. I doubt we save much there.
> I don't have a strong opinion though.
>
> Best,
>
> Dawid
> On 19/07/2021 10:31, Yun Gao wrote:
>
> Hi,
>
> Very thanks Dawid for the thoughts!
>
> Currently I also do not have different opinions regarding this part.
> But I have one more issue to confirm: during the previous discussion we
> have discussed that for the final checkpoint case, we might have an
> optmization
> that if a task do not have operators using 2-pc, we might skip waiting for
> the
> final checkpoint (but we could not skip the savepoint). To allow users to
> express
> the logic, we have proposed to add one more method to StreamOperator &
> CheckpointListener:
>
> interface StreamOperator {
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> interface CheckpointListener {
>
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> class AbstractUdfStreamOperator {
>
>     @Override
>     boolean requiresFinalCheckpoint() {
>         return userFunction instanceof CheckpointListener &&
>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>     }
> }
>
>
> I think we should still keep the change ?
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:*Dawid Wysakowicz <dwysakow...@apache.org>
> <dwysakow...@apache.org>
> *Send Date:*Sun Jul 18 18:44:50 2021
> *Recipients:*Flink Dev <dev@flink.apache.org> <dev@flink.apache.org>, Yun
> Gao <yungao...@aliyun.com.INVALID> <yungao...@aliyun.com.INVALID>
> *Subject:*Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> Finished
>
>> I think we're all really close to the same solution.
>>
>>
>>
>> I second Yun's thoughts that MAX_WATERMARK works well for time based
>>
>> buffering, but it does not solve flushing other operations such as e.g.
>>
>> count windows or batching requests in Sinks. I'd prefer to treat the
>>
>> finish() as a message for Operator to "flush all records". The
>>
>> MAX_WATERMARK in my opinion is mostly for backwards compatibility imo. I
>>
>> don't think operators need to get a signal "stop-processing" if they
>>
>> don't need to flush records. The "WHEN" records are emitted, should be
>>
>> in control of the StreamTask, by firing timers or by processing a next
>>
>> record from upstream.
>>
>>
>>
>> The only difference of my previous proposal compared to Yun's is that I
>>
>> did not want to send the EndOfUserRecords event in case of stop w/o
>>
>> drain. My thinking was that we could directly go from RUNNING to
>>
>> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit
>>
>> EndOfUserRecordsEvent with an additional flag and e.g. stop firing
>>
>> timers and processing events (without calling finish() on Operator). In
>>
>> my initial suggestion I though we don't care about some events
>>
>> potentially being emitted after the savepoint was taken, as they would
>>
>> anyway belong to the next after FINAL, which would be discarded. I think
>>
>> though the proposal to suspend records processing and timers is a
>>
>> sensible thing to do and would go with the version that Yun put into the
>>
>> FLIP Wiki.
>>
>>
>>
>> What do you think Till?
>>
>>
>>
>> Best,
>>
>>
>>
>> Dawid
>>
>>
>>
>>
>>
>> On 16/07/2021 10:03, Yun Gao wrote:
>>
>> > Hi Till, Piotr
>>
>> >
>>
>> > Very thanks for the comments!
>>
>> >
>>
>> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>
>> > I also agree with Piotr that currently they are independent mechanisms,
>> and they are basically the same
>>
>> > for the event time.
>>
>> >
>>
>> > For more details, first there are some difference among the three
>> scenarios regarding the finish:
>>
>> > For normal finish and stop-with-savepoint --drain, the job would not be
>> expected to be restarted,
>>
>> > and for stop-with-savepoint the job would be expected restart later.
>>
>> >
>>
>> > Then for finish / stop-with-savepoint --drain, currently Flink would
>> emit MAX_WATERMARK before the
>>
>> > EndOfPartition. Besides, as we have discussed before [1], endOfInput /
>> finish() should also only be called
>>
>> > for finish / stop-with-savepoint --drain. Thus currently they always
>> occurs at the same time. After the change,
>>
>> > we could emit MAX_WATERMARK before endOfInput event for the finish /
>> stop-with-savepoint --drain cases.
>>
>> >
>>
>> >> 2) StreamOperator.finish says to flush all buffered events. Would a
>>
>> >> WindowOperator close all windows and emit the results upon calling
>>
>> >> finish, for example?
>>
>> > As discussed above for stop-with-savepoint, we would always keep the
>> window as is, and restore them after restart.
>>
>> > Then for the finish / stop-with-savepoint --drain, I think perhaps it
>> depends on the Triggers. For
>>
>> > event-time triggers / process time triggers, it would be reasonable to
>> flush all the windows since logically
>>
>> > the time would always elapse and the window would always get triggered
>> in a logical future. But for triggers
>>
>> > like CountTrigger, no matter how much time pass logically, the windows
>> would not trigger, thus we may not
>>
>> > flush these windows. If there are requirements we may provide
>> additional triggers.
>>
>> >
>>
>> >> It's a bit messy and I'm not sure if this should be strengthened out?
>> Each one of those has a little bit different semantic/meaning,
>>
>> >> but at the same time they are very similar. For single input operators
>> `endInput()` and `finish()` are actually the very same thing.
>>
>> > Currently MAX_WATERMARK / endInput / finish indeed always happen at the
>> same time, and for single input operators `endInput()` and `finish()`
>>
>> > are indeed the same thing. During the last discussion we ever mentioned
>> this issue and at then we thought that we might deprecate `endInput()`
>>
>> > in the future, then we would only have endInput(int input) and
>> finish().
>>
>> >
>>
>> > Best,
>>
>> > Yun
>>
>> >
>>
>> >
>>
>> > [1] https://issues.apache.org/jira/browse/FLINK-21132
>>
>> >
>>
>> >
>>
>> >
>>
>> > ------------------------------------------------------------------
>>
>> > From:Piotr Nowojski
>>
>> > Send Time:2021 Jul. 16 (Fri.) 13:48
>>
>> > To:dev
>>
>> > Cc:Yun Gao
>>
>> > Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>> Finished
>>
>> >
>>
>> > Hi Till,
>>
>> >
>>
>> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>
>> >>
>>
>> >> 2) StreamOperator.finish says to flush all buffered events. Would a>
>> WindowOperator close all windows and emit the results upon calling
>>
>> >> finish, for example?
>>
>> > 1) currently they are independent but parallel mechanisms. With event
>> time, they are basically the same.
>>
>> > 2) it probably should for the sake of processing time windows.
>>
>> >
>>
>> > Here you are touching the bit of the current design that I like the
>> least. We basically have now three different ways of conveying very similar
>> things:
>>
>> > a) sending `MAX_WATERMARK`, used by event time WindowOperator (what
>> about processing time?)
>>
>> > b) endInput(), used for example by AsyncWaitOperator to flush it's
>> internal state
>>
>> > c) finish(), used for example by ContinuousFileReaderOperator
>>
>> >
>>
>> > It's a bit messy and I'm not sure if this should be strengthened out?
>> Each one of those has a little bit different semantic/meaning, but at the
>> same time they are very similar. For single input operators `endInput()`
>> and `finish()` are actually the very same thing.
>>
>> >
>>
>> > Piotrek
>>
>> > czw., 15 lip 2021 o 16:47 Till Rohrmann napisaƂ(a):
>>
>> > Thanks for updating the FLIP. Based on the new section about
>>
>> > stop-with-savepoint [--drain] I got two other questions:
>>
>> >
>>
>> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
>>
>> >
>>
>> > 2) StreamOperator.finish says to flush all buffered events. Would a
>>
>> > WindowOperator close all windows and emit the results upon calling
>>
>> > finish, for example?
>>
>> >
>>
>> > Cheers,
>>
>> > Till
>>
>> >
>>
>> > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
>>
>> >
>>
>> > > Thanks a lot for your answers and clarifications Yun.
>>
>> > >
>>
>> > > 1+2) Agreed, this can be a future improvement if this becomes a
>> problem.
>>
>> > >
>>
>> > > 3) Great, this will help a lot with understanding the FLIP.
>>
>> > >
>>
>> > > Cheers,
>>
>> > > Till
>>
>> > >
>>
>> > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao
>>
>> > > wrote:
>>
>> > >
>>
>> > >> Hi Till,
>>
>> > >>
>>
>> > >> Very thanks for the review and comments!
>>
>> > >>
>>
>> > >> 1) First I think in fact we could be able to do the computation
>> outside
>>
>> > >> of the main thread,
>>
>> > >> and the current implementation mainly due to the computation is in
>>
>> > >> general fast and we
>>
>> > >> initially want to have a simplified first version.
>>
>> > >>
>>
>> > >> The main requirement here is to have a constant view of the state of
>> the
>>
>> > >> tasks, otherwise
>>
>> > >> for example if we have A -> B, if A is running when we check if we
>> need
>>
>> > >> to trigger A, we will
>>
>> > >> mark A as have to trigger, but if A gets to finished when we check
>> B, we
>>
>> > >> will also mark B as
>>
>> > >> have to trigger, then B will receive both rpc trigger and checkpoint
>>
>> > >> barrier, which would break
>>
>> > >> some assumption on the task side and complicate the implementation.
>>
>> > >>
>>
>> > >> But to cope this issue, we in fact could first have a snapshot of the
>>
>> > >> tasks' state and then do the
>>
>> > >> computation, both the two step do not need to be in the main thread.
>>
>> > >>
>>
>> > >> 2) For the computation logic, in fact currently we benefit a lot from
>>
>> > >> some shortcuts on all-to-all
>>
>> > >> edges and job vertex with all tasks running, these shortcuts could do
>>
>> > >> checks on the job vertex level
>>
>> > >> first and skip some job vertices as a whole. With this optimization
>> we
>>
>> > >> have a O(V) algorithm, and the
>>
>> > >> current running time of the worst case for a job with 320,000 tasks
>> is
>>
>> > >> less than 100ms. For
>>
>> > >> daily graph sizes the time would be further reduced linearly.
>>
>> > >>
>>
>> > >> If we do the computation based on the last triggered tasks, we may
>> not
>>
>> > >> easily encode this information
>>
>> > >> into the shortcuts on the job vertex level. And since the time seems
>> to
>>
>> > >> be short, perhaps it is enough
>>
>> > >> to do re-computation from the scratch in consideration of the
>> tradeoff
>>
>> > >> between the performance and
>>
>> > >> the complexity ?
>>
>> > >>
>>
>> > >> 3) We are going to emit the EndOfInput event exactly after the
>> finish()
>>
>> > >> method and before the last
>>
>> > >> snapshotState() method so that we could shut down the whole topology
>> with
>>
>> > >> a single final checkpoint.
>>
>> > >> Very sorry for not include enough details for this part and I'll
>>
>> > >> complement the FLIP with the details on
>>
>> > >> the process of the final checkpoint / savepoint.
>>
>> > >>
>>
>> > >> Best,
>>
>> > >> Yun
>>
>> > >>
>>
>> > >>
>>
>> > >>
>>
>> > >> ------------------------------------------------------------------
>>
>> > >> From:Till Rohrmann
>>
>> > >> Send Time:2021 Jul. 14 (Wed.) 22:05
>>
>> > >> To:dev
>>
>> > >> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
>>
>> > >> Finished
>>
>> > >>
>>
>> > >> Hi everyone,
>>
>> > >>
>>
>> > >> I am a bit late to the voting party but let me ask three questions:
>>
>> > >>
>>
>> > >> 1) Why do we execute the trigger plan computation in the main thread
>> if we
>>
>> > >> cannot guarantee that all tasks are still running when triggering the
>>
>> > >> checkpoint? Couldn't we do the computation in a different thread in
>> order
>>
>> > >> to relieve the main thread a bit.
>>
>> > >>
>>
>> > >> 2) The implementation of the DefaultCheckpointPlanCalculator seems
>> to go
>>
>> > >> over the whole topology for every calculation. Wouldn't it be more
>>
>> > >> efficient to maintain the set of current tasks to trigger and check
>>
>> > >> whether
>>
>> > >> anything has changed and if so check the succeeding tasks until we
>> have
>>
>> > >> found the current checkpoint trigger frontier?
>>
>> > >>
>>
>> > >> 3) When are we going to send the endOfInput events to a downstream
>> task?
>>
>> > >> If
>>
>> > >> this happens after we call finish on the upstream operator but before
>>
>> > >> snapshotState then it would be possible to shut down the whole
>> topology
>>
>> > >> with a single final checkpoint. I think this part could benefit from
>> a bit
>>
>> > >> more detailed description in the FLIP.
>>
>> > >>
>>
>> > >> Cheers,
>>
>> > >> Till
>>
>> > >>
>>
>> > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao
>>
>> > >> wrote:
>>
>> > >>
>>
>> > >> > Hi there,
>>
>> > >> >
>>
>> > >> > Since the voting time of FLIP-147[1] has passed, I'm closing the
>> vote
>>
>> > >> now.
>>
>> > >> >
>>
>> > >> > There were seven +1 votes ( 6 / 7 are bindings) and no -1 votes:
>>
>> > >> >
>>
>> > >> > - Dawid Wysakowicz (binding)
>>
>> > >> > - Piotr Nowojski(binding)
>>
>> > >> > - Jiangang Liu (binding)
>>
>> > >> > - Arvid Heise (binding)
>>
>> > >> > - Jing Zhang (binding)
>>
>> > >> > - Leonard Xu (non-binding)
>>
>> > >> > - Guowei Ma (binding)
>>
>> > >> >
>>
>> > >> > Thus I'm happy to announce that the update to the FLIP-147 is
>> accepted.
>>
>> > >> >
>>
>> > >> > Very thanks everyone!
>>
>> > >> >
>>
>> > >> > Best,
>>
>> > >> > Yun
>>
>> > >> >
>>
>> > >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>>
>> > >>
>>
>> > >>
>>
>> >
>>
>>
>>
>>

Reply via email to