Sorry, incorrect link; please follow [1].

[1] https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/%3CCAGZNd0FgVL0oDQJHpBwJ1Ha8QevsVG0FHixdet11tLhW2p-2hg%40mail.gmail.com%3E
On Wed, Oct 2, 2019 at 3:44 PM Arvid Heise <ar...@ververica.com> wrote:

FYI, we published FLIP-76 to address the issue, and a discussion has been opened in [1].

Looking forward to your feedback,

Arvid

[1] https://mail-archives.apache.org/mod_mbox/flink-dev/201909.mbox/browser

On Thu, Aug 15, 2019 at 9:43 AM Yun Gao <yungao...@aliyun.com.invalid> wrote:

Hi,

Many thanks for the great points!

For prioritizing inputs, from another point of view, I think it might not cause other bad effects, since we do not need to totally block the channels that have seen barriers after the operator has taken its snapshot. After the snapshotting, if the channels that have not seen barriers have buffers, we could first log and process those buffers, and if they do not have buffers, we can still process the buffers from the channels that have seen barriers. Therefore, it seems that prioritizing inputs should be able to accelerate the checkpoint without other bad effects.

And @zhijiang, for making the unaligned checkpoint the only mechanism for all cases, I still think we should allow a configurable timeout after receiving the first barrier so that the channels may get "drained" during the timeout, as pointed out by Stephan. With such a timeout, we would very likely not need to snapshot the input buffers at all, which would be very similar to the current aligned checkpoint mechanism.

Best,
Yun

------------------------------------------------------------------
From: zhijiang <wangzhijiang...@aliyun.com.INVALID>
Send Time: 2019 Aug. 15 (Thu.) 02:22
To: dev <dev@flink.apache.org>
Subject: Re: Checkpointing under backpressure

> For the checkpoint to complete, any buffer that arrived prior to the barrier would need to be part of the checkpointed state.

Yes, I agree.

> So wouldn't it be important to finish persisting these buffers as fast as possible by prioritizing the respective inputs? The task won't be able to process records from the inputs that have seen the barrier fast when it is already backpressured (or causing the backpressure).

My previous understanding of prioritizing inputs was from the task-processing aspect after snapshotting state. From the persisting-buffers aspect, I think it comes down to how we implement it. If we only tag/reference which buffers in the inputs are part of the state, and the real persisting work is done asynchronously, then the already tagged buffers can be processed by the task without priority. Only after all the persisting work is done would the task report the finished checkpoint on its side to the coordinator. The key point is implementing it so that the task can continue processing buffers as soon as possible.

Thanks for the further explanation of the requirements for speeding up checkpoints in the backpressure scenario. Finishing a savepoint quickly and then tuning the settings to avoid backpressure is a really practical case. I think this solution could cover that concern.

Best,
Zhijiang
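Zhijiang's tag-and-persist idea above can be sketched roughly as follows. All names and interfaces here are hypothetical stand-ins, not Flink's actual API: on the first barrier, the queued buffers are reference-counted as checkpoint state and written out on an IO thread, while the task keeps consuming them without any priority.

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Sketch: tag the in-flight buffers of a channel as checkpoint state and
// persist them asynchronously, so the task can keep processing them.
class AsyncBufferPersister {
    interface Buffer { Buffer retain(); void release(); }
    interface StateWriter { void write(long checkpointId, List<Buffer> buffers); }

    private final Executor ioExecutor = Executors.newSingleThreadExecutor();
    private final StateWriter writer;

    AsyncBufferPersister(StateWriter writer) { this.writer = writer; }

    // Called when the first barrier of `checkpointId` arrives: reference the
    // currently queued buffers (cheap, synchronous) and persist them off-thread.
    // The task may continue consuming the queue immediately, without priority.
    CompletableFuture<Void> persistInFlight(long checkpointId, Queue<Buffer> channelQueue) {
        List<Buffer> tagged = new ArrayList<>();
        for (Buffer b : channelQueue) {
            tagged.add(b.retain()); // "tag": bump the ref-count instead of copying
        }
        return CompletableFuture.runAsync(() -> {
            writer.write(checkpointId, tagged); // the slow IO happens here
            tagged.forEach(Buffer::release);
        }, ioExecutor);
    }
}

The task would acknowledge the checkpoint to the coordinator only once the returned future completes, matching the reporting order Zhijiang describes.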
------------------------------------------------------------------
From: Thomas Weise <t...@apache.org>
Send Time: 2019 Aug. 14 (Wed.) 19:48
To: dev <dev@flink.apache.org>; zhijiang <wangzhijiang...@aliyun.com>
Subject: Re: Checkpointing under backpressure

-->

On Wed, Aug 14, 2019 at 10:23 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:

> Thanks for these great points and discussions!
>
> 1. Considering the way of triggering checkpoint RPC calls to all the tasks from Chandy-Lamport: it combines two different mechanisms to make sure that the trigger can be fast in different scenarios. But in the Flink world it might not be worth trying that way, as per Stephan's analysis. Another concern is that it might put a heavier load on the JobMaster to broadcast this checkpoint RPC to all the tasks in a large-scale job, especially for very short checkpoint intervals. Furthermore, it would also cause other important RPCs to be executed with delay, bringing potential timeout risks.
>
> 2. I agree with the idea of drawing on the "take state snapshot on first barrier" approach from Chandy-Lamport, instead of barrier alignment, combined with unaligned checkpoints in Flink.
>
> > The benefit would be less latency increase in the channels which already have received barriers.
> > However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.
>
> I think we will not have an adverse effect if we do not prioritize the inputs without barriers in this case. After the sync snapshot, the task can actually process any input channel. For the input channel receiving the first barrier, we already have the obvious boundary for persisting buffers. For the other channels without barriers, we can persist the following buffers until the barrier arrives on the network. Because of credit-based flow control, the barrier does not need credit to be transported; as long as the sender moves the barrier ahead of the output queue, the network stack will transport this barrier immediately regardless of the input condition on the receiver side. So there is no requirement to consume the accumulated buffers in these channels with higher priority. If so, it seems that we will not waste any CPU cycles, as Piotr was concerned about before.

I'm not sure I follow this. For the checkpoint to complete, any buffer that arrived prior to the barrier would need to be part of the checkpointed state. So wouldn't it be important to finish persisting these buffers as fast as possible by prioritizing the respective inputs? The task won't be able to process records from the inputs that have seen the barrier fast when it is already backpressured (or causing the backpressure).

> 3. Suppose unaligned checkpoints perform well in practice: is it possible to make them the only mechanism for handling all cases? I mean, for the non-backpressure scenario, there are few (even zero) buffers in the input/output queues, so "overtaking barrier -> trigger snapshot on first barrier -> persist buffers" might still work well. That way we would not need to maintain two sets of mechanisms in the end.
> 4. The initial motivation of this discussion was checkpoint timeouts in the backpressure scenario. If we adjust the default timeout to a very big value, that means the checkpoint will never time out and we only need to wait for it to finish. Are there still any other problems/concerns if a checkpoint takes a long time to finish? Although we already knew some issues before, it is better to gather more user feedback to confirm which aspects could be solved in this feature design. E.g. the sink commit delay might not be covered by the unaligned solution.

Checkpoints taking too long is the concern that sparked this discussion (timeout is just a symptom). The slowness issue also applies to the savepoint use case. We would need to be able to take a savepoint fast in order to roll forward a fix that can alleviate the backpressure (like changing parallelism or making a different configuration change).

> Best,
> Zhijiang

------------------------------------------------------------------
From: Stephan Ewen <se...@apache.org>
Send Time: 2019 Aug. 14 (Wed.) 17:43
To: dev <dev@flink.apache.org>
Subject: Re: Checkpointing under backpressure

Quick note: The current implementation is

Align -> Forward -> Sync Snapshot Part (-> Async Snapshot Part)

On Wed, Aug 14, 2019 at 5:21 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Thanks for the great ideas so far.

+1

Regarding other things raised, I mostly agree with Stephan.

I like the idea of simultaneously starting the checkpoint everywhere via RPC call (especially in cases where Tasks are busy doing some synchronous operations, for example for tens of milliseconds; in that case every network exchange adds tens of milliseconds of delay in propagating the checkpoint). However I agree that this might be a premature optimisation given the current state of our code (we already have checkpoint barriers).

However I like the idea of switching from:

1. A -> S -> F (Align -> snapshot -> forward markers)

To

2. S -> F -> L (Snapshot -> forward markers -> log pending channels)

Or even to

6. F -> S -> L (Forward markers -> snapshot -> log pending channels)

It feels to me like this would decouple the propagation of checkpoints from the costs of synchronous snapshots and from waiting for all of the checkpoint barriers to arrive (even if they overtake in-flight records, this might take some time).

> What I like about the Chandy Lamport approach (2.) initiated from sources is that:
> - Snapshotting imposes no modification to normal processing.

Yes, I agree that would be nice. Currently, during the alignment and blocking of the input channels, we might be wasting CPU cycles of upstream tasks. If we succeed in designing a new checkpointing mechanism that does not disrupt/block regular data processing (% the extra IO cost for logging the in-flight records), that would be a huge improvement.

Piotrek
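To make the difference between these orderings concrete, here is a minimal sketch of the "S -> F -> L" variant Piotr lists; all interfaces are made-up stand-ins, not Flink's actual task internals.

import java.util.List;

// Sketch of the "S -> F -> L" ordering: on the FIRST barrier the task
// snapshots, forwards the marker, then logs the still-pending channels.
class SnapshotForwardLog {
    interface Channel { boolean barrierSeen(); void startLogging(long checkpointId); }
    interface Task {
        void snapshotState(long checkpointId);    // S: local (sync) snapshot part
        void broadcastBarrier(long checkpointId); // F: forward the marker
        List<Channel> inputChannels();
    }

    static void onFirstBarrier(Task task, long checkpointId) {
        task.snapshotState(checkpointId);    // S
        task.broadcastBarrier(checkpointId); // F: downstream can start early
        for (Channel ch : task.inputChannels()) {
            if (!ch.barrierSeen()) {
                ch.startLogging(checkpointId); // L: capture in-flight records
            }
        }
        // For "F -> S -> L", swap the first two calls, so the marker leaves
        // even before the (possibly expensive) synchronous snapshot part runs.
    }
}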
On 14 Aug 2019, at 14:56, Paris Carbone <seniorcarb...@gmail.com> wrote:

Sure, I see. In cases when no periodic aligned snapshots are employed, this is the only option.

Two things that were not highlighted enough so far on the proposed protocol (including my mails):

- The recovery/reconfiguration strategy should strictly prioritise processing logged events before entering normal task input operation. Otherwise causality can be violated. This also means dataflow recovery is expected to be slower than recovery from an aligned snapshot.
- Same as with state capture, markers should be forwarded upon the first marker received on input. No later than that. Otherwise we have duplicate side effects.

Thanks for the great ideas so far.

Paris

On 14 Aug 2019, at 14:33, Stephan Ewen <se...@apache.org> wrote:

Scaling with unaligned checkpoints might be a necessity.

Let's assume the job failed due to a lost TaskManager, but no new TaskManager becomes available. In that case we need to scale down based on the latest complete checkpoint, because we cannot produce a new checkpoint.

On Wed, Aug 14, 2019 at 2:05 PM Paris Carbone <seniorcarb...@gmail.com> wrote:

+1 I think we are on the same page, Stephan.

Rescaling on an unaligned checkpoint sounds challenging and a bit unnecessary. No? Why not stick to aligned snapshots for live reconfiguration/rescaling? It's a pretty rare operation and it would simplify things by a lot. Everything can be "staged" upon alignment, including replacing channels and tasks.

-Paris

On 14 Aug 2019, at 13:39, Stephan Ewen <se...@apache.org> wrote:

Hi all!

Yes, the first proposal of "unaligned checkpoints" (probably two years back now) drew major inspiration from Chandy-Lamport, as did the original checkpointing algorithm.

"Logging data between first and last barrier" versus "barrier jumping over buffers and storing those buffers" is pretty much the same. However, there are a few nice benefits of the proposal of unaligned checkpoints over Chandy-Lamport.

*## Benefits of Unaligned Checkpoints*

(1) It is very similar to the original algorithm (can be seen as an optional feature purely in the network stack) and thus can share lots of code paths.

(2) Less data stored. If we make the "jump over buffers" part timeout-based (for example, the barrier overtakes buffers if they are not flushed within 10ms), then in the common case of flowing pipelines checkpoints are aligned, without in-flight data. Only backpressured cases store some in-flight data, which means we don't regress in the common case and only fix the backpressure case.

(3) Faster checkpoints. Chandy-Lamport still waits for all barriers to arrive naturally, logging on the way. If data processing is slow, this can still take quite a while.
==> I think both these points are strong reasons to not change the mechanism away from "trigger sources" and start with a CL-style "trigger all".

*## Possible ways to combine Chandy Lamport and Unaligned Checkpoints*

We can think about something like "take a state snapshot on the first barrier" and then store buffers until the other barriers arrive. Inside the network stack, barriers could still overtake and persist buffers. The benefit would be less latency increase in the channels which have already received barriers. However, as mentioned before, not prioritizing the inputs from which barriers are still missing can also have an adverse effect.

*## Concerning upgrades*

I think it is a fair restriction to say that upgrades need to happen on aligned checkpoints. It is a rare enough operation.

*## Concerning re-scaling (changing parallelism)*

We need to support that on unaligned checkpoints as well. There are several feature proposals about automatic scaling, especially downscaling in case of missing resources. The last snapshot might be a regular checkpoint, so all checkpoints need to support rescaling.

*## Concerning end-to-end checkpoint duration and "trigger sources" versus "trigger all"*

I think for the end-to-end checkpoint duration, an "overtake buffers" approach yields faster checkpoints, as mentioned above (Chandy-Lamport logging still needs to wait for the barrier to flow).

I don't see the benefit of a "trigger all tasks via RPC concurrently" approach. Bear in mind that it is still a globally coordinated approach and you need to wait for the global checkpoint to complete before committing any side effects. I believe that the checkpoint time is determined more by the state checkpoint writing, the global coordination, and the metadata commit than by the difference in alignment time between "trigger from source and jump over buffers" versus "trigger all tasks concurrently".

Trying to optimize a few tens of milliseconds out of the network stack (and changing the overall checkpointing approach completely for that) while staying with a globally coordinated checkpoint would send us down a path to a dead end.

To really bring task persistence latency down to tens of milliseconds (so we can commit frequently in sinks), we need to take an approach without any global coordination. Tasks need to establish a persistent recovery point individually and at their own discretion; only then can it be frequent enough. To get there, they would need to decouple themselves from the predecessor and successor tasks (via something like persistent channels). This is a different discussion, though, somewhat orthogonal to this one here.

Best,
Stephan
On Wed, Aug 14, 2019 at 12:37 PM Piotr Nowojski <pi...@ververica.com> wrote:

Hi again,

Zhu Zhu, let me think about this more. Maybe, as Paris writes, we do not need to block any channels at all, at least assuming credit-based flow control. What should happen with the following checkpoint is another question. Also, should we support concurrent checkpoints and subsuming checkpoints as we do now? Maybe not…

Paris

Re:
I. 2. a) and b) - yes, this would have to be taken into account.
I. 2. c) and IV. 2. - without those, the end-to-end checkpoint time will probably be longer than it could be. It might affect external systems. For example Kafka, which automatically times out lingering transactions; and for us, the transaction time is equal to the time between two checkpoints.

II. 1. - I'm confused, so to set things straight: Flink currently snapshots once it has received all of the checkpoint barriers from all of the input channels, and only then does it broadcast the checkpoint barrier down the stream. And this is correct from an exactly-once perspective.

As far as I understand, your proposal, based on the Chandy-Lamport algorithm, is to snapshot the state of the operator on the first checkpoint barrier, which also looks correct to me.

III. 1. As I responded to Zhu Zhu, let me think a bit more about this.

V. Yes, we still need aligned checkpoints, as they are easier for state migration and upgrades.

Piotrek

On 14 Aug 2019, at 11:22, Paris Carbone <seniorcarb...@gmail.com> wrote:

Now I see a little more clearly what you have in mind. Thanks for the explanation! There are a few intermixed concepts here, some having to do with correctness, some with performance. Before delving deeper I will just enumerate a few things to make myself a little more helpful if I can.

I. Initiation
-------------

1. RPC to sources only is a less intrusive way to initiate snapshots, since it makes better use of pipeline parallelism (only a small subset of tasks is progressively running the protocol at a time; if snapshotting is async, the overall overhead might not even be observable).

2. If we really want an RPC-to-all initiation, take notice of the following implications:

a. (correctness) RPC calls are not guaranteed to arrive in every task before a marker from a preceding task.
b. (correctness) Either the RPC call OR the first arriving marker should initiate the algorithm, whichever comes first. If you only do it per RPC call, then you capture a "late" state that includes side effects of already logged events.

c. (performance) Lots of IO will be invoked at the same time on the backend store from all tasks. This might lead to high congestion in async snapshots.

II. Capturing State First
-------------------------

1. (correctness) Capturing state at the last marker sounds incorrect to me (the state contains side effects of already logged events under the proposed scheme). This results in duplicate processing. No?

III. Channel Blocking / "Alignment"
-----------------------------------

1. (performance?) What is the added benefit? We don't want a "complete" transactional snapshot; async snapshots are purely for failure recovery. Thus, I don't see why this needs to be imposed at the expense of performance/throughput. With the proposed scheme the whole dataflow anyway enters snapshotting/logging mode, so tasks more or less snapshot concurrently.

IV. Marker Bypassing
--------------------

1. (correctness) This leads to equivalent in-flight snapshots, so on some quick thinking it is correct. I will try to model this later and get back to you in case I find something wrong.

2. (performance) It also sounds like a meaningful optimisation! I like thinking of this as a push-based snapshot, i.e., the producing task somehow triggers a consumer/channel to capture its state. As an example, consider T1 -> |marker t1| -> T2.

V. Usage of "Async" Snapshots
-----------------------------

1. Do you see this as a full replacement of "full" aligned snapshots/savepoints? In my view async snapshots will be needed from time to time, but not as frequently. Yet, it seems like a valid approach solely for failure recovery on the same configuration. Here's why:

a. With the original snapshotting there is a strong duality between a stream input (offsets) and committed side effects (internal states and external commits to transactional sinks). In the async version, there are uncommitted operations (in-flight records). Thus, you cannot use these snapshots for, e.g., submitting SQL queries with snapshot isolation. Also, the original snapshotting gives Flink a lot of potential to make proper transactional commits externally.

b. Reconfiguration is very tricky, you probably know that better.
In-flight channel state is no longer valid in a new configuration (i.e., new dataflow graph, new operators, updated operator logic, different channels, different parallelism).

2. Async snapshots can also be potentially useful for monitoring the general health of a dataflow, since the task manager can analyze them to gauge the general performance of a job graph and spot bottlenecks, for example.

On 14 Aug 2019, at 09:08, Piotr Nowojski <pi...@ververica.com> wrote:

Hi,

Thomas:
There are no JIRA tickets yet (or maybe there is something very old somewhere). First we want to discuss it, next present a FLIP, and at last create tickets :)

> if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives

I would guess that it would be better to block the reads, unless we can already process the records from the blocked channel…

Paris:

Thanks for the explanation, Paris. I'm starting to understand this more, and I like the idea of snapshotting the state of an operator before receiving all of the checkpoint barriers - this would allow more things to happen at the same time instead of sequentially. As Zhijiang has pointed out, there are some things not considered in your proposal (overtaking output buffers), but maybe those could be incorporated together.

Another thing is that from the wiki description I understood that the initial checkpointing is not initiated by any checkpoint barrier, but by an independent call/message from the Observer. I haven't played with this idea a lot, but I had some discussion with Nico and it seems that it might work (a rough task-side sketch follows the steps below):

1. The JobManager sends an RPC "start checkpoint" to all tasks.
2. A Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a snapshot of its state and:
a) broadcasts a checkpoint barrier down the stream to all channels (let's ignore for a moment the potential for this barrier to overtake the buffered output data)
b) for any input channel for which it hasn't yet received a checkpoint barrier, the data are added to the checkpoint
c) once a channel (for example l1) receives a checkpoint barrier, the Task blocks reads from that channel (?)
d) after all remaining channels (l2) receive checkpoint barriers, the Task first has to process the buffered data; after that it can unblock the reads from the channels
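A rough task-side sketch of steps 1-2d, with all interfaces invented for illustration:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RpcInitiatedCheckpoint {
    interface InputChannel {
        void block();
        void unblock();
        void startLoggingInto(List<Object> log); // 2b) in-flight data -> checkpoint
        void stopLogging();
    }
    interface TaskState { void snapshot(long checkpointId); }
    interface Output { void broadcastBarrier(long checkpointId); }

    private final Map<InputChannel, List<Object>> channelState = new HashMap<>();
    private int pendingBarriers;

    // Step 2: the "start checkpoint" RPC arrived.
    void onStartCheckpointRpc(long id, TaskState state, Output out, List<InputChannel> inputs) {
        state.snapshot(id);        // take the task's state snapshot
        out.broadcastBarrier(id);  // 2a) barrier to all downstream channels
        pendingBarriers = inputs.size();
        for (InputChannel ch : inputs) {
            ch.startLoggingInto(channelState.computeIfAbsent(ch, c -> new ArrayList<>())); // 2b)
        }
    }

    // Steps 2c/2d: a barrier arrived on one of the inputs.
    void onBarrier(long id, InputChannel ch, List<InputChannel> inputs) {
        ch.stopLogging(); // its data up to the barrier is captured
        ch.block();       // 2c) don't read past the barrier yet
        if (--pendingBarriers == 0) {
            // 2d) the logged buffers complete the checkpoint; once the backlog
            // is processed, all channels can be read again.
            channelState.clear();
            inputs.forEach(InputChannel::unblock);
        }
    }
}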
Checkpoint barriers do not cascade/flow through different tasks here. A checkpoint barrier emitted from Task1 reaches only the immediate downstream Tasks. Thanks to this setup, the total checkpointing time is not the sum of the checkpointing times of all Tasks one by one, but more or less the max over the slowest Tasks. Right?

A couple of intriguing thoughts are:
3. checkpoint barriers overtaking the output buffers
4. can we keep processing some data (in order to not waste CPU cycles) after we have taken the snapshot of the Task? I think we could.

Piotrek

On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:

Great discussion! I'm excited that this is already under consideration! Are there any JIRAs or other traces of discussion to follow?

Paris, if I understand correctly, then the proposal is to not block any input channel at all, but only log data from the backpressured channel (and make it part of the snapshot) until the barrier arrives? This is intriguing. But probably there is also a benefit to not continuing to read I1, since that could speed up retrieval from I2. Also, if the user code is the cause of backpressure, this would avoid pumping more data into the process function.

Thanks,
Thomas

On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid> wrote:

Hi Paris,

Thanks for the detailed sharing. I think it is very similar to the overtaking approach we proposed before.

There are some small differences: The overtaking approach might need to snapshot all the input/output queues, whereas Chandy-Lamport seems to only need to snapshot (n-1) input channels after the first barrier arrives, which might reduce the state size a bit (though normally there should be few buffers in the first input channel with the barrier anyway). Also, the output barrier still follows the regular data stream in Chandy-Lamport, the same way as in current Flink; for the overtaking approach, we need to spend extra effort to make the barrier transport ahead of the output queue on the upstream side, and to change barrier alignment on the downstream side to be based on receiving instead of the current reading.
In backpressure caused by data skew, the first barrier, in an almost empty input channel, should arrive much earlier than in the last heavily loaded input channel, so Chandy-Lamport can benefit well there. But in the case of all input channels being heavily loaded in a balanced way, the first arriving barrier might still take much time, and then the overtaking approach could still fit well to speed up the checkpoint.

Anyway, your proposed suggestion is helpful on my side, especially considering some implementation details.

Best,
Zhijiang

------------------------------------------------------------------
From: Paris Carbone <seniorcarb...@gmail.com>
Send Time: 2019 Aug. 13 (Tue.) 14:03
To: dev <dev@flink.apache.org>
Cc: zhijiang <wangzhijiang...@aliyun.com>
Subject: Re: Checkpointing under backpressure

Yes! It's quite similar, I think. Though mind that the devil is in the details, i.e., the temporal order in which actions are taken.

To clarify, let us say you have a task T with two input channels I1 and I2. The Chandy-Lamport execution flow is the following:

1) T receives a barrier from I1 and...
2) ...the following three actions happen atomically:
   I)   T snapshots its state T*
   II)  T forwards the marker to its outputs
   III) T starts logging all events of I2 (only) into a buffer M*
   - Also notice here that T does NOT block I1 as it does in aligned snapshots -
3) Eventually T receives the barrier from I2 and stops recording events. Its asynchronously captured snapshot is now complete: {T*, M*}.

Upon recovery all messages of M* should be replayed in FIFO order.

With this approach alignment does not create a deadlock situation, since 2.II happens asynchronously anyway, and messages can likewise be logged asynchronously during the process of the snapshot. If there is back-pressure in a pipeline, the cause is most probably not this algorithm.

Back to your observation, the answer is: yes and no. In your network model, I can see the logic of "logging" and "committing" a final snapshot being provided by the channel implementation. However, do mind that the first barrier always needs to go "all the way" to initiate the Chandy-Lamport algorithm logic.

The above flow has been proven using temporal logic in my PhD thesis, in case you are interested in the proof. I hope this helps clarify things a little. Let me know if there is any confusing point to disambiguate. I would be more than happy to help if I can.

Paris
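A compact sketch of the task-side logic Paris describes, with hypothetical names (not a real Flink class): the first barrier triggers the atomic snapshot/forward/log step, and the last barrier seals {T*, M*}.

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Sketch of the flow above for a task T: the first barrier (e.g. from I1)
// starts the snapshot; the last barrier (from I2) completes it. No channel
// is ever blocked.
class ChandyLamportTask {
    interface Output { void forwardMarker(long checkpointId); }
    interface StateBackend {
        void snapshot(long checkpointId);                 // capture T*
        void finish(long checkpointId, Queue<Object> log); // seal {T*, M*}
    }

    private final Set<String> barriered = new HashSet<>();
    private final Queue<Object> mStar = new ArrayDeque<>(); // M*
    private boolean snapshotting;

    void onBarrier(long id, String channel, int numInputs, Output out, StateBackend backend) {
        barriered.add(channel);
        if (!snapshotting) {       // 1) first barrier arrives
            snapshotting = true;
            backend.snapshot(id);  // 2.I)  capture T*
            out.forwardMarker(id); // 2.II) forward the marker to all outputs
            // 2.III) from now on, onRecord() logs the pending channels into M*
        }
        if (barriered.size() == numInputs) { // 3) last barrier arrives
            backend.finish(id, mStar);       // snapshot {T*, M*} is complete
            snapshotting = false;
            barriered.clear();
        }
    }

    // Records keep being processed normally; channels that have not yet
    // delivered their barrier are additionally logged while snapshotting.
    void onRecord(String channel, Object record) {
        if (snapshotting && !barriered.contains(channel)) {
            mStar.add(record); // replayed in FIFO order upon recovery
        }
        // process(record) happens regardless: I1 is NOT blocked
    }
}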
On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:

Thanks for the input. Regarding the Chandy-Lamport snapshots: don't you still have to wait for the "checkpoint barrier" to arrive in order to know when you have received all possible messages from the upstream tasks/operators? So instead of processing the "in flight" messages (as Flink is doing currently), you are sending them to an "observer"?

In that case, that sounds similar to "checkpoint barriers overtaking in-flight records" (aka unaligned checkpoints). Just for us, the observer is the snapshot state.

Piotrek

On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com> wrote:

Interesting problem! Thanks for bringing it up, Thomas.

Ignore/correct me if I am wrong, but I believe Chandy-Lamport snapshots [1] would help solve this problem more elegantly without sacrificing correctness.
- They do not need alignment, only (async) logging of in-flight records from the time the first barrier is processed until the last barrier arrives in a task.
- They work fine for failure recovery as long as logged records are replayed on startup.

Flink's "aligned" savepoints would probably still be necessary for transactional sink commits + any sort of reconfiguration (e.g., rescaling, updating the logic of operators to evolve an application, etc.).

I don't completely understand the "overtaking" approach, but if you have a concrete definition I would be happy to check it out and help if I can! Mind that Chandy-Lamport essentially does this by logging things in pending channels in a task snapshot before the barrier arrives.
-Paris

[1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm

On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:

Hi Thomas,

As Zhijiang has responded, we are now in the process of discussing how to address this issue, and one of the solutions we are discussing is exactly what you are proposing: checkpoint barriers overtaking the in-flight data and making the in-flight data part of the checkpoint.

If everything works well, we will be able to present the result of our discussions on the dev mailing list soon.

Piotrek

On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID> wrote:

Hi Thomas,

Thanks for raising this concern. The barrier alignment takes a long time in the backpressure case, which can cause several problems:
1. Checkpoint timeouts, as you mentioned.
2. A high recovery cost on failover, because much data needs to be replayed.
3. A high delay for commit-based sinks in exactly-once mode.

With credit-based flow control, introduced in release 1.5, the amount of in-flight buffers before barrier alignment is reduced, so we get some benefit in terms of speeding up checkpoints.

In release 1.8, I guess we did not suspend the channels which had already received the barrier in practice. But we actually did a similar thing to speed up barrier alignment before; I am not quite sure whether release 1.8 covers this feature. There were some relevant discussions under the JIRA issue [1].

For release 1.10, the community is now discussing the feature of unaligned checkpoints, which mainly aims to resolve the above concerns. The basic idea is to make the barrier overtake the output/input buffer queues to speed up alignment, and to snapshot the input/output buffers as part of the checkpoint state. The details have not been confirmed yet and are still under discussion. We hope to make some improvements for release 1.10.

[1] https://issues.apache.org/jira/browse/FLINK-8523

Best,
Zhijiang
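On the recovery-cost point (2), and recalling Paris's earlier note that logged events must be strictly replayed before normal input to preserve causality, the restore path for a checkpoint that includes in-flight data would look roughly like this (illustrative names only):

import java.util.List;

// Sketch of the recovery side: the logged in-flight records are replayed
// in FIFO order before any new channel input is read.
class InFlightRecovery {
    interface StateHandle { List<Object> loggedRecords(); }
    interface Task { void process(Object record); void resumeChannelInput(); }

    static void restore(Task task, StateHandle handle) {
        for (Object record : handle.loggedRecords()) {
            task.process(record);    // replay strictly before live input
        }
        task.resumeChannelInput();   // only now read from the network again
    }
}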
------------------------------------------------------------------
From: Thomas Weise <t...@apache.org>
Send Time: 2019 Aug. 12 (Mon.) 21:38
To: dev <dev@flink.apache.org>
Subject: Checkpointing under backpressure

Hi,

One of the major operational difficulties we observe with Flink is checkpoint timeouts under backpressure. I'm looking for both confirmation of my understanding of the current behavior as well as pointers for future improvement work:

Prior to the introduction of credit-based flow control in the network stack [1][2], checkpoint barriers would back up with the data for all logical channels due to TCP backpressure. Since Flink 1.5, the buffers are controlled per channel, and checkpoint barriers are only held back for channels that have backpressure, while others can continue processing normally. However, checkpoint barriers still cannot "overtake data"; therefore checkpoint alignment remains affected for the channel with backpressure, with the potential for slow checkpointing and timeouts. The delay of barriers is capped by the maximum number of in-transit buffers per channel, though, resulting in an improvement compared to previous versions of Flink. Also, the backpressure-based checkpoint alignment can help the barrier advance faster on the receiver side (by suspending channels that have already delivered the barrier). Is that accurate as of Flink 1.8?

What appears to be missing to completely unblock checkpointing is a mechanism for checkpoints to overtake the data. That would help in situations where the processing itself is the bottleneck and prioritization in the network stack alone cannot address the barrier delay. Was there any related discussion? One possible solution would be to drain incoming data up to the barrier and make it part of the checkpoint instead of processing it. This is somewhat related to asynchronous processing, but I'm thinking more of a solution that is automated in the Flink runtime for the backpressure scenario only.
Thanks,
Thomas

[1] https://flink.apache.org/2019/06/05/flink-network-stack.html
[2] https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn