Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-08-03 Thread Piotr Nowojski
Hi Devs,

In discussions that came up during code review, we decided to rename this
effort from the clumsy "Dynamic buffer size adjustment" or "Automatic
in-flight data control" to "Buffer debloat". First of all, bufferbloat is
already an established name for the problem we are trying to solve [1],
although it is mostly used for much lower layers of the network stack.
Buffer debloating is also an established name for efforts to solve the
bufferbloat problem [2]. Secondly, it is simply a catchier name that is
easier to advertise :) Hence "bufferbloat" and "buffer debloating" are the
terms we will use in the code, the config options, the documentation and
potential blog posts.

Please let us know if you think there is an even better name for this
effort, as we have time until the 1.14 release to rename it.

Best, Piotrek

[1] https://en.wikipedia.org/wiki/Bufferbloat
[2] https://www.google.com/search?q=buffer+%22debloat%22


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-21 Thread Anton Kalashnikov
Thanks, everyone, for sharing your opinions. I have updated the FLIP
according to the discussion and I'm going to start the vote on it.


--
Best regards,
Anton Kalashnikov


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-16 Thread Till Rohrmann
I think this is a good idea. +1 for this approach. Are you gonna update the
FLIP accordingly?

Cheers,
Till


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-15 Thread Steven Wu
I really like the new idea.


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-15 Thread Piotr Nowojski
Hi Till,

> I assume that buffer sizes are only changed for newly assigned
> buffers/credits, right? Otherwise, the data could already be on the wire
> and then it wouldn't fit on the receiver side. Or do we have a back
> channel mechanism to tell the sender that a part of a buffer needs to be
> resent once more capacity is available?

Initially, our implementation proposal intended to implement the first
option. The buffer size would be attached to a credit message, so the
receiver would first need to allocate a buffer with the updated size and
send the credit upstream, and the sender would be allowed to send only as
much data as the credit permits. So there would be no way to change buffer
sizes while something is "on the wire", and hence no problem with it.

However, Anton suggested an even simpler idea to me today. There is actually
no problem with receivers supporting all buffer sizes up to the maximum
allowed size (the currently configured memory segment size). Thus the new
buffer size can be treated by the sender as a recommendation. We can
announce a new buffer size, and the sender will start capping newly
requested buffers to that size, but we can still send already filled buffers
in chunks of any size, as long as they do not exceed the max memory segment
size. This way we can leave any already filled buffers on the sender side
untouched, and we do not need to partition/slice them before sending them
downstream, which makes at least the initial version even simpler. We also
do not need to track that different credits have different sizes. We just
announce a single value: the "recommended/requested buffer size".
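
A minimal sketch of the "recommended buffer size" idea described above, in
Java. All class and method names here are hypothetical and are not taken
from Flink; the sketch only models buffer sizes as integers and assumes the
announced value is clamped to the configured segment size.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sender-side model; none of these names come from Flink. */
final class RecommendedSizeSender {
    private final int maxSegmentSize; // configured memory segment size, in bytes
    private int recommendedSize;      // last size announced by the receiver
    private final Queue<Integer> filled = new ArrayDeque<>(); // sizes of filled buffers

    RecommendedSizeSender(int maxSegmentSize) {
        this.maxSegmentSize = maxSegmentSize;
        this.recommendedSize = maxSegmentSize;
    }

    /** Receiver announces a new recommended size; clamp it to [1, maxSegmentSize]. */
    void announce(int newSize) {
        recommendedSize = Math.max(1, Math.min(newSize, maxSegmentSize));
    }

    /** Capacity of a newly requested buffer: capped at the recommendation. */
    int newBufferCapacity() {
        return recommendedSize;
    }

    /** Records an already filled buffer holding the given number of bytes. */
    void enqueueFilled(int bytes) {
        if (bytes < 0 || bytes > maxSegmentSize) {
            throw new IllegalArgumentException("buffer must fit into one memory segment");
        }
        filled.add(bytes);
    }

    /** Sends the next filled buffer untouched; returns its size, or -1 if none is queued. */
    int sendNext() {
        Integer size = filled.poll();
        return size == null ? -1 : size;
    }
}
```

In this model a 32KB buffer that was filled before a 16KB announcement is
still sent whole, while buffers requested after the announcement are capped
at 16KB.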

Piotrek


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-15 Thread Till Rohrmann
Hi everyone,

Thanks a lot for creating this FLIP Anton and Piotr. I think it looks like
a very promising solution for speeding up our checkpoints and being able to
create them more reliably.

Following up on Steven's question: I assume that buffer sizes are only
changed for newly assigned buffers/credits, right? Otherwise, the data
could already be on the wire and then it wouldn't fit on the receiver side.
Or do we have a back channel mechanism to tell the sender that a part of a
buffer needs to be resent once more capacity is available?

Cheers,
Till



Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-14 Thread Piotr Nowojski
Hi Steven,

As downstream/upstream nodes are decoupled, if the downstream nodes adjust
their buffer size first, there will be a lag until this updated buffer size
information reaches the upstream node. It is a problem, but it has a quite
simple solution that we described in the FLIP document:

> Sending the buffer of the right size.
> It is not enough to know just the number of available buffers (credits)
> for the downstream because the size of these buffers can be different.
> So we are proposing to resolve this problem in the following way: If the
> downstream buffer size is changed then the upstream should send the
> buffer of the size not greater than the new one regardless of how big
> the current buffer on the upstream. (pollBuffer should receive
> parameters like bufferSize and return buffer not greater than it)

So apart from adding buffer size information to the `AddCredit` message, we
will need to support a case where the upstream subpartition has already
produced a buffer with the older size (for example 32KB), while the next
credit arrives with an allowance for a smaller size (16KB). In that case, we
are only allowed to send the portion of the data from this buffer that fits
into the new buffer size, and we keep announcing the remaining part as
available backlog.
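
The 32KB/16KB case above can be sketched as follows. This is only an
illustration in Java under simplifying assumptions: the class name and
`backlogBytes` are hypothetical, the buffer is modelled as a byte count
rather than real data, and `pollBuffer(bufferSize)` merely mirrors the
parameter suggested in the FLIP quote rather than any existing signature.

```java
/** Hypothetical model of one upstream subpartition; not Flink's actual API. */
final class SlicingSubpartition {
    private int pendingBytes; // bytes already produced but not yet sent

    SlicingSubpartition(int pendingBytes) {
        this.pendingBytes = pendingBytes;
    }

    /** Returns how many bytes may be sent for one credit of the given buffer size. */
    int pollBuffer(int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        int toSend = Math.min(pendingBytes, bufferSize);
        pendingBytes -= toSend; // the rest stays queued and is announced as backlog
        return toSend;
    }

    /** Bytes still waiting to be sent, i.e. the remaining backlog. */
    int backlogBytes() {
        return pendingBytes;
    }
}
```

With 32KB pending and a credit that allows 16KB, the first `pollBuffer` call
yields 16KB and leaves 16KB of backlog, which a second credit then drains.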

Best,
Piotrek


On Wed, Jul 14, 2021 at 08:33, Steven Wu wrote:

> - The subtask observes the changes in the throughput and changes the
>   buffer size during the whole life period of the task.
> - The subtask sends buffer size and number of available buffers to the
>   upstream to the corresponding subpartition.
> - Upstream changes the buffer size corresponding to the received
>   information.
> - Upstream sends the data and number of filled buffers to the downstream
>
> Will the above steps of buffer size adjustment cause problems with
> credit-based flow control (mainly for downsizing), since the downstream
> adjusts down first?
>
> Here is the quote from the blog[1]
> "Credit-based flow control makes sure that whatever is “on the wire” will
> have capacity at the receiver to handle. "
>
> [1]
> https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
>
> On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao wrote:
>
> > Hi,
> >
> > Thanks for driving this, I think it is really helpful for jobs suffering
> > from backpressure.
> >
> > Best,
> > Yingjie
> >
> > On Fri, Jul 9, 2021 at 10:59 PM, Anton Kalashnikov wrote:
> >
> > > Hey!
> > >
> > > There is a wish to decrease the amount of in-flight data, which can
> > > improve aligned checkpoint time (less in-flight data to process before
> > > a checkpoint can complete) and improve the behaviour and performance
> > > of unaligned checkpoints (less in-flight data that needs to be
> > > persisted in every unaligned checkpoint). The main idea is not to keep
> > > as much in-flight data as we have memory for, but to keep only the
> > > amount of data that can predictably be handled in a configured amount
> > > of time (e.g. we keep data which can be processed in 1 sec). This can
> > > be achieved by calculating the effective throughput and then changing
> > > the buffer size based on this throughput. You can find more details
> > > about the proposal here [1].
> > >
> > > What are your thoughts about it?
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > >
> > > --
> > > Best regards,
> > > Anton Kalashnikov


Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-14 Thread Steven Wu
   - The subtask observes the changes in the throughput and changes the
   buffer size during the whole life period of the task.
   - The subtask sends buffer size and number of available buffers to the
   upstream to the corresponding subpartition.
   - Upstream changes the buffer size corresponding to the received
   information.
   - Upstream sends the data and number of filled buffers to the downstream


Will the above steps of buffer size adjustment cause problems with
credit-based flow control (mainly for downsizing), since downstream
adjusts down first?

Here is the quote from the blog[1]
"Credit-based flow control makes sure that whatever is “on the wire” will
have capacity at the receiver to handle. "


[1]
https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
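
To make the concern concrete, here is a small sketch in Java. `CreditSketch`
is a hypothetical model of one credit announcement (the number of free
receiver buffers plus their size), not a Flink class. It only shows the
condition that must hold for data "on the wire" to fit at the receiver once
buffer sizes can change:

```java
// Hypothetical model of a credit announcement; not part of Flink.
final class CreditSketch {
    final int credits;     // number of free buffers at the receiver
    final int bufferSize;  // size in bytes of each of those buffers

    CreditSketch(int credits, int bufferSize) {
        this.credits = credits;
        this.bufferSize = bufferSize;
    }

    // True only if there is a free receiver buffer and the outgoing buffer
    // is not larger than the size the receiver has announced.
    boolean canSend(int outgoingBufferSize) {
        return credits > 0 && outgoingBufferSize <= bufferSize;
    }
}
```

If the downstream has already shrunk its buffers to 16KB and the upstream
still holds a 32KB buffer, `canSend` is false even though credits are
available, which is exactly the downsizing case in question.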




Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-13 Thread Yingjie Cao
Hi,

Thanks for driving this, I think it is really helpful for jobs suffering
from backpressure.

Best,
Yingjie



[DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-12 Thread Anton,Kalashnikov

Hey!

There is a wish to decrease the amount of in-flight data, which can improve
aligned checkpoint time (less in-flight data to process before the
checkpoint can complete) and improve the behaviour and performance of
unaligned checkpoints (less in-flight data that needs to be persisted
in every unaligned checkpoint). The main idea is not to keep as much
in-flight data as we have memory for, but to keep only the amount of data
that can predictably be handled within a configured amount of time (e.g. we
keep the data which can be processed in 1 sec). This can be achieved by
calculating the effective throughput and then changing the buffer
size based on this throughput. More details about the proposal can be
found here [1].


What are your thoughts about it?


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment



--
Best regards,
Anton Kalashnikov



Re: [DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-09 Thread Piotr Nowojski
Hey,

Thanks for writing this down. +1 from my side.

Best,
Piotrek



[DISCUSS] FLIP-183: Dynamic buffer size adjustment

2021-07-09 Thread Anton,Kalashnikov

Hey!

There is a wish to decrease the amount of in-flight data, which can improve
aligned checkpoint time (less in-flight data to process before the
checkpoint can complete) and improve the behaviour and performance of
unaligned checkpoints (less in-flight data that needs to be persisted
in every unaligned checkpoint). The main idea is not to keep as much
in-flight data as we have memory for, but to keep only the amount of data
that can predictably be handled within a configured amount of time (e.g. we
keep the data which can be processed in 1 sec). This can be achieved by
calculating the effective throughput and then changing the buffer
size based on this throughput. More details about the proposal can be
found here [1].
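
One way this calculation could look, as a rough sketch only. The class and
method names, the division by the number of buffers in use, and the min/max
clamping are assumptions made for illustration; the FLIP page [1] has the
actual proposal:

```java
// Hypothetical sketch of deriving a buffer size from measured throughput.
final class BufferSizeSketch {
    // throughputBytesPerSec: measured effective throughput of the subtask
    // targetMillis: configured time the in-flight data should take to process
    // buffersInUse: number of buffers sharing the in-flight data (assumption)
    // minSize / maxSize: allowed bounds for one buffer (assumption)
    static int newBufferSize(
            long throughputBytesPerSec,
            long targetMillis,
            int buffersInUse,
            int minSize,
            int maxSize) {
        if (buffersInUse <= 0) {
            throw new IllegalArgumentException("buffersInUse must be positive");
        }
        // Amount of data that can be processed within the configured time.
        long targetInFlightBytes = throughputBytesPerSec * targetMillis / 1000;
        // Spread that amount evenly over the buffers, then clamp to the bounds.
        long perBuffer = targetInFlightBytes / buffersInUse;
        return (int) Math.max(minSize, Math.min(maxSize, perBuffer));
    }
}
```

For example, with a throughput of 1,000,000 bytes/sec, a target of 1 sec and
100 buffers, each buffer would be sized at 10,000 bytes, as long as that
value lies between the configured bounds.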


What are your thoughts about it?


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment



--
Best regards,
Anton Kalashnikov