I think this is a good idea. +1 for this approach. Are you gonna update the
FLIP accordingly?

Cheers,
Till

On Thu, Jul 15, 2021 at 9:33 PM Steven Wu <stevenz...@gmail.com> wrote:

> I really like the new idea.
>
> On Thu, Jul 15, 2021 at 11:51 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> > Hi Till,
> >
> > > I assume that buffer sizes are only
> > > changed for newly assigned buffers/credits, right? Otherwise, the data
> > > could already be on the wire and then it wouldn't fit on the receiver
> > > side. Or do we have a back channel mechanism to tell the sender that a
> > > part of a buffer needs to be resent once more capacity is available?
> >
> > Initially, our implementation proposal was intended to implement the
> > first option. The buffer size would be attached to a credit message, so
> > the receiver would first need to allocate a buffer with the updated size
> > and send the credit upstream, and the sender would only be allowed to
> > send as much data as the credit allows. So changing buffer sizes could
> > not cause any problem while something is "on the wire".
> >
> > However, Anton suggested an even simpler idea to me today. There is
> > actually no problem with receivers supporting all buffer sizes up to the
> > maximum allowed size (the currently configured memory segment size). Thus
> > the new buffer size can be treated by the sender as a recommendation. We
> > can announce a new buffer size, and the sender will start capping newly
> > requested buffers to that size, but we can still send already filled
> > buffers of any size, as long as they do not exceed the max memory segment
> > size. In this way we can leave any already filled buffers on the sender
> > side untouched, and we do not need to partition/slice them before sending
> > them downstream, making at least the initial version even simpler. This
> > way we also do not need to differentiate between credits of different
> > sizes. We just announce a single value: the "recommended/requested buffer
> > size".
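A minimal Python sketch of the "recommended buffer size" idea, for illustration only: the `Sender` class, its methods, `receive`, and the 32 KiB `MAX_SEGMENT_SIZE` are hypothetical stand-ins, not Flink's network stack API, and splitting a byte string stands in for filling buffers with records. It shows the two properties described above. Newly requested buffers are capped to the announced size, while buffers filled before the announcement are sent untouched, and the receiver only checks the hard maximum.

```python
from collections import deque

# Assumption: stand-in for the configured memory segment size (the hard upper bound).
MAX_SEGMENT_SIZE = 32 * 1024


class Sender:
    """Hypothetical upstream side; not Flink's actual classes."""

    def __init__(self) -> None:
        self.recommended_size = MAX_SEGMENT_SIZE
        self.filled = deque()  # already-filled buffers waiting to be sent

    def announce(self, size: int) -> None:
        # The receiver announces a single "recommended/requested buffer size".
        self.recommended_size = min(size, MAX_SEGMENT_SIZE)

    def write(self, data: bytes) -> None:
        # Newly requested buffers are capped to the current recommendation.
        step = self.recommended_size
        for start in range(0, len(data), step):
            self.filled.append(data[start:start + step])

    def send_next(self) -> bytes:
        # Already-filled buffers go out as they are, without re-slicing.
        return self.filled.popleft()


def receive(buffer: bytes) -> int:
    # The receiver accepts any size up to the max memory segment size.
    if len(buffer) > MAX_SEGMENT_SIZE:
        raise ValueError("buffer exceeds the max memory segment size")
    return len(buffer)


sender = Sender()
sender.write(bytes(40 * 1024))  # filled before the change: 32 KiB + 8 KiB
sender.announce(16 * 1024)      # downstream recommends smaller buffers
sender.write(bytes(20 * 1024))  # filled after the change: 16 KiB + 4 KiB
sizes = [receive(sender.send_next()) for _ in range(len(sender.filled))]
print(sizes)  # [32768, 8192, 16384, 4096]
```

Because the receiver only enforces the upper bound, no individual credit has to carry its own size in this model.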
> >
> > Piotrek
> >
> > On Thu, Jul 15, 2021 at 5:27 PM Till Rohrmann <trohrm...@apache.org> wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks a lot for creating this FLIP, Anton and Piotr. I think it looks
> > > like a very promising solution for speeding up our checkpoints and
> > > being able to create them more reliably.
> > >
> > > Following up on Steven's question: I assume that buffer sizes are only
> > > changed for newly assigned buffers/credits, right? Otherwise, the data
> > > could already be on the wire and then it wouldn't fit on the receiver
> > > side. Or do we have a back channel mechanism to tell the sender that a
> > > part of a buffer needs to be resent once more capacity is available?
> > >
> > > Cheers,
> > > Till
> > >
> > > On Wed, Jul 14, 2021 at 11:16 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > >
> > > > Hi Steven,
> > > >
> > > > As downstream and upstream nodes are decoupled, if a downstream node
> > > > adjusts its buffer size first, there will be a lag until this updated
> > > > buffer size information reaches the upstream node. It is a problem,
> > > > but it has a fairly simple solution that we described in the FLIP
> > > > document:
> > > >
> > > > > Sending a buffer of the right size.
> > > > > It is not enough to know just the number of available buffers
> > > > > (credits) for the downstream, because the sizes of these buffers can
> > > > > differ.
> > > > > So we are proposing to resolve this problem in the following way: if
> > > > > the downstream buffer size is changed, then the upstream should send
> > > > > a buffer no larger than the new size, regardless of how big the
> > > > > current buffer on the upstream is. (pollBuffer should receive a
> > > > > parameter like bufferSize and return a buffer no larger than it.)
> > > >
> > > > So apart from adding buffer size information to the `AddCredit`
> > > > message, we will need to support a case where an upstream subpartition
> > > > has already produced a buffer of the older size (for example 32KB),
> > > > while the next credit arrives with an allowance for a smaller size
> > > > (16KB). In that case, we are only allowed to send the portion of the
> > > > data from this buffer that fits into the new buffer size, and keep
> > > > announcing the remaining part as available backlog.
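The slicing behaviour described above can be sketched as follows. This is a hypothetical Python model, not Flink code: `Subpartition` and `poll_buffer` only loosely mirror the `pollBuffer(bufferSize)` idea from the FLIP quote, and byte-string slicing (which copies) stands in for whatever buffer slicing a real implementation would use.

```python
from collections import deque
from typing import Optional


class Subpartition:
    """Hypothetical upstream subpartition holding already-produced buffers."""

    def __init__(self) -> None:
        self.queue = deque()  # buffers produced with whatever size was current

    def add(self, buffer: bytes) -> None:
        self.queue.append(buffer)

    def backlog(self) -> int:
        # Buffers (or remainders) still announced to the downstream as backlog.
        return len(self.queue)

    def poll_buffer(self, buffer_size: int) -> Optional[bytes]:
        """Return at most buffer_size bytes from the head buffer."""
        if not self.queue:
            return None
        head = self.queue.popleft()
        if len(head) > buffer_size:
            # Only the part that fits the credit is sent; the rest stays queued.
            self.queue.appendleft(head[buffer_size:])
            head = head[:buffer_size]
        return head


sub = Subpartition()
sub.add(bytes(32 * 1024))          # produced with the older 32 KiB size
credit_size = 16 * 1024            # the next credit only allows 16 KiB
first = sub.poll_buffer(credit_size)
print(len(first), sub.backlog())   # 16384 1
second = sub.poll_buffer(credit_size)
print(len(second), sub.backlog())  # 16384 0
```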
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > >
> > > > On Wed, Jul 14, 2021 at 8:33 AM Steven Wu <stevenz...@gmail.com> wrote:
> > > >
> > > > >    - The subtask observes the changes in the throughput and changes
> > > > >    the buffer size during the whole lifetime of the task.
> > > > >    - The subtask sends the buffer size and the number of available
> > > > >    buffers to the corresponding upstream subpartition.
> > > > >    - The upstream changes the buffer size according to the received
> > > > >    information.
> > > > >    - The upstream sends the data and the number of filled buffers to
> > > > >    the downstream.
> > > > >
> > > > >
> > > > > Will the above steps of buffer size adjustment cause problems with
> > > > > credit-based flow control (mainly for downsizing), since the
> > > > > downstream adjusts down first?
> > > > >
> > > > > Here is a quote from the blog [1]:
> > > > > "Credit-based flow control makes sure that whatever is “on the wire”
> > > > > will have capacity at the receiver to handle."
> > > > >
> > > > >
> > > > > [1]
> > > > > https://flink.apache.org/2019/06/05/flink-network-stack.html#credit-based-flow-control
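One way to see why downsizing does not have to break this guarantee is the first option described in Piotr's Jul 15 reply: the receiver allocates a buffer before announcing the credit, and the credit carries that buffer's size. The Python model below is a hypothetical sketch of that scheme. `Receiver`, `send`, and their methods are illustrative names, not Flink's `AddCredit` handling. A credit granted before the resize keeps its old size, so data sent against it still fits.

```python
from collections import deque


class Receiver:
    """Hypothetical downstream side: every credit is backed by an allocated buffer."""

    def __init__(self, buffer_size: int) -> None:
        self.buffer_size = buffer_size
        self.allocated = deque()  # capacities of buffers behind outstanding credits

    def grant_credit(self) -> int:
        # Allocate first, then announce; the credit carries this buffer's size.
        self.allocated.append(self.buffer_size)
        return self.buffer_size

    def resize(self, new_size: int) -> None:
        # Affects only buffers allocated for future credits.
        self.buffer_size = new_size

    def accept(self, payload_len: int) -> None:
        capacity = self.allocated.popleft()
        if payload_len > capacity:
            # Would violate "whatever is on the wire has capacity at the receiver".
            raise RuntimeError("data on the wire does not fit the receiver buffer")


def send(available: int, credit_size: int) -> int:
    # The sender never puts more on the wire than the credit it consumes allows;
    # any remainder would stay behind as backlog.
    return min(available, credit_size)


receiver = Receiver(buffer_size=32 * 1024)
old_credit = receiver.grant_credit()  # 32 KiB credit already travelling upstream
receiver.resize(16 * 1024)            # downstream adjusts down first
new_credit = receiver.grant_credit()  # later credit announces 16 KiB

receiver.accept(send(32 * 1024, old_credit))  # fits: the 32 KiB buffer still exists
receiver.accept(send(32 * 1024, new_credit))  # fits: the sender capped it to 16 KiB
print(old_credit, new_credit)  # 32768 16384
```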
> > > > >
> > > > >
> > > > > On Tue, Jul 13, 2021 at 7:34 PM Yingjie Cao <kevin.ying...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Thanks for driving this. I think it is really helpful for jobs
> > > > > > suffering from backpressure.
> > > > > >
> > > > > > Best,
> > > > > > Yingjie
> > > > > >
> > > > > > On Fri, Jul 9, 2021 at 10:59 PM Anton Kalashnikov <kaa....@yandex.com> wrote:
> > > > > >
> > > > > > > Hey!
> > > > > > >
> > > > > > > There is a wish to decrease the amount of in-flight data, which can
> > > > > > > improve aligned checkpoint time (less in-flight data to process
> > > > > > > before a checkpoint can complete) and improve the behaviour and
> > > > > > > performance of unaligned checkpoints (less in-flight data that needs
> > > > > > > to be persisted in every unaligned checkpoint). The main idea is not
> > > > > > > to keep as much in-flight data as we have memory for, but to keep an
> > > > > > > amount of data that can predictably be handled within a configured
> > > > > > > amount of time (e.g. we keep data that can be processed in 1 sec).
> > > > > > > This can be achieved by calculating the effective throughput and then
> > > > > > > changing the buffer size based on this throughput. You can find more
> > > > > > > details about the proposal here [1].
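The sizing rule above can be written as a small formula: keep roughly throughput × target time bytes in flight and spread that over the available buffers, i.e. buffer_size = clamp(throughput × target_time / num_buffers, min_size, max_size). The Python sketch below makes several assumptions that do not come from the FLIP. The effective throughput is measured as bytes processed per second of busy time. The in-flight budget is divided evenly across a fixed number of buffers, and the result is clamped between a minimum size and the memory segment size. The function names and all numbers are hypothetical.

```python
def effective_throughput(bytes_processed: int, busy_seconds: float) -> float:
    """Assumed measure: bytes processed per second of busy time."""
    return bytes_processed / busy_seconds


def buffer_size_for(throughput: float, target_seconds: float, num_buffers: int,
                    min_size: int, max_size: int) -> int:
    # In-flight data we want to keep: what can be processed within target_seconds.
    target_in_flight = throughput * target_seconds
    # Spread it evenly over the buffers, then clamp to the allowed range.
    size = int(target_in_flight / num_buffers)
    return max(min_size, min(size, max_size))


# Hypothetical numbers: 1 s target, 10 buffers, sizes clamped to [256 B, 32 KiB].
fast = effective_throughput(8 * 1024 * 1024, 2.0)  # 4 MiB/s
slow = effective_throughput(8 * 1024, 2.0)         # 4 KiB/s
print(buffer_size_for(fast, 1.0, 10, 256, 32 * 1024))  # 32768 (capped at the max)
print(buffer_size_for(slow, 1.0, 10, 256, 32 * 1024))  # 409
```

For a slow consumer the buffers shrink so that about one second's worth of data is in flight. A fast consumer simply hits the memory segment size cap.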
> > > > > > >
> > > > > > > What are your thoughts about it?
> > > > > > >
> > > > > > >
> > > > > > > [1]
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Best regards,
> > > > > > > Anton Kalashnikov
