Hi Anton Kalashnikov,

Thanks for your very clear reply, I think you are totally right.

The 'maxBuffersNumber - buffersInUseNumber' can be used as the
overdraft buffer, so no new buffer configuration is needed. Flink users
can increase maxBuffersNumber to control the overdraft buffer size.
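
To make the arithmetic concrete, here is a rough sketch of the accounting
from your reply. The class, method names and numbers are hypothetical and
only for illustration; they are not Flink APIs. The real network memory
calculation (fraction/min/max of total memory) is simplified here to a
given byte count.

```java
/**
 * Hypothetical sketch of the buffer accounting discussed in this thread.
 * None of these names are Flink APIs; the numbers are illustrative only.
 */
final class BufferAccounting {

    /** maxBuffersNumber = network memory / segment size (per TaskManager). */
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    /** requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks in the TaskManager. */
    static long requiredBuffersNumber(
            int exclusiveBuffersPerChannel, int parallelism, int floatingBuffers, int subtasksInTaskManager) {
        return ((long) exclusiveBuffersPerChannel * parallelism + floatingBuffers) * subtasksInTaskManager;
    }

    /** Buffers that could be handed out as overdraft: whatever is not in use right now. */
    static long overdraftHeadroom(long maxBuffersNumber, long buffersInUseNumber) {
        return Math.max(0, maxBuffersNumber - buffersInUseNumber);
    }

    public static void main(String[] args) {
        // Illustrative values: 128 MiB of network memory and 32 KiB segments -> 4096 buffers.
        long max = maxBuffersNumber(128L * 1024 * 1024, 32L * 1024);
        // 2 exclusive buffers per channel, parallelism 100, 8 floating buffers, 4 subtasks -> 832.
        long required = requiredBuffersNumber(2, 100, 8, 4);
        // Buffers actually in use at this moment (always <= required).
        long inUse = 700;
        System.out.println("max=" + max + " required=" + required
                + " overdraftHeadroom=" + overdraftHeadroom(max, inUse));
    }
}
```

With these made-up values it prints max=4096, required=832 and an
overdraft headroom of 3396 buffers.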

Also, I'd like to add one thing. For safety, we should limit the
maximum number of overdraft segments that each LocalBufferPool
can request.

Why do we limit it?
Some operators, such as LegacySource, don't check
`recordWriter.isAvailable` while processing records. I have mentioned
this in FLINK-26759 [1]. I'm not sure whether there are other cases.

If we don't add the limitation, the LegacySource will use up all the
remaining memory in the NetworkBufferPool when backpressure is severe.
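
A toy model may help show why the cap matters. It is not Flink's
LocalBufferPool/NetworkBufferPool: the class, its fields and the `drain`
loop (standing in for a LegacySource that never checks
`recordWriter.isAvailable`) are all hypothetical. It follows the request
path you proposed: use the local pool first, then borrow from the global
pool.

```java
/**
 * Hypothetical toy model, not Flink's LocalBufferPool/NetworkBufferPool.
 * When the local quota is exhausted, a segment may still be borrowed from
 * the global pool, but only up to maxOverdraftBuffers.
 */
final class ToyOverdraftPool {
    private int globalFreeSegments;      // stands in for free NetworkBufferPool segments
    private int localAvailable;          // segments left in this local pool's quota
    private final int maxOverdraftBuffers;
    private int overdraftInUse;

    ToyOverdraftPool(int globalFreeSegments, int localQuota, int maxOverdraftBuffers) {
        this.globalFreeSegments = globalFreeSegments;
        this.localAvailable = localQuota;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    /** Returns true if a segment was obtained; false means the caller would have to wait. */
    boolean requestSegment() {
        if (localAvailable > 0) {
            localAvailable--;
            return true;
        }
        // Overdraft: borrow from the global pool, bounded by the cap.
        if (overdraftInUse < maxOverdraftBuffers && globalFreeSegments > 0) {
            globalFreeSegments--;
            overdraftInUse++;
            return true;
        }
        return false;
    }

    int globalFreeSegments() {
        return globalFreeSegments;
    }

    /** Simulates a source that keeps requesting without checking availability. */
    static int drain(ToyOverdraftPool pool) {
        int obtained = 0;
        while (pool.requestSegment()) {
            obtained++;
        }
        return obtained;
    }

    public static void main(String[] args) {
        ToyOverdraftPool uncapped = new ToyOverdraftPool(1000, 10, Integer.MAX_VALUE);
        System.out.println("uncapped: obtained=" + drain(uncapped)
                + " globalLeft=" + uncapped.globalFreeSegments());
        ToyOverdraftPool capped = new ToyOverdraftPool(1000, 10, 5);
        System.out.println("capped: obtained=" + drain(capped)
                + " globalLeft=" + capped.globalFreeSegments());
    }
}
```

Without a cap the loop takes all 1000 free global segments; with a cap of
5 it stops after 15 segments and leaves 995 for other tasks.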

How to limit it?
I prefer to hard-code `maxOverdraftBuffers=numberOfSubpartitions`
in the constructor of LocalBufferPool. maxOverdraftBuffers is only a
safety limit, and it should be enough for most Flink jobs. Alternatively,
we can set `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
handle jobs with low parallelism.

Also, if the user doesn't enable Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
the overdraft isn't useful for Aligned Checkpoint.
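
The rules above could look roughly like this. `OverdraftLimit` and its
parameters are hypothetical, and the sketch assumes the LocalBufferPool
constructor can see whether unaligned checkpoints are enabled, which may
not be true today.

```java
/**
 * Hypothetical helper for the limits proposed above; in the proposal this
 * value would be computed in the LocalBufferPool constructor, not configured.
 */
final class OverdraftLimit {
    /** Lower bound so that low-parallelism jobs still get a few overdraft buffers. */
    static final int MIN_OVERDRAFT_BUFFERS = 10;

    static int maxOverdraftBuffers(
            int numberOfSubpartitions, boolean unalignedCheckpointsEnabled, boolean applyLowerBound) {
        if (!unalignedCheckpointsEnabled) {
            // Overdraft does not help aligned checkpoints, so disable it.
            return 0;
        }
        // Option 1: numberOfSubpartitions; option 2: max(numberOfSubpartitions, 10).
        return applyLowerBound
                ? Math.max(numberOfSubpartitions, MIN_OVERDRAFT_BUFFERS)
                : numberOfSubpartitions;
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(4, true, false));   // 4
        System.out.println(maxOverdraftBuffers(4, true, true));    // 10
        System.out.println(maxOverdraftBuffers(128, true, true));  // 128
        System.out.println(maxOverdraftBuffers(128, false, true)); // 0
    }
}
```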

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
wrote:

> Hi fanrui,
>
> Thanks for creating the FLIP.
>
> In general, I think the overdraft is a good idea and it should help in
> the cases described above. Here are my thoughts about configuration:
>
> Please correct me if I am wrong, but as I understand it, we currently have
> the following calculation.
>
> maxBuffersNumber(per TaskManager) = Network memory(calculated via
> taskmanager.memory.network.fraction, taskmanager.memory.network.min,
> taskmanager.memory.network.max and total memory size) /
> taskmanager.memory.segment-size.
>
> requiredBuffersNumber(per TaskManager) = (exclusive buffers *
> parallelism + floating buffers) * subtasks number in TaskManager
>
> buffersInUseNumber = actual number of buffers in use at the current
> moment (always <= requiredBuffersNumber)
>
> Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which
> allows Flink to work predictably. But if requiredBuffersNumber >
> maxBuffersNumber, it is sometimes also fine (but not good), since not all
> required buffers are really mandatory (e.g. it is OK if Flink cannot
> allocate floating buffers).
>
> But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink
> just never uses these leftover buffers (maxBuffersNumber -
> requiredBuffersNumber), and I propose to use them. (We can actually use
> even the difference 'requiredBuffersNumber - buffersInUseNumber', since
> one TaskManager may contain several operators, including 'window', which
> can temporarily borrow buffers from the global pool.)
>
> My proposal, more specifically (it relates only to requesting buffers
> while processing a single record; switching to unavailability between
> records should stay the same as it is now):
>
> * If one more buffer is requested but maxBuffersPerChannel is reached, then
> just ignore this limitation and allocate the buffer from anywhere
> (from the LocalBufferPool if it still has something, otherwise from the
> NetworkBufferPool).
>
> * If the LocalBufferPool exceeds its limit, then temporarily allocate from
> the NetworkBufferPool as long as it has something to allocate.
>
>
> Maybe I missed something and this solution won't work, but I like it
> because, on the one hand, it works out of the box without any
> configuration, and on the other hand, it can be configured by changing the
> proportion of maxBuffersNumber and requiredBuffersNumber.
>
> The last thing I want to say: I don't really want to add a new
> configuration, since even now it is not clear how to correctly configure
> network buffers with the existing configuration, and I don't want to
> complicate it further, especially if the problem can be resolved
> automatically (as described above).
>
>
> So is my understanding of network memory/buffers correct?
>
> --
>
> Best regards,
> Anton Kalashnikov
>
> 27.04.2022 07:46, rui fan wrote:
> > Hi everyone,
> >
> > Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
> > effectively solves the problem of checkpoint timeout or slow
> > checkpoint when backpressure is severe.
> >
> > We found that UC (Unaligned Checkpoint) does not work well when
> > backpressure is severe and multiple output buffers are required to
> > process a single record. FLINK-14396 [2] also mentioned this issue
> > before, so we propose the overdraft buffer to solve it.
> >
> > I created FLINK-26762 [3] and FLIP-227 [4] to detail the overdraft
> > buffer mechanism. After discussing with Anton Kalashnikov, there are
> > still some points to discuss:
> >
> >   * There are already a lot of buffer-related configurations. Do we
> >     need to add a new configuration for the overdraft buffer?
> >   * Where should the overdraft buffer get its memory from?
> >   * If the overdraft-buffer uses the memory remaining in the
> >     NetworkBufferPool, no new configuration needs to be added.
> >   * If adding a new configuration:
> >       o Should we set the overdraft-memory-size at the TM level or the
> >         Task level?
> >       o Or set overdraft-buffers to indicate the number of
> >         memory-segments that can be overdrawn.
> >       o What is the default value? How to set sensible defaults?
> >
> > I have implemented a POC [5] and verified it using
> > flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
> > with a default value of 10. That is, each LocalBufferPool can overdraw up
> > to 10 memory segments.
> >
> > Looking forward to your feedback!
> >
> > Thanks,
> > fanrui
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > [2] https://issues.apache.org/jira/browse/FLINK-14396
> > [3] https://issues.apache.org/jira/browse/FLINK-26762
> > [4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> > [5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
> > [6] https://github.com/apache/flink-benchmarks/pull/54
>
>
