Thanks for the further feedback, Zhijiang and Piotr! I think this is a great
feature and will watch the progress. Please also feel free to involve me in
discussions/reviews on the state-related parts. Thanks.

Best Regards,

On Thu, 27 Feb 2020 at 23:24, Piotr Nowojski <> wrote:

> Hi Yu,
> Re 4.
> Dynamic switching between unaligned and aligned checkpoints based on some
> kind of thresholds (timeout, or checkpoint size) is definitely one of the
> very first improvements that we want to tackle after implementing the MVP.
> Depending on the time constraints, dynamic switching may or may not make it
> into 1.11. It’s hard for me to tell at this point in time.
> Piotrek
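
The threshold-based switching described in Re 4 above can be sketched as follows. This is a minimal illustration in Python, not Flink code: `SwitchThresholds`, `choose_mode`, and both threshold fields are hypothetical names, and checking the size limit before the timeout is an assumption rather than something the FLIP specifies.

```python
from dataclasses import dataclass
from enum import Enum


class CheckpointMode(Enum):
    ALIGNED = "aligned"
    UNALIGNED = "unaligned"


@dataclass(frozen=True)
class SwitchThresholds:
    # Hypothetical knobs; neither name comes from the FLIP.
    alignment_timeout_s: float  # go unaligned once alignment has taken longer than this
    max_inflight_bytes: int     # stay aligned if more in-flight data than this would be persisted


def choose_mode(alignment_elapsed_s: float,
                inflight_bytes: int,
                thresholds: SwitchThresholds) -> CheckpointMode:
    """Pick a checkpoint mode from a timeout threshold and a size threshold."""
    # Assumed ordering: the size guard wins, so a huge backlog is never persisted.
    if inflight_bytes > thresholds.max_inflight_bytes:
        return CheckpointMode.ALIGNED
    # Alignment has been waiting too long: stop waiting and go unaligned.
    if alignment_elapsed_s > thresholds.alignment_timeout_s:
        return CheckpointMode.UNALIGNED
    return CheckpointMode.ALIGNED
```

With a 10 second timeout and a 1 MB size limit, an alignment that has already waited 30 seconds with little buffered data would switch to unaligned, while the same wait with 2 MB buffered would stay aligned.
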
> > On 26 Feb 2020, at 15:59, Zhijiang <>
> wrote:
> >
> > Thanks for the further explanations, Yu!
> >
> > 1. The inflight buffer spilling process is indeed handled
> asynchronously. Until a buffer has finished spilling, it is not
> recycled for reuse.
> > Your understanding is right. I guess I misunderstood your previous
> concern about additional memory consumption, which was from the perspective
> of buffer usage.
> > My point about no additional memory consumption was from the perspective of
> the total network memory size, which would not increase as a result.
> >
> > 2. We treat the inflight buffers as input and output states, which are
> equivalent to existing operator states, and try to make use of all the
> existing mechanisms for
> > state handles and assignment during recovery. So I guess local
> recovery should be a similar case. I will think through whether
> local recovery needs
> > any special work, and then clarify it in the FLIP after
> we reach an agreement internally. BTW, this FLIP has not been finalized yet.
> >
> > 3. Yes, the previous proposal is for measuring how many inflight buffers
> are to be spilled, which refers to the data size if we really go this way. I
> think the options proposed
> > in the FLIP are initial thoughts on the various possibilities. Which way
> we take for the first version, I guess we need to further
> finalize before voting.
> >
> > 4. I think the requirements or scenarios you mentioned probably do exist
> for users. Actually, we have not finalized the way of switching
> to unaligned checkpoints yet.
> > Anyway, we could provide an option for users to try out this feature at
> the beginning, although it might not be the most ideal one. Another input
> is that, although the motivation
> > for unaligned checkpoints comes from backpressure scenarios, they
> might also perform well without backpressure, and even shorten the
> checkpoint duration without
> > obvious performance regression, as seen in our previous POC testing. So
> backpressure might not be the only factor for switching to the unaligned way in
> practice, I guess. Anyway, your
> > inputs are valuable for us in making the final decision.
> >
> > Best,
> > Zhijiang
> >
> >
> >
> >
> > ------------------------------------------------------------------
> > From:Yu Li <>
> > Send Time:2020 Feb. 26 (Wed.) 15:59
> > To:dev <>; Zhijiang <>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi Zhijiang,
> >
> > Thanks for the quick reply!
> >
> > For the 1st question, please allow me to confirm, that when doing
> asynchronous checkpointing, disk spilling should happen in background in
> parallel with receiving/sending new data, or else it would become
> synchronous, right? Based on such assumption, some copy-on-write like
> mechanism would be necessary to make sure the new updates won't modify the
> to-be-checkpointed data, and this is where the additional memory
> consumption comes from.
> >
> > About point #2, I suggest we write down local recovery support in the FLIP
> document (if we reach a consensus here), to make sure it won't
> be neglected in later implementation (I believe there is still some work to
> do following the existing local recovery mechanism). What do you think?
> >
> > For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would
> set some kind of threshold about "how much in-flight data to checkpoint"?
> If so, could you further clarify the measurement (data size? record number?
> others?) since there seems to be no description in the current FLIP doc?
> This is somewhat different from my understanding after reading the FLIP...
> >
> > Regarding question #4, I have no doubt that the new unaligned checkpoint
> mechanism could make fast checkpoint possible, at the cost of more memory,
> network bandwidth and disk space consumption. However, (correct me if I'm
> wrong) for users who are satisfied with the existing aligned checkpoint
> interval, taking the constant cost to prevent delayed checkpoint during
> back pressure - a relatively low frequency event - may not be that
> pragmatic.
> >
> > Best Regards,
> > Yu
> >
> > On Wed, 26 Feb 2020 at 15:07, Zhijiang <>
> wrote:
> > Hi Yu,
> >
> > Thanks for your interest in this FLIP and for sharing your thoughts! Let me try
> to answer some of the questions below.
> >
> > 1. Yes, asynchronous checkpointing should be part of the whole process
> and be supported naturally. As for the network memory concern,
> > the inflight buffers would be spilled into persistent storage when a
> checkpoint is triggered, and are recycled to receive/send data after they finish
> spilling.
> > We still reuse the current network memory setting, so the maximum number of
> inflight buffers would not exceed that amount, and there would be no
> >  additional memory consumption.
> >
> > 2. Yes, we would try to reuse the existing checkpoint recovery mechanism
> to keep the implementation simple.
> >
> UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are for the consideration of
> triggering checkpoint
> > at proper time, the tradeoff between checkpoint duration and spilling
> inflight data, etc. I guess it still makes sense for the single input
> channel.
> >  Assuming 100 unconsumed buffers had already accumulated in one
> remote input channel when the barrier arrives from the network, then we can
> > decide whether to trigger checkpoint immediately based on
> > if 100 is not reaching its max threshold.
> >
> > 4. I remember that we discussed the options internally before. I
> agree that the adaptive way might seem more flexible, but it would also be
> more complicated
> >  in design and implementation. As the first step for unaligned
> checkpoints, it seems to make more sense to take an easy way and
> concentrate only on the function and
> >  practical effect. After getting some feedback to convince us, I guess
> the adaptive way might be an option to consider if really
> necessary in the future.
> >
> > Best,
> > Zhijiang
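
Point 3 above can be read as a simple threshold check made when the barrier arrives. The sketch below is a hypothetical Python illustration, not the FLIP's design: `should_trigger_immediately` and `max_inflight_buffers` are assumed names, and what happens once the threshold is reached is left open because the thread does not spell it out.

```python
def should_trigger_immediately(unconsumed_buffers: int,
                               max_inflight_buffers: int) -> bool:
    """Return True if the checkpoint may start as soon as the barrier arrives.

    Both parameters are hypothetical; the thread only says that the number of
    accumulated buffers is compared against a maximum threshold.
    """
    if unconsumed_buffers < 0 or max_inflight_buffers < 0:
        raise ValueError("buffer counts must be non-negative")
    # "Not reaching its max threshold" is read here as strictly below the limit.
    return unconsumed_buffers < max_inflight_buffers
```

For the example in the message, 100 accumulated buffers against an assumed limit of 200 would trigger the checkpoint immediately, while a limit of 100 or less would not.
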
> >
> >
> > ------------------------------------------------------------------
> > From:Yu Li <>
> > Send Time:2020 Feb. 26 (Wed.) 12:59
> > To:dev <>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi All,
> >
> > Sorry for being late to the discussion. I've gone through the latest FLIP
> > document and have the following questions/suggestions:
> >
> > 1. Do we support asynchronous checkpointing on the in-flight data?
> >     * From the doc the answer seems to be yes (state-based storage for
> the
> > first version), and if so, there would be additional memory consumption
> on
> > network buffers during checkpoints and we should take this into account,
> > especially in container environments.
> >
> > 2. I suggest we also take local recovery into consideration during
> > implementation, which could speed up recovery, especially when
> the
> > amount of in-flight data is huge.
> >
> > 3. About checkpointing policy, is the understanding below correct? Maybe
> > it helps if we map them more explicitly in the FLIP doc, IMHO:
> >     * For single input channel, there's no difference between
> > and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, both mean we start the
> > checkpoint once we observe the barrier in the input channel.
> >     * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
> > starting checkpoint only when barrier appears in all input channels,
> > while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting checkpoint
> when
> > barrier appears in any one of the input channels.
> >
> > 4. It seems now we only support pre-defined options, but is it possible
> to
> > switch in between dynamically? For example, if we predefine the policy to
> > ALIGNED, could we supply a command to switch
> > to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure is observed?
> Or
> > switch to ALIGNED if we see too much data persisted for
> > UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
> > what's preventing us from being more adaptive?
> >
> > Thanks!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <>
> wrote:
> >
> >> Hi,
> >>
> >> I would like to propose a modification to this FLIP.
> >>
> >> Based on the feedback that we were receiving after publishing this
> >> document and during Flink Forward, I was growing more and more anxious
> >> about one issue here: having to persist all buffered in-flight data at
> >> once. As the volume of this data might be large (GBs per TaskManager
> even
> >> with small clusters and relatively simple jobs), the time to persist
> all of
> >> this data at once might be quite substantial.
> >>
> >>
> >>
> >> To address this issue, I would like to propose that at first we
> implement
> >> a variant of unaligned checkpoints, just as written down in FLIP-76, but
> >> with continuous spilling - all data will be persisted/spilled
> continuously,
> >> all the time as they come - not at once when the checkpoint starts.
> Think
> >> about this proposal as an incremental way of persisting the data.
> >>
> >> Pros of continuous spilling:
> >> + faster checkpointing, as there will be no need to store GBs of data,
> >> just flush/close.
> >> + more predictable behaviour. Instead of jerky/varying/spike IO/CPU
> loads,
> >> steady records throughput and spilling.
> >>
> >> Cons of continuous spilling:
> >> - need to persist all of the network traffic instead of persisting just
> >> the in-flight data
> >>
> >> A larger volume of persisted data doesn’t matter that much from the
> >> perspective of throughput, because if you are unable to spill the data
> >> faster than you process it, unaligned checkpoints are a worse option
> >> than aligned checkpoints [1]. If checkpoints are frequent it
> >> also doesn’t matter [2]. The true downside is if checkpoints are
> infrequent
> >> and you have to for example pay $ for the extra storage or extra network
> >> traffic to the storage.
> >>
> >> On the other hand, continuous spilling (persistent communication
> >> channels?) might have the added benefit of enabling localised
> failures -
> >> failure of one node will not necessarily bring down the whole cluster.
> >>
> >>
> >>
> >> As I mentioned, I’m proposing to just start with the continuous
> spilling.
> >> It might be more costly in some scenarios, but it will offer the most
> >> stable and predictable performance with the lowest checkpoint latency.
> It’s
> >> not perfect, it won’t solve all of the use cases, but frankly all of the
> >> other options have their own blind spots, and continuous spilling
> should at
> >> least fully solve relatively low throughput use cases. We can later
> build
> >> on top of that solution, expanding it with the following features:
> >>
> >> 1. Do not spill continuously if there is no backpressure. For example
> >> provide a timeout: start spilling pre-emptively/continuously if some
> buffer
> >> was not processed within X seconds.
> >> 2. Start spilling only once the checkpoint starts (this is the exact
> >> proposal from the current FLIP-76).
> >> 3. Initially we want to spill to a Flink FileSystem (for example S3),
> >> but in the future we are considering other options, for example Apache
> >> BookKeeper.
> >>
> >> What do you think?
> >>
> >> Piotrek
> >>
> >>
> >>
> >> [1] I’m assuming that the spilling throughput per node can go up to
> >> ~30MB/s. If your Flink job's data processing rate is 100MB/s, spilling
> >> in-flight data will take 3.3 times longer than waiting for the
> alignment.
> >> On the other hand if data processing rate is 10MB/s, overhead of
> continuous
> >> spilling is relatively low.
> >> [2] With checkpoints every one minute, with data processing throughput
> >> 30MB/s per node, we would have to persist 1.8GB of data per node between
> >> the checkpoints, which is a similar order of magnitude to the buffered
> in-flight
> >> data under back-pressure. With higher throughput, unaligned
> checkpoints
> >> are not helping ([1]). With lower throughput, both the original proposal
> >> and continuous spilling would have to effectively persist all of the
> data
> >> anyway.
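
The arithmetic behind footnotes [1] and [2] can be checked with a short Python sketch. The function names are made up for illustration, the 30 MB/s spilling rate is the assumption stated in [1], and 1 GB is taken as 1000 MB.

```python
def spill_slowdown(processing_mb_s: float, spill_mb_s: float) -> float:
    """How many times longer spilling takes than processing the same data."""
    return processing_mb_s / spill_mb_s


def persisted_per_interval_gb(throughput_mb_s: float, interval_s: float) -> float:
    """Data persisted per node between two checkpoints, in GB (1 GB = 1000 MB)."""
    return throughput_mb_s * interval_s / 1000.0


# Figures from footnotes [1] and [2]; the 30 MB/s spill rate is the thread's assumption.
print(round(spill_slowdown(100, 30), 1))   # 3.3
print(persisted_per_interval_gb(30, 60))   # 1.8
```

At a processing rate of 10 MB/s the same ratio is about 0.33, which matches the remark in [1] that the overhead of continuous spilling is relatively low in that case.
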
> >>
> >>> On 10 Oct 2019, at 19:51, Yun Tang <> wrote:
> >>>
> >>> Hi Arvid
> >>>
> >>> +1 for this feature, which has been hoped for for a long time. End-to-end
> >> exactly-once jobs could benefit from quicker checkpoint completion.
> >>>
> >>>
> >>> Best
> >>> Yun Tang
> >>> ________________________________
> >>> From: Yun Gao <>
> >>> Sent: Thursday, October 10, 2019 18:39
> >>> To: dev <>
> >>> Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>>   Hi Arvid,
> >>>
> >>>           Many thanks for bringing up the discussion! On our side, being unable
> >> to finish the checkpoint is commonly seen in online jobs, therefore +1
> from
> >> my side to implement this.
> >>>          A tiny issue with the FLIP is that the attached Discussion Thread URL
> >> seems to be incorrect.
> >>>
> >>>
> >>>    Best,
> >>>    Yun
> >>>
> >>>
> >>> ------------------------------------------------------------------
> >>> From:Arvid Heise <>
> >>> Send Time:2019 Sep. 30 (Mon.) 20:31
> >>> To:dev <>
> >>> Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>> Hi Devs,
> >>>
> >>> I would like to start the formal discussion about FLIP-76 [1], which
> >>> improves the checkpoint latency in systems under backpressure, where a
> >>> checkpoint can take hours to complete in the worst case. I recommend
> the
> >>> thread "checkpointing under backpressure" [2] to get a good idea why
> >> users
> >>> are not satisfied with the current behavior. The key points:
> >>>
> >>>  - Since the checkpoint barrier flows much slower through the
> >>>  back-pressured channels, the other channels and their upstream
> >> operators
> >>>  are effectively blocked during checkpointing.
> >>>  - The checkpoint barrier takes a long time to reach the sinks causing
> >>>  long checkpointing times. A longer checkpointing time in turn means
> >> that
> >>>  the checkpoint will be fairly outdated once done. Since a heavily
> >> utilized
> >>>  pipeline is inherently more fragile, we may run into a vicious cycle
> of
> >>>  late checkpoints, crash, recovery to a rather outdated checkpoint,
> more
> >>>  back pressure, and even later checkpoints, which would result in
> >> little to
> >>>  no progress in the application.
> >>>
> >>> The FLIP proposes "unaligned checkpoints", which improve the current
> >> state,
> >>> such that
> >>>
> >>>  - Upstream processes can continue to produce data, even if some
> >> operator
> >>>  still waits on a checkpoint barrier on a specific input channel.
> >>>  - Checkpointing times are heavily reduced across the execution graph,
> >>>  even for operators with a single input channel.
> >>>  - End-users will see more progress even in unstable environments as
> >> more
> >>>  up-to-date checkpoints will avoid too many recomputations.
> >>>  - Facilitate faster rescaling.
> >>>
> >>> The key idea is to allow checkpoint barriers to be forwarded to
> >> downstream
> >>> tasks before the synchronous part of the checkpointing has been
> conducted
> >>> (see Fig. 1). To that end, we need to store in-flight data as part of
> the
> >>> checkpoint as described in greater detail in this FLIP.
> >>>
> >>> Although the basic idea was already sketched in [2], we would like to get
> >>> broader feedback in this dedicated mail thread.
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> [1]
> >>>
> >>
> >>> [2]
> >>>
> >>
> >>>
> >>
> >>
> >
> >
