Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-03-05 Thread Arvid Heise
ration during
> > > implementation, which could speed up recovery, especially when the
> > > amount of in-flight data is huge.
> > >
> > > 3. About the checkpointing policy, is the understanding below correct?
> > > Maybe it would help to map the policies more explicitly in the FLIP
> > > doc, IMHO:
> > > * For a single input channel, there is no difference between
> > > UNALIGNED_WITH_MAX_INFLIGHT_DATA and
> > > UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA: both mean we start the
> > > checkpoint once we observe the barrier in the input channel.
> > > * For multiple input channels, UNALIGNED_WITH_MAX_INFLIGHT_DATA means
> > > starting the checkpoint only when the barrier appears in all input
> > > channels, while UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA means starting
> > > the checkpoint when the barrier appears in any one of the input
> > > channels.
> > >
> > > 4. It seems we currently support only pre-defined options, but is it
> > > possible to switch between them dynamically? For example, if we
> > > predefine the policy as ALIGNED, could we supply a command to switch
> > > to UNALIGNED_WITH_MAX_INFLIGHT_DATA when severe back pressure is
> > > observed? Or switch to ALIGNED if we see too much data persisted for
> > > UNALIGNED_WITH_MAX_INFLIGHT_DATA? Maybe I'm neglecting something, but
> > > what's preventing us from being more adaptive?
> > >
> > > Thanks!
> > >
> > > Best Regards,
> > > Yu
> > >
> > >
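The reading of the two unaligned policies asked about in point 3 can be written down as a small predicate. This is only a sketch of the interpretation being questioned, not confirmed semantics: the policy names come from the thread, while `should_start_checkpoint` and the `barrier_seen` list are hypothetical names chosen for illustration.

```python
from enum import Enum


class Policy(Enum):
    # Policy names as they appear in the thread.
    UNALIGNED_WITH_MAX_INFLIGHT_DATA = "max"
    UNALIGNED_WITH_UNLIMITED_INFLIGHT_DATA = "unlimited"


def should_start_checkpoint(policy, barrier_seen):
    """Decide whether to start the checkpoint.

    barrier_seen holds one bool per input channel (hypothetical model):
    True if the barrier has appeared in that channel.
    """
    if not barrier_seen:
        raise ValueError("at least one input channel is required")
    if policy is Policy.UNALIGNED_WITH_MAX_INFLIGHT_DATA:
        # Start only once the barrier appeared in all input channels.
        return all(barrier_seen)
    # Start as soon as the barrier appeared in any input channel.
    return any(barrier_seen)
```

With a single input channel, `all` and `any` agree, which matches the first bullet; with several channels the two policies diverge as soon as only some of them have seen the barrier.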

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-28 Thread Yu Li

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-27 Thread Piotr Nowojski

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-26 Thread Zhijiang
 > [2] With checkpoints every minute and a data processing throughput of
 > 30MB/s per node, we would have to persist 1.8GB of data per node between
 > checkpoints, which is a similar order of magnitude to the buffered
 > in-flight data under back-pressure. With higher throughput, unaligned
 > checkpoints do not help ([1]). With lower throughput, both the original
 > proposal and continuous spilling would effectively have to persist all of
 > the data anyway.
 >
 > > On 10 Oct 2019, at 19:51, Yun Tang wrote:
 > >
 > > Hi Arvid
 > >
 > > +1 for this feature, which has been long awaited. End-to-end
 > > exactly-once jobs could benefit from quicker checkpoint completion.
 > >
 > >
 > > Best
 > > Yun Tang
 > >
 > > From: Yun Gao
 > > Sent: Thursday, October 10, 2019 18:39
 > > To: dev
 > > Subject: Re: [DISCUSS] FLIP-76: Unaligned checkpoints
 > >
 > > Hi Arvid,
 > >
 > > Many thanks for bringing up the discussion! On our side, being unable
 > > to finish a checkpoint is a common problem for online jobs, so +1 from
 > > my side to implement this.
 > > A tiny issue with the FLIP is that the attached Discussion Thread URL
 > > does not seem to be right.
 > >
 > >
 > > Best,
 > > Yun
 > >
 > >
 > > --
 > > From: Arvid Heise
 > > Send Time: 2019 Sep. 30 (Mon.) 20:31
 > > To: dev
 > > Subject: [DISCUSS] FLIP-76: Unaligned checkpoints
 > >
 > > Hi Devs,
 > >
 > > I would like to start the formal discussion about FLIP-76 [1], which
 > > improves the checkpoint latency in systems under backpressure, where a
 > > checkpoint can take hours to complete in the worst case. I recommend the
 > > thread "checkpointing under backpressure" [2] to get a good idea why
 > > users are not satisfied with the current behavior. The key points:
 > >
 > >   - Since the checkpoint barrier flows much more slowly through the
 > >   back-pressured channels, the other channels and their upstream
 > >   operators are effectively blocked during checkpointing.
 > >   - The checkpoint barrier takes a long time to reach the sinks, causing
 > >   long checkpointing times. A longer checkpointing time in turn means
 > >   that the checkpoint will be fairly outdated once done. Since a heavily
 > >   utilized pipeline is inherently more fragile, we may run into a
 > >   vicious cycle of late checkpoints, crash, recovery to a rather
 > >   outdated checkpoint, more back pressure, and even later checkpoints,
 > >   which would result in little to no progress in the application.
 > >
 > > The FLIP proposes "unaligned checkpoints", which improve the current
 > > state, such that
 > >
 > >   - Upstream processes can continue to produce data, even if some
 > >   operator still waits on a checkpoint barrier on a specific input
 > >   channel.
 > >   - Checkpointing times are heavily reduced across the execution graph,
 > >   even for operators with a single input channel.
 > >   - End-users will see more progress even in unstable environments, as
 > >   more up-to-date checkpoints will avoid too many recomputations.
 > >   - Rescaling becomes faster.
 > >
 > > The key idea is to allow checkpoint barriers to be forwarded to
 > > downstream tasks before the synchronous part of the checkpointing has
 > > been conducted (see Fig. 1). To that end, we need to store in-flight
 > > data as part of the checkpoint, as described in greater detail in this
 > > FLIP.
 > >
 > > Although the basic idea was already sketched in [2], we would like to
 > > get broader feedback in this dedicated mail thread.
 > >
 > > Best,
 > >
 > > Arvid
 > >
 > > [1]
 > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
 > > [2]
 > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
 > >
 >
 >
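The difference between waiting for alignment and storing in-flight data, as described in the quoted announcement, can be illustrated with a toy model. This is a heavily simplified sketch and not Flink code: each input channel is modelled as a list of queued items, `BARRIER` is a hypothetical marker, and both function names are made up for illustration.

```python
BARRIER = "BARRIER"  # hypothetical marker for a checkpoint barrier


def aligned_waiting_on(channels):
    """Channels an aligned checkpoint still waits for.

    A channel counts as 'arrived' only when the barrier is at the head of
    its queue; every record queued ahead of the barrier must be processed
    first.
    """
    return [name for name, queue in channels.items()
            if not queue or queue[0] != BARRIER]


def unaligned_in_flight(channels):
    """Records an unaligned checkpoint would store as in-flight data.

    Simplification: for each channel, take the records queued ahead of the
    barrier; if the barrier has not arrived yet, take everything queued.
    """
    in_flight = {}
    for name, queue in channels.items():
        if BARRIER in queue:
            in_flight[name] = queue[:queue.index(BARRIER)]
        else:
            in_flight[name] = list(queue)
    return in_flight


channels = {
    "fast": [BARRIER, "a3"],
    "slow": ["b1", "b2", BARRIER],
}
waiting = aligned_waiting_on(channels)
stored = unaligned_in_flight(channels)
```

In this example the aligned variant is still waiting on the `slow` channel, while the unaligned variant can proceed at the cost of storing `b1` and `b2` with the checkpoint.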




Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-25 Thread Yu Li

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-25 Thread Zhijiang

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2020-02-25 Thread Yu Li


Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-22 Thread Piotr Nowojski
Hi,

I would like to propose a modification to this FLIP. 

Based on the feedback that we received after publishing this document and
during Flink Forward, I have grown more and more anxious about one issue:
having to persist all buffered in-flight data at once. As the volume of this
data might be large (GBs per TaskManager even with small clusters and
relatively simple jobs), the time to persist all of it at once might be
quite substantial.



To address this issue, I would like to propose that we first implement a
variant of unaligned checkpoints, just as written down in FLIP-76, but with
continuous spilling: all data will be persisted/spilled continuously, all the
time as it arrives, not all at once when the checkpoint starts. Think of this
proposal as an incremental way of persisting the data.
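To make the contrast concrete, here is a toy model of the two variants. It is only a sketch under simplifying assumptions, not Flink code: the class names, `process_one` and `run` are hypothetical, and "persisting" is modelled as appending to a list.

```python
from collections import deque


class AtOnceChannel:
    """Persists whatever is still buffered when the checkpoint starts."""

    def __init__(self):
        self.buffered = deque()
        self.persisted = []

    def receive(self, record):
        self.buffered.append(record)

    def process_one(self):
        return self.buffered.popleft()

    def checkpoint(self):
        # All buffered in-flight data has to be written now.
        written = list(self.buffered)
        self.persisted.extend(written)
        return len(written)


class ContinuousChannel:
    """Spills every record as it arrives."""

    def __init__(self):
        self.buffered = deque()
        self.persisted = []

    def receive(self, record):
        self.buffered.append(record)
        self.persisted.append(record)  # spill immediately

    def process_one(self):
        return self.buffered.popleft()

    def checkpoint(self):
        # Nothing left to write at checkpoint time, only flush/close.
        return 0


def run(channel):
    """Receive three records, process one, then take a checkpoint."""
    for record in ("r1", "r2", "r3"):
        channel.receive(record)
    channel.process_one()
    written_at_checkpoint = channel.checkpoint()
    return written_at_checkpoint, len(channel.persisted)
```

In this run the at-once variant writes two records when the checkpoint starts and persists two in total, while the continuous variant writes nothing at checkpoint time but has persisted all three records.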

Pros of continuous spilling:
+ faster checkpointing, as there will be no need to store GBs of data, just 
flush/close.
+ more predictable behaviour. Instead of jerky/varying/spiky IO/CPU loads, 
steady record throughput and spilling.

Cons of continuous spilling:
- need to persist all of the network traffic instead of persisting just the 
in-flight data 
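
A minimal sketch of the trade-off listed above, assuming a single channel and a toy in-memory "spill" target. All class and method names are hypothetical and not Flink APIs; the point is only to compare how much has to be written at checkpoint time versus in total.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class SpillingChannel:
    """Toy model of one network channel (hypothetical, not a Flink class)."""
    continuous: bool
    in_flight: List[bytes] = field(default_factory=list)  # buffered, not yet processed
    spilled: List[bytes] = field(default_factory=list)    # persisted so far

    def on_buffer(self, buf: bytes) -> None:
        self.in_flight.append(buf)
        if self.continuous:
            # Continuous variant: persist every buffer as it arrives.
            self.spilled.append(buf)

    def process_one(self) -> None:
        # The operator consumes the oldest buffered record.
        self.in_flight.pop(0)

    def on_checkpoint(self) -> int:
        """Return the number of bytes that must be written at checkpoint time."""
        if self.continuous:
            return 0  # everything is already spilled; only flush/close remains
        # At-once variant: persist whatever is still in flight right now.
        self.spilled.extend(self.in_flight)
        return sum(len(b) for b in self.in_flight)


def simulate(continuous: bool) -> Tuple[int, int]:
    """10 buffers of 100 bytes arrive, 7 are processed, then a checkpoint starts."""
    ch = SpillingChannel(continuous=continuous)
    for _ in range(10):
        ch.on_buffer(bytes(100))
    for _ in range(7):
        ch.process_one()
    written_at_checkpoint = ch.on_checkpoint()
    total_persisted = sum(len(b) for b in ch.spilled)
    return written_at_checkpoint, total_persisted
```

In this toy run the continuous variant writes nothing extra at checkpoint time but persists all 1000 bytes of traffic, while the at-once variant writes only the 300 bytes still in flight, all of them at checkpoint time.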

The larger volume of persisted data doesn’t matter that much from a throughput 
perspective: if you are unable to spill the data faster than you process it, 
unaligned checkpoints are a worse option than aligned checkpoints anyway [1]. 
If checkpoints are frequent, it also doesn’t matter [2]. The true downside 
shows up when checkpoints are infrequent and you have to pay, for example, for 
the extra storage or for the extra network traffic to the storage.

On the other hand, continuous spilling (persistent communication channels?) 
might have the added benefit of enabling localised failures: the failure of 
one node would not necessarily bring down the whole cluster.



As I mentioned, I’m proposing to just start with the continuous spilling. It 
might be more costly in some scenarios, but it will offer the most stable and 
predictable performance with the lowest checkpoint latency. It’s not perfect, 
it won’t solve all of the use cases, but frankly all of the other options have 
their own blind spots, and continuous spilling should at least fully solve 
relatively low throughput use cases. We can later build on top of that 
solution, expanding it with the following features:

1. Do not spill continuously if there is no backpressure. For example provide a 
timeout: start spilling pre-emptively/continuously if some buffer was not 
processed within X seconds.
2. Start spilling only once the checkpoint starts (this is the exact proposal 
from the current FLIP-76).
3. Initially we want to spill to a Flink FileSystem (for example S3), but in 
the future we are considering other options, for example Apache BookKeeper. 
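
Item 1 above could be sketched roughly as follows. This is only an illustration under stated assumptions: `should_spill`, its parameters, and the use of plain timestamps in seconds are all hypothetical, and "X seconds" from the list is passed in as `timeout_s`.

```python
from typing import Optional


def should_spill(now: float,
                 oldest_buffer_enqueued_at: Optional[float],
                 timeout_s: float) -> bool:
    """Decide whether to start spilling pre-emptively.

    Spilling starts only if the oldest unprocessed buffer has been waiting
    longer than timeout_s, which is taken here as a sign of backpressure.
    """
    if oldest_buffer_enqueued_at is None:
        return False  # nothing is buffered, so there is no backpressure
    return now - oldest_buffer_enqueued_at > timeout_s
```

With a 2 second timeout, a buffer that has waited 0.5 seconds does not trigger spilling, while one that has waited 5 seconds does.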

What do you think?

Piotrek



[1] I’m assuming that the spilling throughput per node can go up to ~30MB/s. If 
your Flink job's data processing rate is 100MB/s, spilling the in-flight data 
will take 3.3 times longer than waiting for the alignment. On the other hand, 
if the data processing rate is 10MB/s, the overhead of continuous spilling is 
relatively low.
[2] With a checkpoint every minute and a data processing throughput of 30MB/s 
per node, we would have to persist 1.8GB of data per node between checkpoints, 
which is of a similar order of magnitude as the buffered in-flight data under 
back pressure. With higher throughput, unaligned checkpoints are not helping 
([1]). With lower throughput, both the original proposal and continuous 
spilling would effectively have to persist all of the data anyway. 
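
The arithmetic behind footnotes [1] and [2] can be reproduced with a short calculation. The ~30MB/s spilling throughput is the assumption stated in [1]; the function names are made up for this illustration, and 1GB is taken as 1000MB.

```python
SPILL_MB_PER_S = 30.0  # assumed per-node spilling throughput from footnote [1]


def spill_slowdown(processing_mb_per_s: float,
                   spill_mb_per_s: float = SPILL_MB_PER_S) -> float:
    """How many times longer it takes to spill a given amount of in-flight
    data than to simply process it (i.e. to wait for the alignment)."""
    return processing_mb_per_s / spill_mb_per_s


def persisted_between_checkpoints_gb(throughput_mb_per_s: float,
                                     interval_s: float) -> float:
    """Data volume per node that continuous spilling persists between two
    checkpoints, in GB (1 GB = 1000 MB)."""
    return throughput_mb_per_s * interval_s / 1000.0


# Footnote [1]: 100MB/s processing vs. 30MB/s spilling.
ratio_fast_job = spill_slowdown(100.0)   # about 3.3
ratio_slow_job = spill_slowdown(10.0)    # about 0.33, spilling keeps up easily

# Footnote [2]: one checkpoint per minute at 30MB/s per node.
volume_gb = persisted_between_checkpoints_gb(30.0, 60.0)  # 1.8
```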

Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-10 Thread Yun Tang
Hi Arvid

+1 for this feature, which has been hoped for for a long time. End-to-end 
exactly-once jobs could benefit from quicker checkpoint completion.


Best
Yun Tang




Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-10 Thread Yun Gao
Hi Arvid,

Many thanks for bringing up the discussion! On our side, online jobs are 
commonly unable to finish their checkpoints, so +1 from my side to implement 
this. 
   A tiny issue with the FLIP is that the attached Discussion Thread URL does 
not seem to be right. 


 Best, 
 Yun 





Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-09 Thread Congxian Qiu
Thanks for the FLIP, Arvid.

This is a good improvement for checkpointing under backpressure. Currently, a
job under backpressure can hardly complete a checkpoint, so +1 from my side.

Best,
Congxian




Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-09 Thread zhijiang
Thanks for writing up this FLIP, Arvid! 

Many users are waiting for this feature, so +1 from my side as well.

Best,
Zhijiang



Re: [DISCUSS] FLIP-76: Unaligned checkpoints

2019-10-07 Thread Piotr Nowojski
Hi Arvid,

Thanks for coming up with this FLIP. I think it addresses the issues raised in 
the previous mailing list discussion [2]. 

For the record: +1 from my side to implement this.

Piotrek

> On 30 Sep 2019, at 14:31, Arvid Heise  wrote:
> 
> Hi Devs,
> 
> I would like to start the formal discussion about FLIP-76 [1], which
> improves the checkpoint latency in systems under backpressure, where a
> checkpoint can take hours to complete in the worst case. I recommend the
> thread "checkpointing under backpressure" [2] to get a good idea why users
> are not satisfied with the current behavior. The key points:
> 
>   - Since the checkpoint barrier flows much slower through the
>   back-pressured channels, the other channels and their upstream operators
>   are effectively blocked during checkpointing.
>   - The checkpoint barrier takes a long time to reach the sinks causing
>   long checkpointing times. A longer checkpointing time in turn means that
>   the checkpoint will be fairly outdated once done. Since a heavily utilized
>   pipeline is inherently more fragile, we may run into a vicious cycle of
>   late checkpoints, crash, recovery to a rather outdated checkpoint, more
>   back pressure, and even later checkpoints, which would result in little to
>   no progress in the application.
> 
> The FLIP proposes "unaligned checkpoints" which improves the current state,
> such that
> 
>   - Upstream processes can continue to produce data, even if some operator
>   still waits on a checkpoint barrier on a specific input channel.
>   - Checkpointing times are heavily reduced across the execution graph,
>   even for operators with a single input channel.
>   - End-users will see more progress even in unstable environments as more
>   up-to-date checkpoints will avoid too many recomputations.
>   - Facilitate faster rescaling.
> 
> The key idea is to allow checkpoint barriers to be forwarded to downstream
> tasks before the synchronous part of the checkpointing has been conducted
> (see Fig. 1). To that end, we need to store in-flight data as part of the
> checkpoint as described in greater detail in this FLIP.
> 
> Although the basic idea was already sketched in [2], we would like to get
> broader feedback in this dedicated mail thread.
> 
> Best,
> 
> Arvid
> 
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Checkpointing-under-backpressure-td31616.html
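
The key idea in the quoted mail (forwarding the barrier before the buffered records are processed, and storing that in-flight data as part of the checkpoint) can be illustrated with a toy model. This is a sketch under strong simplifying assumptions: one input channel, integer records, operator state as a running sum, and hypothetical function names that are not Flink APIs.

```python
from typing import List, Tuple, Union

BARRIER = "BARRIER"
Item = Union[int, str]


def aligned(queue: List[Item], state: int) -> Tuple[int, List[int], int]:
    """Process every record in front of the barrier, then snapshot.

    Returns (snapshotted state, in-flight records stored in the checkpoint,
    number of records processed before the barrier could be forwarded).
    """
    idx = queue.index(BARRIER)
    for rec in queue[:idx]:
        state += rec
    return state, [], idx


def unaligned(queue: List[Item], state: int) -> Tuple[int, List[int], int]:
    """Forward the barrier immediately and snapshot the current state
    together with the records the barrier has overtaken."""
    idx = queue.index(BARRIER)
    in_flight = list(queue[:idx])
    return state, in_flight, 0


def restore(state: int, in_flight: List[int]) -> int:
    """Recovery: re-apply the stored in-flight records to the stored state."""
    return state + sum(in_flight)
```

For the queue `[1, 2, 3, BARRIER, 4]` and an initial state of 10, the aligned variant has to process three records before the barrier moves on, while the unaligned variant forwards it right away and stores `[1, 2, 3]` instead. Both checkpoints restore to the same state.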