Hi Steven,

As downstream and upstream nodes are decoupled, if a downstream node adjusts
its buffer size first, there will be a lag until the updated buffer size
information reaches the upstream node. It is a problem, but it has a
quite simple solution that we described in the FLIP document:

> Sending the buffer of the right size.
> It is not enough to know just the number of available buffers (credits)
> for the downstream because the size of these buffers can be different.
> So we are proposing to resolve this problem in the following way: if the
> downstream buffer size is changed, then the upstream should send a buffer
> no larger than the new size, regardless of how big the current buffer on
> the upstream is. (pollBuffer should receive parameters like bufferSize and
> return a buffer not greater than it.)

So apart from adding buffer size information to the `AddCredit` message, we
will need to support the case where an upstream subpartition has already
produced a buffer of the older size (for example 32KB), while the next credit
arrives with an allowance for a smaller size (16KB). In that case, we are
only allowed to send the portion of the data from this buffer that fits into
the new buffer size, and keep announcing the remaining part as available
backlog.
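A toy Python model of that behaviour, assuming a hypothetical
`AddCredit(credits, buffer_size)` message and counting backlog as the number
of queued buffers (a partially sent buffer still counts as one). It
illustrates the idea only and is not the FLIP's implementation.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque, List, Tuple


@dataclass
class AddCredit:
    """Hypothetical credit message: number of buffers the downstream can
    accept and the maximum size of each of them."""
    credits: int
    buffer_size: int


class SubpartitionSender:
    """Toy model of one upstream subpartition; buffers are byte strings."""

    def __init__(self) -> None:
        self.queue: Deque[bytes] = deque()

    def add(self, buffer: bytes) -> None:
        self.queue.append(buffer)

    def backlog(self) -> int:
        # Assumption: a partially sent buffer still counts as one buffer.
        return len(self.queue)

    def on_add_credit(self, msg: AddCredit) -> List[Tuple[int, int]]:
        """Send up to msg.credits buffers, none larger than msg.buffer_size.

        Returns (bytes_sent, backlog_after_send) for each buffer sent.
        """
        sent = []
        for _ in range(msg.credits):
            if not self.queue:
                break
            head = self.queue.popleft()
            if len(head) > msg.buffer_size:
                # Only the part that fits the new size goes on the wire;
                # the remainder stays queued and is announced as backlog.
                self.queue.appendleft(head[msg.buffer_size:])
                head = head[:msg.buffer_size]
            sent.append((len(head), self.backlog()))
        return sent


KB = 1024
sender = SubpartitionSender()
sender.add(b"a" * 32 * KB)  # produced while the buffer size was 32KB
trace = sender.on_add_credit(AddCredit(credits=1, buffer_size=16 * KB))
# trace == [(16384, 1)]: 16KB sent, the other 16KB announced as backlog
trace2 = sender.on_add_credit(AddCredit(credits=1, buffer_size=16 * KB))
# trace2 == [(16384, 0)]: remainder sent, nothing left
```

Because every piece put on the wire is capped by the size carried in the
credit, the credit-based guarantee that the receiver has room for whatever is
in flight still holds after a downsize.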

Best,
Piotrek


On Wed, Jul 14, 2021 at 8:33 AM Steven Wu <stevenz...@gmail.com> wrote:

>    - The subtask observes the changes in the throughput and changes the
>    buffer size during the whole life period of the task.
>    - The subtask sends buffer size and number of available buffers to the
>    upstream to the corresponding subpartition.
>    - Upstream changes the buffer size corresponding to the received
>    information.
>    - Upstream sends the data and number of filled buffers to the downstream
>
>
> Will the above steps of buffer size adjustment cause problems with
> credit-based flow control (mainly for downsizing), since the downstream
> adjusts down first?
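The scenario in the question above can be reproduced with a small Python
timeline. Everything here (the `Upstream`/`Downstream` classes, the sizes,
and the ordering of events) is a hypothetical model of the four quoted steps,
not Flink code: the downstream shrinks to 16KB first, and buffers the
upstream fills before the announcement arrives still have the old 32KB size.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Upstream:
    """Toy upstream subpartition that fills buffers of its current size."""
    buffer_size: int
    produced: List[int] = field(default_factory=list)  # sizes of queued buffers

    def produce(self, nbytes: int) -> None:
        # Serialize data into buffers of the *current* size.
        while nbytes > 0:
            chunk = min(nbytes, self.buffer_size)
            self.produced.append(chunk)
            nbytes -= chunk

    def on_buffer_size(self, new_size: int) -> None:
        # Step 3: only buffers produced from now on use the new size.
        self.buffer_size = new_size


@dataclass
class Downstream:
    buffer_size: int

    def resize(self, new_size: int) -> int:
        # Steps 1-2: the subtask picks a new size and announces it upstream.
        self.buffer_size = new_size
        return new_size


KB = 1024
up = Upstream(buffer_size=32 * KB)
down = Downstream(buffer_size=32 * KB)

announced = down.resize(16 * KB)  # downstream adjusts first
up.produce(64 * KB)               # announcement still in flight
up.on_buffer_size(announced)      # announcement arrives upstream
up.produce(32 * KB)

# Buffers that no longer fit the downstream's new size.
oversized = [s for s in up.produced if s > down.buffer_size]
```

The two 32KB buffers produced during the lag are exactly the case that needs
the size-capped sending described at the top of this thread.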
>
> Here is a quote from the blog [1]:
> "Credit-based flow control makes sure that whatever is “on the wire” will
> have capacity at the receiver to handle. "
>
>
> [1]
>
> https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
>
>
> On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thanks for driving this, I think it is really helpful for jobs suffering
> > from backpressure.
> >
> > Best,
> > Yingjie
> >
> > On Fri, Jul 9, 2021 at 10:59 PM Anton,Kalashnikov <kaa....@yandex.com> wrote:
> >
> > > Hey!
> > >
> > > There is a wish to decrease the amount of in-flight data, which can
> > > improve aligned checkpoint time (less in-flight data to process before
> > > a checkpoint can complete) and improve the behaviour and performance of
> > > unaligned checkpoints (less in-flight data that needs to be persisted
> > > in every unaligned checkpoint). The main idea is not to keep as much
> > > in-flight data as we have memory for, but to keep the amount of data
> > > that can predictably be handled within a configured amount of time
> > > (e.g. data that can be processed in 1 sec). This can be achieved by
> > > calculating the effective throughput and then changing the buffer size
> > > based on this throughput. You can find more details about the proposal
> > > here [1].
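One plausible reading of the throughput-based calculation, as a minimal
Python sketch: size the buffers so that all in-flight buffers together hold
roughly "target time x measured throughput" bytes. Dividing by the number of
buffers and clamping to `min_size`/`max_size` are assumptions for
illustration; the function and its parameters are hypothetical, and measuring
the effective throughput itself is left to the caller.

```python
def buffer_size_for_throughput(
    throughput_bytes_per_s: float,
    target_time_s: float,
    num_buffers: int,
    min_size: int,
    max_size: int,
) -> int:
    """Hypothetical sizing rule: num_buffers buffers of the returned size
    hold about target_time_s worth of data at the measured throughput."""
    if num_buffers <= 0:
        raise ValueError("num_buffers must be positive")
    desired_in_flight = throughput_bytes_per_s * target_time_s
    size = int(desired_in_flight / num_buffers)
    # Assumed bounds so the size never becomes tiny or exceeds the maximum.
    return max(min_size, min(max_size, size))


KB = 1024
# ~10 MB/s measured, keep ~1 s of data in 640 buffers (made-up numbers).
size = buffer_size_for_throughput(10 * KB * KB, 1.0, 640, 256, 32 * KB)
# size == 16384
```

When throughput drops, the computed size shrinks, which is what reduces the
amount of in-flight data a checkpoint has to wait for or persist.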
> > >
> > > What are your thoughts about it?
> > >
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > >
> > >
> > > --
> > > Best regards,
> > > Anton Kalashnikov
> > >
> > >
> > >
> >
>
