Thanks for the further feedback, Zhijiang and Piotr! I think this is a great feature and I will watch its progress. Please also feel free to involve me in discussions/reviews of the state-related parts. Thanks.
Best Regards,
Yu

On Thu, 27 Feb 2020 at 23:24, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Yu,
>
> Re 4.
>
> Dynamic switching between unaligned and aligned checkpoints based on some kind of threshold (timeout, or checkpoint size) is definitely one of the very first improvements that we want to tackle after implementing the MVP. Depending on the time constraints, dynamic switching may or may not make it into 1.11. It’s hard for me to tell at this point in time.
>
> Piotrek
>
> > On 26 Feb 2020, at 15:59, Zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:
> >
> > Thanks for the further explanations, Yu!
> >
> > 1. The in-flight buffer spilling is indeed handled asynchronously. Until a buffer has finished spilling, it is not recycled for reuse. Your understanding is right; I guess I misread your earlier concern about additional memory consumption as being about buffer usage. My point of "no additional memory consumption" is from the perspective of the total network memory size, which would not increase as a result.
> >
> > 2. We treat the in-flight buffers as input & output states, equivalent to the existing operator states, and try to make use of all the existing mechanisms for state handles and assignment during recovery. So I guess local recovery should be a similar case. I will think through whether there is any special work to do around local recovery, and then clarify it in the FLIP after we reach an agreement internally. BTW, this FLIP has not been finalized yet.
> >
> > 3. Yes, the previous proposal is about measuring how many in-flight buffers to spill, i.e., the data size, if we really take this approach. I think the options proposed in the FLIP are initial thoughts on various possibilities. Which way we take for the first version still needs to be finalized before voting, I guess.
> >
> > 4. I think there are probably requirements or scenarios from users like the ones you mentioned. Actually, we have not finalized the way of switching to unaligned checkpoints yet. Anyway, we could provide an option for users to try out this feature at the beginning, although it might not be the most ideal one. Another input: we know the motivation for unaligned checkpoints comes from backpressure scenarios, but in our previous POC testing they also performed well without backpressure, even shortening the checkpoint duration without obvious performance regression. So backpressure might not be the only factor for switching to the unaligned way in practice, I guess. Anyway, your inputs are valuable for us to make the final decision.
> >
> > Best,
> > Zhijiang
> >
> > ------------------------------------------------------------------
> > From:Yu Li <car...@gmail.com>
> > Send Time:2020 Feb. 26 (Wed.) 15:59
> > To:dev <dev@flink.apache.org>; Zhijiang <wangzhijiang...@aliyun.com>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi Zhijiang,
> >
> > Thanks for the quick reply!
> >
> > For the 1st question, please allow me to confirm: when doing asynchronous checkpointing, disk spilling should happen in the background, in parallel with receiving/sending new data, or else it would become synchronous, right? Based on that assumption, some copy-on-write-like mechanism would be necessary to make sure new updates won't modify the to-be-checkpointed data, and this is where the additional memory consumption comes from.
> >
> > About point #2, I suggest we write down local recovery support in the FLIP document (if we reach a consensus here), to make sure it won't be neglected in the later implementation (I believe there is still some work to do following the existing local recovery mechanism). What do you think?
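Zhijiang's point 1 and Yu's 1st question above can be made concrete with a small model. The following is a minimal Python sketch, not Flink code: `BufferPool` and `spill_async` are hypothetical names, and the pool merely stands in for a task's network memory. Buffers handed to a background spill are recycled only once their write completes, so total memory never exceeds the pool size (Zhijiang's point); the price is that fewer buffers remain for new data while the spill runs, which is where Yu's copy-on-write concern comes in if spilling must not slow down receiving/sending.

```python
import concurrent.futures
import threading


class BufferPool:
    """Fixed-size pool standing in for a task's network memory (hypothetical model)."""

    def __init__(self, num_buffers: int, buffer_size: int = 32) -> None:
        self._lock = threading.Lock()
        self._free = [bytearray(buffer_size) for _ in range(num_buffers)]
        self.capacity = num_buffers

    def request(self):
        """Return a free buffer, or None when the pool is exhausted (backpressure)."""
        with self._lock:
            return self._free.pop() if self._free else None

    def recycle(self, buf: bytearray) -> None:
        with self._lock:
            self._free.append(buf)

    def available(self) -> int:
        with self._lock:
            return len(self._free)


def spill_async(pool, in_flight, storage, executor):
    """Persist in-flight buffers in the background without copying them into
    extra network memory. Each buffer returns to the pool only after its write
    has finished, so it cannot be reused (and overwritten) while still pending."""

    def write_then_recycle(buf):
        storage.append(bytes(buf))  # simulated write to checkpoint storage
        pool.recycle(buf)

    return [executor.submit(write_then_recycle, buf) for buf in in_flight]


if __name__ == "__main__":
    pool = BufferPool(num_buffers=4)
    in_flight = [pool.request() for _ in range(3)]  # 3 buffers hold unconsumed data
    # Only 1 buffer is left for new data until the spill recycles the others.
    print("free before spilling:", pool.available())
    storage = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        concurrent.futures.wait(spill_async(pool, in_flight, storage, executor))
    print("free after spilling:", pool.available())
```

A copy-on-write variant would instead copy each buffer before releasing it immediately, keeping the channel fully usable at the cost of memory outside the pool.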
> >
> > For the 3rd topic, do you mean UNALIGNED_WITH_MAX_INFLIGHT_DATA would set some kind of threshold on "how much in-flight data to checkpoint"? If so, could you further clarify the measurement (data size? record count? something else?), since there seems to be no description in the current FLIP doc? This is somewhat different from my understanding after reading the FLIP...
> >
> > Regarding question #4, I have no doubt that the new unaligned checkpoint mechanism could make fast checkpoints possible, at the cost of more memory, network bandwidth and disk space consumption. However (correct me if I'm wrong), for users who are satisfied with the existing aligned checkpoint interval, paying a constant cost to prevent delayed checkpoints during backpressure (a relatively low-frequency event) may not be that pragmatic.
> >
> > Best Regards,
> > Yu
> >
> > On Wed, 26 Feb 2020 at 15:07, Zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:
> > Hi Yu,
> >
> > Thanks for your interest in this FLIP and for sharing your thoughts! Let me try to answer some of the questions below.
> >
> > 1. Yes, asynchronous checkpointing should be part of the whole process and be supported naturally. As for the network memory concern, the in-flight buffers would be spilled into persistent storage when the checkpoint is triggered, and are recycled to receive/send data after spilling finishes. We still reuse the current network memory setting, so the in-flight buffers can never exceed that amount, and there would be no additional memory consumption.
> >
> > 2. Yes, we would try to reuse the existing checkpoint recovery mechanism to keep the implementation simple.
> >
> > 3. UNALIGNED_WITH_MAX_INFLIGHT_DATA and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA are about triggering the checkpoint at the proper time, considering the tradeoff between checkpoint duration and spilling in-flight data, etc. I guess the distinction still makes sense for a single input channel.
> > Assume 100 unconsumed buffers have already accumulated in one remote input channel when the barrier arrives from the network. We can then decide whether to trigger the checkpoint immediately: always under UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA, or under UNALIGNED_WITH_MAX_INFLIGHT_DATA only if 100 has not reached its max threshold.
> >
> > 4. I remember that we discussed these options internally before. I agree that the adaptive way might seem more flexible, but it also means more complexity in design and implementation. As the first step of unaligned checkpoints, it seems to make more sense to take an easy way and concentrate only on the functionality and the practical effect. After getting feedback that convinces us, the adaptive way might be an option to consider in the future, if really necessary.
> >
> > Best,
> > Zhijiang
> >
> > ------------------------------------------------------------------
> > From:Yu Li <car...@gmail.com>
> > Send Time:2020 Feb. 26 (Wed.) 12:59
> > To:dev <dev@flink.apache.org>
> > Subject:Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >
> > Hi All,
> >
> > Sorry for being late to the discussion. I've gone through the latest FLIP document and have the following questions/suggestions:
> >
> > 1. Do we support asynchronous checkpointing of the in-flight data?
> >   * From the doc the answer seems to be yes (state-based storage for the first version), and if so, there would be additional memory consumption on network buffers during checkpointing, which we should take into account, especially in container environments.
> >
> > 2. I suggest we also take local recovery into consideration during implementation, which could speed up recovery, especially when the amount of in-flight data is huge.
> >
> > 3. About the checkpointing policy, is my understanding below correct? Maybe it helps if we map the options more explicitly in the FLIP doc, IMHO:
> >   * For a single input channel, there's no difference between UNALIGNED_WITH_MAX_INFLIGHT_DATA and UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA; both mean we start the checkpoint once we observe the barrier in the input channel.
> >   * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means starting the checkpoint only when the barrier has appeared in all input channels, while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting the checkpoint when the barrier appears in any one of the input channels.
> >
> > 4. It seems we currently only support pre-defined options, but is it possible to switch between them dynamically? For example, if we predefine the policy as ALIGNED, could we supply a command to switch to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe backpressure is observed? Or switch to ALIGNED if we see too much data persisted under UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but what's preventing us from being more adaptive?
> >
> > Thanks!
> >
> > Best Regards,
> > Yu
> >
> > On Tue, 22 Oct 2019 at 15:44, Piotr Nowojski <pi...@ververica.com> wrote:
> >
> >> Hi,
> >>
> >> I would like to propose a modification to this FLIP.
> >>
> >> Based on the feedback that we have been receiving since publishing this document and during Flink Forward, I have been growing more and more anxious about one issue here: having to persist all buffered in-flight data at once. As the volume of this data might be large (GBs per TaskManager even with small clusters and relatively simple jobs), the time to persist all of this data at once might be quite substantial.
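The two readings of the policy options in this thread (Yu's point 3 in the 26 Feb 12:59 message, and Zhijiang's 100-buffer example in his reply) give different trigger rules. The sketch below encodes both side by side, purely as an illustration of the discussion: the enum names mirror the FLIP's options, but the Python enum, its values and both functions are hypothetical and not Flink API.

```python
from enum import Enum


class Policy(Enum):
    # Names taken from the FLIP discussion; the values are arbitrary placeholders.
    UNALIGNED_WITH_MAX_INFLIGHT_DATA = "max"
    UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA = "unlimited"


def starts_checkpoint_per_yu(policy, barrier_seen):
    """Yu's reading (point 3): barrier_seen[i] is True once the barrier has
    arrived on input channel i."""
    if not barrier_seen:
        raise ValueError("a task has at least one input channel")
    if policy is Policy.UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA:
        return any(barrier_seen)  # first barrier on any channel starts it
    return all(barrier_seen)  # MAX: wait for the barrier on every channel


def triggers_immediately_per_zhijiang(policy, buffered, max_buffered):
    """Zhijiang's reading (reply point 3), evaluated when a barrier arrives on a
    channel already holding `buffered` unconsumed buffers. False only means
    "not immediately"; what happens instead is left open in the thread."""
    if policy is Policy.UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA:
        return True
    return buffered < max_buffered


if __name__ == "__main__":
    # Single input channel, 100 buffered buffers, hypothetical threshold of 50.
    for policy in Policy:
        print(policy.name,
              starts_checkpoint_per_yu(policy, [True]),
              triggers_immediately_per_zhijiang(policy, buffered=100, max_buffered=50))
```

With a single channel, Yu's reading gives the same answer for both options, while Zhijiang's reading still distinguishes them, which is the point of his "it still makes sense for a single input channel".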
> >>
> >> To address this issue, I would like to propose that at first we implement a variant of unaligned checkpoints, just as written down in FLIP-76, but with continuous spilling: all data will be persisted/spilled continuously, all the time as it arrives, not all at once when the checkpoint starts. Think of this proposal as an incremental way of persisting the data.
> >>
> >> Pros of continuous spilling:
> >> + faster checkpointing, as there will be no need to store GBs of data, just flush/close.
> >> + more predictable behaviour: instead of jerky/varying/spiky IO/CPU loads, steady record throughput and spilling.
> >>
> >> Cons of continuous spilling:
> >> - need to persist all of the network traffic instead of just the in-flight data
> >>
> >> A larger volume of persisted data doesn’t matter that much from the perspective of throughput: if you are unable to spill the data faster than you process it, unaligned checkpoints are a worse option than aligned checkpoints anyway [1]. If checkpoints are frequent, it also doesn’t matter [2]. The true downside is when checkpoints are infrequent and you have to, for example, pay $ for the extra storage or the extra network traffic to the storage.
> >>
> >> On the other hand, continuous spilling (persistent communication channels?) might have the added benefit of enabling localised failures: the failure of one node will not necessarily bring down the whole cluster.
> >>
> >> As I mentioned, I’m proposing to just start with continuous spilling. It might be more costly in some scenarios, but it will offer the most stable and predictable performance with the lowest checkpoint latency.
> It’s > >> not perfect, it won’t solve all of the use cases, but frankly all of the > >> other options have their own blind spots, and continuous spilling > should at > >> least fully solve relatively low throughput use cases. We can later > build > >> on top of that solution, expanding it with the following features: > >> > >> 1. Do not spill continuously if there is no backpressure. For example > >> provide a timeout: start spilling pre-emptively/continuously if some > buffer > >> was not processed within X seconds. > >> 2. Start spilling only once the checkpoint starts (this is the exact > >> proposal from the current FLIP-76). > >> 3. Initially we want to spill to a Flink’s FileSystem (for example S3), > >> but in the future we are considering other options, for example Apache > >> Bookeeper. > >> > >> What do you think? > >> > >> Piotrek > >> > >> > >> > >> [1] I’m assuming that the spilling throughput per node can go up to > >> ~30MB/s. If your Flink's job data processing rate is 100MB/s, spilling > >> in-flight data will take 3.3 times longer than waiting for the > alignment. > >> On the other hand if data processing rate is 10MB/s, overhead of > continuous > >> spilling is relatively low. > >> [2] With checkpoints every one minute, with data processing throughput > >> 30MB/s per node, we would have to persist 1.8GB of data per node between > >> the checkpoints, which is similar order of magnitude as buffered > in-flight > >> data under the back-pressure. With higher throughput, unaligned > checkpoints > >> are not helping ([1]). With lower throughput, both the original proposal > >> and continuous spilling would have to effectively persist all of the > data > >> anyway. > >> > >>> On 10 Oct 2019, at 19:51, Yun Tang <myas...@live.com> wrote: > >>> > >>> Hi Arvid > >>> > >>> +1 for this future which has been hoped for a long time. End-to-end > >> exactly once job could benefit from quicker checkpoint completion. 
> >>>
> >>> Best
> >>> Yun Tang
> >>> ________________________________
> >>> From: Yun Gao <yungao...@aliyun.com.INVALID>
> >>> Sent: Thursday, October 10, 2019 18:39
> >>> To: dev <dev@flink.apache.org>
> >>> Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>> Hi Arvid,
> >>>
> >>> Many thanks for bringing up the discussion! On our side, failing to finish checkpoints is a common problem for online jobs, so +1 from my side to implement this.
> >>> A tiny issue with the FLIP is that the attached Discussion Thread URL seems to be incorrect.
> >>>
> >>> Best,
> >>> Yun
> >>>
> >>> ------------------------------------------------------------------
> >>> From:Arvid Heise <ar...@ververica.com>
> >>> Send Time:2019 Sep. 30 (Mon.) 20:31
> >>> To:dev <dev@flink.apache.org>
> >>> Subject:[DISCUSS] FLIP-76: Unaligned checkpoints
> >>>
> >>> Hi Devs,
> >>>
> >>> I would like to start the formal discussion about FLIP-76 [1], which improves the checkpoint latency in systems under backpressure, where a checkpoint can take hours to complete in the worst case. I recommend the thread "checkpointing under backpressure" [2] to get a good idea of why users are not satisfied with the current behavior. The key points:
> >>>
> >>> - Since the checkpoint barrier flows much more slowly through the back-pressured channels, the other channels and their upstream operators are effectively blocked during checkpointing.
> >>> - The checkpoint barrier takes a long time to reach the sinks, causing long checkpointing times. A longer checkpointing time in turn means that the checkpoint will be fairly outdated once done. Since a heavily utilized pipeline is inherently more fragile, we may run into a vicious cycle of late checkpoints, crash, recovery to a rather outdated checkpoint, more backpressure, and even later checkpoints, which would result in little to no progress in the application.
> >>>
> >>> The FLIP proposes "unaligned checkpoints", which improve the current state such that
> >>>
> >>> - Upstream processes can continue to produce data, even if some operator still waits on a checkpoint barrier on a specific input channel.
> >>> - Checkpointing times are heavily reduced across the execution graph, even for operators with a single input channel.
> >>> - End-users will see more progress even in unstable environments, as more up-to-date checkpoints will avoid too many recomputations.
> >>> - Rescaling becomes faster.
> >>>
> >>> The key idea is to allow checkpoint barriers to be forwarded to downstream tasks before the synchronous part of the checkpointing has been conducted (see Fig. 1). To that end, we need to store in-flight data as part of the checkpoint, as described in greater detail in this FLIP.
> >>>
> >>> Although the basic idea was already sketched in [2], we would like to get broader feedback in this dedicated mail thread.
> >>>
> >>> Best,
> >>>
> >>> Arvid
> >>>
> >>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> >>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html