Personally I don't find this optimization important, and I'd rather leave it out so as not to complicate the codebase further. I doubt we save much there. I don't have a strong opinion, though.
Best,
Dawid
On 19/07/2021 10:31, Yun Gao wrote:
> Hi,
>
> Many thanks Dawid for the thoughts!
>
> Currently I also do not have a different opinion regarding this part.
> But I have one more issue to confirm: during the previous discussion we
> discussed that for the final checkpoint case, we might have an
> optimization that if a task does not have operators using 2PC, we might
> skip waiting for the final checkpoint (but we could not skip the
> savepoint). To allow users to express this logic, we have proposed to
> add one more method to StreamOperator & CheckpointListener:
>
> interface StreamOperator {
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> interface CheckpointListener {
>     default boolean requiresFinalCheckpoint() {
>         return true;
>     }
> }
>
> class AbstractUdfStreamOperator {
>     @Override
>     public boolean requiresFinalCheckpoint() {
>         return userFunction instanceof CheckpointListener &&
>             ((CheckpointListener) userFunction).requiresFinalCheckpoint();
>     }
> }
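>
> For illustration, a minimal sketch (hypothetical, simplified names,
> not the actual Flink classes) of how the task side could combine
> these flags across an operator chain:
>
>     // Sketch: the final checkpoint may be skipped only if no operator
>     // in the chain requires it (e.g. none uses 2PC); a savepoint can
>     // still not be skipped.
>     import java.util.List;
>
>     interface OperatorSketch {
>         default boolean requiresFinalCheckpoint() { return true; }
>     }
>
>     class FinalCheckpointDecision {
>         static boolean taskRequiresFinalCheckpoint(
>                 List<OperatorSketch> chain) {
>             return chain.stream()
>                 .anyMatch(OperatorSketch::requiresFinalCheckpoint);
>         }
>     }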
>
> I think we should still keep this change?
>
> Best,
> Yun
>
> ------------------ Original Mail ------------------
> *Sender:* Dawid Wysakowicz <[email protected]>
> *Send Date:* Sun Jul 18 18:44:50 2021
> *Recipients:* Flink Dev <[email protected]>, Yun Gao <[email protected]>
> *Subject:* Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks Finished
>
> I think we're all really close to the same solution.
>
> I second Yun's thoughts that MAX_WATERMARK works well for time-based
> buffering, but it does not solve flushing other operations, such as
> e.g. count windows or batching requests in Sinks. I'd prefer to treat
> finish() as a message for the Operator to "flush all records". The
> MAX_WATERMARK is, in my opinion, mostly for backwards compatibility. I
> don't think operators need to get a "stop-processing" signal if they
> don't need to flush records. WHEN records are emitted should be under
> the control of the StreamTask, by firing timers or by processing the
> next record from upstream.
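>
> As a minimal illustration of this "flush all records" contract (a
> hypothetical count-based buffering operator with simplified
> interfaces, not Flink's real ones):
>
>     // Hypothetical count-window-like operator: it buffers records and
>     // emits only on a count threshold, so MAX_WATERMARK alone would
>     // never flush it; finish() flushes the incomplete remainder.
>     import java.util.ArrayList;
>     import java.util.List;
>     import java.util.function.Consumer;
>
>     class CountBufferingOperatorSketch {
>         private final int threshold;
>         private final List<Long> buffer = new ArrayList<>();
>         private final Consumer<List<Long>> output;
>
>         CountBufferingOperatorSketch(int threshold,
>                 Consumer<List<Long>> output) {
>             this.threshold = threshold;
>             this.output = output;
>         }
>
>         void processElement(long value) {
>             buffer.add(value);
>             if (buffer.size() >= threshold) {
>                 emitBuffer(); // the only emit path during normal running
>             }
>         }
>
>         /** finish(): flush whatever is buffered, regardless of count. */
>         void finish() {
>             if (!buffer.isEmpty()) {
>                 emitBuffer();
>             }
>         }
>
>         private void emitBuffer() {
>             output.accept(new ArrayList<>(buffer));
>             buffer.clear();
>         }
>     }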
>
> The only difference between my previous proposal and Yun's is that I
> did not want to send the EndOfUserRecords event in case of stop w/o
> drain. My thinking was that we could go directly from RUNNING to
> WAITING_FOR_FINAL_CP on EndOfPartitionEvent. I agree we could emit the
> EndOfUserRecordsEvent with an additional flag and e.g. stop firing
> timers and processing events (without calling finish() on the
> Operator). In my initial suggestion I thought we wouldn't care about
> some events potentially being emitted after the savepoint was taken,
> as they would anyway belong to the checkpoint after the FINAL one,
> which would be discarded. I think, though, that the proposal to
> suspend record processing and timers is a sensible thing to do, and I
> would go with the version that Yun put into the FLIP Wiki.
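>
> To make the intended behaviour concrete, a rough sketch of the
> task-side handling (hypothetical names and simplified stand-in code,
> not the actual Flink classes; only EndOfUserRecordsEvent and
> WAITING_FOR_FINAL_CP come from this discussion):
>
>     // Sketch: how a task could react to EndOfUserRecordsEvent(drain).
>     public class TaskPhaseSketch {
>         enum Phase { RUNNING, WAITING_FOR_FINAL_CP, FINISHED }
>
>         private Phase phase = Phase.RUNNING;
>
>         /** Called when EndOfUserRecordsEvent(drain) arrives. */
>         void onEndOfUserRecords(boolean drain) {
>             if (drain) {
>                 flushOperators(); // finish(): flush all buffered records
>             }
>             suspendTimersAndInput(); // stop firing timers / processing
>             phase = Phase.WAITING_FOR_FINAL_CP;
>         }
>
>         /** Called once the final checkpoint/savepoint completes. */
>         void onFinalCheckpointComplete() {
>             phase = Phase.FINISHED;
>         }
>
>         private void flushOperators() { /* finish() on the chain */ }
>         private void suspendTimersAndInput() { /* no-op in this sketch */ }
>     }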
>
> What do you think Till?
>
> Best,
>
> Dawid
>
> On 16/07/2021 10:03, Yun Gao wrote:
>
> > Hi Till, Piotr
> >
> > Many thanks for the comments!
> >
> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > I also agree with Piotr that currently they are independent
> > mechanisms, and they are basically the same for event time.
> >
> > For more details, first there are some differences among the three
> > scenarios regarding the finish: for normal finish and
> > stop-with-savepoint --drain, the job would not be expected to be
> > restarted, while for stop-with-savepoint the job would be expected to
> > restart later.
> >
> > Then for finish / stop-with-savepoint --drain, currently Flink would
> > emit MAX_WATERMARK before the EndOfPartition. Besides, as we have
> > discussed before [1], endOfInput / finish() should also only be
> > called for finish / stop-with-savepoint --drain. Thus currently they
> > always occur at the same time. After the change, we could emit
> > MAX_WATERMARK before the endOfInput event for the finish /
> > stop-with-savepoint --drain cases.
> >
> >> 2) StreamOperator.finish says to flush all buffered events. Would a
> >> WindowOperator close all windows and emit the results upon calling
> >> finish, for example?
> >
> > As discussed above, for stop-with-savepoint we would always keep the
> > windows as they are and restore them after restart. Then for finish /
> > stop-with-savepoint --drain, I think it depends on the Triggers. For
> > event-time / processing-time triggers, it would be reasonable to
> > flush all the windows, since logically time would always elapse and
> > the windows would always get triggered in a logical future. But for
> > triggers like CountTrigger, no matter how much time passes logically,
> > the windows would not trigger, thus we may not flush these windows.
> > If there are requirements, we may provide additional triggers.
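> >
> > For illustration, a rough sketch of this drain-time decision
> > (hypothetical, simplified types; firesWithPassingTime() and
> > flushAllWindows() are made-up helpers, not Flink's Trigger API):
> >
> >     // Sketch: on drain, flush only windows whose trigger would fire
> >     // as time logically advances to +infinity; count-like triggers
> >     // leave their windows unfired.
> >     interface TriggerSketch {
> >         boolean firesWithPassingTime();
> >     }
> >
> >     class EventTimeTriggerSketch implements TriggerSketch {
> >         public boolean firesWithPassingTime() { return true; }
> >     }
> >
> >     class CountTriggerSketch implements TriggerSketch {
> >         public boolean firesWithPassingTime() { return false; }
> >     }
> >
> >     class WindowOperatorSketch {
> >         void onDrain(TriggerSketch trigger) {
> >             if (trigger.firesWithPassingTime()) {
> >                 flushAllWindows(); // emit all in-flight window results
> >             }
> >             // count-like triggers: windows are kept unfired
> >         }
> >         void flushAllWindows() { /* emit pending window results */ }
> >     }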
> >
> >> It's a bit messy and I'm not sure if this should be straightened
> >> out? Each one of those has a little bit different semantic/meaning,
> >> but at the same time they are very similar. For single input
> >> operators `endInput()` and `finish()` are actually the very same
> >> thing.
> >
> > Currently MAX_WATERMARK / endInput / finish indeed always happen at
> > the same time, and for single input operators `endInput()` and
> > `finish()` are indeed the same thing. During the last discussion we
> > mentioned this issue, and back then we thought that we might
> > deprecate `endInput()` in the future, so that we would only have
> > endInput(int input) and finish().
> >
> > Best,
> > Yun
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-21132
> >
> > ------------------------------------------------------------------
> > From: Piotr Nowojski
> > Send Time: 2021 Jul. 16 (Fri.) 13:48
> > To: dev
> > Cc: Yun Gao
> > Subject: Re: [RESULT][VOTE] FLIP-147: Support Checkpoint After Tasks
> > Finished
> >
> > Hi Till,
> >
> >> 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >>
> >> 2) StreamOperator.finish says to flush all buffered events. Would a
> >> WindowOperator close all windows and emit the results upon calling
> >> finish, for example?
> >
> > 1) Currently they are independent but parallel mechanisms. With event
> > time, they are basically the same.
> >
> > 2) It probably should, for the sake of processing time windows.
> >
> > Here you are touching the bit of the current design that I like the
> > least. We basically have three different ways of conveying very
> > similar things now:
> > a) sending `MAX_WATERMARK`, used by the event time WindowOperator
> >    (what about processing time?)
> > b) endInput(), used for example by AsyncWaitOperator to flush its
> >    internal state
> > c) finish(), used for example by ContinuousFileReaderOperator
> >
> > It's a bit messy and I'm not sure if this should be straightened out?
> > Each one of those has a little bit different semantic/meaning, but at
> > the same time they are very similar. For single input operators
> > `endInput()` and `finish()` are actually the very same thing.
> >
> > Piotrek
> >
> > On Thu, 15 Jul 2021 at 16:47, Till Rohrmann wrote:
> >
> > Thanks for updating the FLIP. Based on the new section about
> > stop-with-savepoint [--drain] I got two other questions:
> >
> > 1) Does endOfInput entail sending of the MAX_WATERMARK?
> >
> > 2) StreamOperator.finish says to flush all buffered events. Would a
> > WindowOperator close all windows and emit the results upon calling
> > finish, for example?
> >
> > Cheers,
> > Till
> >
> > On Thu, Jul 15, 2021 at 10:15 AM Till Rohrmann wrote:
> >
> > > Thanks a lot for your answers and clarifications Yun.
> > >
> > > 1+2) Agreed, this can be a future improvement if this becomes a
> > > problem.
> > >
> > > 3) Great, this will help a lot with understanding the FLIP.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 14, 2021 at 5:41 PM Yun Gao wrote:
> > >
> > >> Hi Till,
> > >>
> > >> Many thanks for the review and comments!
> > >>
> > >> 1) First, I think we could in fact do the computation outside of
> > >> the main thread; the current implementation is mainly because the
> > >> computation is in general fast and we initially wanted a
> > >> simplified first version.
> > >>
> > >> The main requirement here is to have a consistent view of the
> > >> state of the tasks. Otherwise, for example with A -> B, if A is
> > >> running when we check whether we need to trigger A, we will mark A
> > >> as to-be-triggered; but if A has finished by the time we check B,
> > >> we will also mark B as to-be-triggered. Then B would receive both
> > >> the RPC trigger and the checkpoint barrier, which would break some
> > >> assumptions on the task side and complicate the implementation.
> > >>
> > >> But to cope with this issue, we could in fact first take a
> > >> snapshot of the tasks' state and then do the computation; neither
> > >> of the two steps needs to be in the main thread.
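> > >>
> > >> As an illustration, a rough sketch of this snapshot-then-compute
> > >> idea (hypothetical, simplified types, not the actual Flink
> > >> classes):
> > >>
> > >>     // Sketch: copy the task states once, then compute the trigger
> > >>     // plan from that consistent snapshot, off the main thread.
> > >>     import java.util.ArrayList;
> > >>     import java.util.LinkedHashMap;
> > >>     import java.util.List;
> > >>     import java.util.Map;
> > >>
> > >>     enum TaskState { RUNNING, FINISHED }
> > >>
> > >>     class TriggerPlanSketch {
> > >>         List<String> computePlan(Map<String, TaskState> liveView) {
> > >>             // 1) snapshot: all checks below see one consistent
> > >>             //    view, even if tasks finish concurrently
> > >>             Map<String, TaskState> snapshot =
> > >>                 new LinkedHashMap<>(liveView);
> > >>
> > >>             // 2) compute: collect the tasks to trigger via RPC
> > >>             //    (stand-in logic: all still-running tasks)
> > >>             List<String> toTrigger = new ArrayList<>();
> > >>             for (Map.Entry<String, TaskState> e
> > >>                     : snapshot.entrySet()) {
> > >>                 if (e.getValue() == TaskState.RUNNING) {
> > >>                     toTrigger.add(e.getKey());
> > >>                 }
> > >>             }
> > >>             return toTrigger;
> > >>         }
> > >>     }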
> > >>
> > >> 2) For the computation logic, currently we in fact benefit a lot
> > >> from some shortcuts on all-to-all edges and on job vertices with
> > >> all tasks running: these shortcuts do checks on the job vertex
> > >> level first and can skip some job vertices as a whole. With this
> > >> optimization we have an O(V) algorithm, and the current worst-case
> > >> running time for a job with 320,000 tasks is less than 100 ms. For
> > >> typical graph sizes the time would be reduced further, linearly.
> > >>
> > >> If we did the computation based on the last triggered tasks, we
> > >> could not easily encode this information into the shortcuts on the
> > >> job vertex level. And since the time seems to be short, perhaps it
> > >> is enough to recompute from scratch, considering the tradeoff
> > >> between performance and complexity?
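> > >>
> > >> To illustrate the job-vertex-level shortcut (again a hypothetical,
> > >> heavily simplified sketch, not the real
> > >> DefaultCheckpointPlanCalculator):
> > >>
> > >>     // Sketch: when every task of a vertex is running, the whole
> > >>     // vertex is decided at once, giving one check per job vertex
> > >>     // (O(V)) instead of per-task checks.
> > >>     import java.util.ArrayList;
> > >>     import java.util.List;
> > >>
> > >>     class JobVertexSketch {
> > >>         final String name;
> > >>         final boolean allTasksRunning; // maintained by scheduler
> > >>         final boolean isSourceLike;    // must be triggered via RPC
> > >>
> > >>         JobVertexSketch(String name, boolean allTasksRunning,
> > >>                 boolean isSourceLike) {
> > >>             this.name = name;
> > >>             this.allTasksRunning = allTasksRunning;
> > >>             this.isSourceLike = isSourceLike;
> > >>         }
> > >>     }
> > >>
> > >>     class PlanCalculatorSketch {
> > >>         List<String> verticesToTrigger(List<JobVertexSketch> all) {
> > >>             List<String> result = new ArrayList<>();
> > >>             for (JobVertexSketch v : all) {
> > >>                 if (v.allTasksRunning && v.isSourceLike) {
> > >>                     result.add(v.name); // trigger whole vertex
> > >>                 }
> > >>                 // else: per-task checks (omitted in this sketch)
> > >>             }
> > >>             return result;
> > >>         }
> > >>     }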
> > >>
> > >> 3) We are going to emit the EndOfInput event exactly after the
> > >> finish() method and before the last snapshotState() method, so
> > >> that we can shut down the whole topology with a single final
> > >> checkpoint. Sorry for not including enough details for this part;
> > >> I'll complement the FLIP with the details on the process of the
> > >> final checkpoint / savepoint.
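> > >>
> > >> As a sketch of this intended per-task ordering (hypothetical
> > >> method names, heavily simplified; step 4 is my reading of the
> > >> discussion rather than something stated above):
> > >>
> > >>     // Sketch of the shutdown sequence described above.
> > >>     class FinalCheckpointSequenceSketch {
> > >>         void finishTask() {
> > >>             finishOperators();         // 1) finish(): flush records
> > >>             emitEndOfInputEvent();     // 2) EndOfInput to downstream
> > >>             snapshotStateForFinalCp(); // 3) last snapshotState()
> > >>             emitEndOfPartition();      // 4) once the final
> > >>                                        //    checkpoint completes
> > >>         }
> > >>         void finishOperators() {}
> > >>         void emitEndOfInputEvent() {}
> > >>         void snapshotStateForFinalCp() {}
> > >>         void emitEndOfPartition() {}
> > >>     }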
> > >>
> > >> Best,
> > >> Yun
> > >>
>
> > >>
> ------------------------------------------------------------------
>
> > >> From:Till Rohrmann
>
> > >> Send Time:2021 Jul. 14 (Wed.) 22:05
>
> > >> To:dev
>
> > >> Subject:Re: [RESULT][VOTE] FLIP-147: Support Checkpoint
> After Tasks
>
> > >> Finished
> > >> Hi everyone,
> > >>
> > >> I am a bit late to the voting party but let me ask three
> > >> questions:
> > >>
> > >> 1) Why do we execute the trigger plan computation in the main
> > >> thread if we cannot guarantee that all tasks are still running
> > >> when triggering the checkpoint? Couldn't we do the computation in
> > >> a different thread in order to relieve the main thread a bit?
> > >>
> > >> 2) The implementation of the DefaultCheckpointPlanCalculator seems
> > >> to go over the whole topology for every calculation. Wouldn't it
> > >> be more efficient to maintain the set of current tasks to trigger,
> > >> check whether anything has changed, and if so check the succeeding
> > >> tasks until we have found the current checkpoint trigger frontier?
> > >>
> > >> 3) When are we going to send the endOfInput events to a downstream
> > >> task? If this happens after we call finish on the upstream
> > >> operator but before snapshotState, then it would be possible to
> > >> shut down the whole topology with a single final checkpoint. I
> > >> think this part could benefit from a bit more detailed description
> > >> in the FLIP.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Fri, Jul 2, 2021 at 8:36 AM Yun Gao wrote:
> > >>
> > >> > Hi there,
> > >> >
> > >> > Since the voting time of FLIP-147 [1] has passed, I'm closing
> > >> > the vote now.
> > >> >
> > >> > There were seven +1 votes (6/7 binding) and no -1 votes:
> > >> >
> > >> > - Dawid Wysakowicz (binding)
> > >> > - Piotr Nowojski (binding)
> > >> > - Jiangang Liu (binding)
> > >> > - Arvid Heise (binding)
> > >> > - Jing Zhang (binding)
> > >> > - Leonard Xu (non-binding)
> > >> > - Guowei Ma (binding)
> > >> >
> > >> > Thus I'm happy to announce that the update to FLIP-147 is
> > >> > accepted.
> > >> >
> > >> > Many thanks everyone!
> > >> >
> > >> > Best,
> > >> > Yun
> > >> >
> > >> > [1] https://cwiki.apache.org/confluence/x/mw-ZCQ
>