Hi.

-- 1. Do you mean split this into two JIRAs or two PRs or two commits in a
   PR?

Separate tickets would probably be better, since this task raises fewer
questions, but we should find a solution for LegacySource first.

--  2. For the first task, if the flink user disables the Unaligned
   Checkpoint, do we ignore max buffers per channel? Because the overdraft
   isn't useful for the Aligned Checkpoint, it still needs to wait for
   downstream Task to consume.

I think the logic should be the same for AC and UC. As I understand it, the
overdraft may not really help AC, but it doesn't make it worse
either.

--  3. For the second task
      - The default value of maxOverdraftBuffersPerPartition may also need
      to be discussed.

I think it should be a fairly small value, or even 0, since it is a kind of
optimization and users should understand what they are doing (especially if we
implement the first task).

--      - If the user disables the Unaligned Checkpoint, can we set the
      maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
      the Aligned Checkpoint.

Same answer as above: if the overdraft doesn't degrade the Aligned
Checkpoint, I don't think we should distinguish between AC and
UC.

--  4. For the legacy source
      - If enabling the Unaligned Checkpoint, it uses up to
      maxOverdraftBuffersPerPartition buffers.
      - If disabling the UC, it doesn't use the overdraft buffer.
      - Do you think it's ok?

Ideally, I don't want to use overdraft for LegacySource at all, since it can
lead to undesirable results, especially if the limit is high. At the very least,
as I understand it, it will always work in overdraft mode and will borrow
maxOverdraftBuffersPerPartition buffers from the global pool, which can degrade
other subtasks on the same TaskManager.

--      - Actually, we added the checkAvailable logic for LegacySource in our
      internal version. It works well.

I don't really understand how that is possible in the general case, considering
that each user has their own implementation of LegacySourceOperator.

--   5. For the benchmark, do you have any suggestions? I submitted the PR
   [1].

I haven't looked at it yet, but I'll try to do it soon.


29.04.2022 14:14, rui fan wrote:
Hi,

Thanks for your feedback. I have several questions.

    1. Do you mean split this into two JIRAs or two PRs or two commits in a
    PR?
    2. For the first task, if the flink user disables the Unaligned
    Checkpoint, do we ignore max buffers per channel? Because the overdraft
    isn't useful for the Aligned Checkpoint, it still needs to wait for
    downstream Task to consume.
    3. For the second task
       - The default value of maxOverdraftBuffersPerPartition may also need
       to be discussed.
       - If the user disables the Unaligned Checkpoint, can we set the
       maxOverdraftBuffersPerPartition=0? Because the overdraft isn't useful for
       the Aligned Checkpoint.
    4. For the legacy source
       - If enabling the Unaligned Checkpoint, it uses up to
       maxOverdraftBuffersPerPartition buffers.
       - If disabling the UC, it doesn't use the overdraft buffer.
       - Do you think it's ok?
       - Actually, we added the checkAvailable logic for LegacySource in our
       internal version. It works well.
    5. For the benchmark, do you have any suggestions? I submitted the PR
    [1].

[1] https://github.com/apache/flink-benchmarks/pull/54

Thanks
fanrui

On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <kaa....@yandex.com>
wrote:

Hi,

We discussed it a little with Dawid Wysakowicz. Here are some
conclusions:

First of all, let's split this into two tasks.

The first task is about ignoring max buffers per channel. This means that if
we request a memory segment from LocalBufferPool and
maxBuffersPerChannel has been reached for this channel, we just ignore that
and continue to allocate buffers while LocalBufferPool has any (this is
actually not an overdraft).

The second task is about the real overdraft. I am pretty convinced now
that we, unfortunately, need a configuration option to limit the number of
overdraft buffers (because it is not ok if one subtask allocates all buffers
of one TaskManager, considering that several different jobs can be submitted
to this TaskManager). So the idea is to have
maxOverdraftBuffersPerPartition (technically speaking, per LocalBufferPool).
In this case, when the limit of buffers in LocalBufferPool is reached,
LocalBufferPool can additionally request up to
maxOverdraftBuffersPerPartition buffers from NetworkBufferPool.
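
A minimal sketch of the capped overdraft described above. This is a toy
model, not Flink's actual LocalBufferPool or NetworkBufferPool: the class
names, the counters, and the simplification that the local pool already owns
its ordinary buffers are all assumptions made for illustration.

```java
// Hypothetical stand-in for the shared per-TaskManager pool.
final class GlobalPoolSketch {
    private int free;

    GlobalPoolSketch(int free) {
        this.free = free;
    }

    // Hands out one buffer if any is left.
    synchronized boolean tryTake() {
        if (free == 0) {
            return false;
        }
        free--;
        return true;
    }

    synchronized int free() {
        return free;
    }
}

// Hypothetical stand-in for a local pool with a capped overdraft.
final class OverdraftLocalPoolSketch {
    private final GlobalPoolSketch global;
    private final int poolSize;            // ordinary limit of this local pool
    private final int maxOverdraftBuffers; // cap on buffers borrowed beyond the limit
    private int ordinaryInUse;
    private int overdraftInUse;

    OverdraftLocalPoolSketch(GlobalPoolSketch global, int poolSize, int maxOverdraftBuffers) {
        this.global = global;
        this.poolSize = poolSize;
        this.maxOverdraftBuffers = maxOverdraftBuffers;
    }

    // Returns true if a buffer was granted, either ordinary or overdraft.
    synchronized boolean requestBuffer() {
        if (ordinaryInUse < poolSize) {
            ordinaryInUse++;
            return true;
        }
        // Local limit reached: borrow from the global pool, but only up to the cap.
        if (overdraftInUse < maxOverdraftBuffers && global.tryTake()) {
            overdraftInUse++;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        GlobalPoolSketch global = new GlobalPoolSketch(5);
        OverdraftLocalPoolSketch pool = new OverdraftLocalPoolSketch(global, 2, 3);
        int granted = 0;
        for (int i = 0; i < 10; i++) {
            if (pool.requestBuffer()) {
                granted++;
            }
        }
        System.out.println("granted=" + granted + ", globalFree=" + global.free());
    }
}
```

With the cap in place, a subtask that never stops requesting buffers can take
at most maxOverdraftBuffers from the shared pool, so the rest stays available
to other subtasks on the same TaskManager.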


But it is still not clear how to handle LegacySource, since it actually
works as an unlimited flatmap and will always work in overdraft mode,
which is not the goal. So we still need to think about that.


   29.04.2022 11:11, rui fan wrote:
Hi Anton Kalashnikov,

I think you agree that we should limit the maximum number of overdraft
segments that each LocalBufferPool can apply for, right?

I prefer to hard code maxOverdraftBuffers so that we don't add a new
configuration. And I hope to hear more from the community.

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com> wrote:

Hi Anton Kalashnikov,

Thanks for your very clear reply, I think you are totally right.

The 'maxBuffersNumber - buffersInUseNumber' can be used as the
overdraft buffer, so it won't need a new buffer configuration. Flink users
can turn up the maxBuffersNumber to control the overdraft buffer size.

Also, I'd like to add some information. For safety, we should limit the
maximum number of overdraft segments that each LocalBufferPool
can apply for.

Why do we limit it?
Some operators don't check `recordWriter.isAvailable` while
processing records, such as LegacySource. I have mentioned it in
FLINK-26759 [1]. I'm not sure if there are other cases.

If we don't add the limit, the LegacySource will use up all remaining
memory in the NetworkBufferPool when the backpressure is severe.

How to limit it?
I prefer to hard code the `maxOverdraftBuffers=numberOfSubpartitions`
in the constructor of LocalBufferPool. The maxOverdraftBuffers is just
for safety, and it should be enough for most flink jobs. Or we can set
`maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to handle
jobs with low parallelism.

Also, if the user doesn't enable the Unaligned Checkpoint, we can set
maxOverdraftBuffers=0 in the constructor of LocalBufferPool, because
the overdraft isn't useful for the Aligned Checkpoint.
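
The hard-coded rule above can be sketched as a small helper. The class and
method names are hypothetical and are not part of LocalBufferPool; only the
`Math.max(numberOfSubpartitions, 10)` expression and the "0 when UC is
disabled" idea come from this message.

```java
// Hypothetical helper; illustrates the hard-coded default discussed in this message.
final class OverdraftDefaultsSketch {

    // Floor that keeps low-parallelism jobs from getting a tiny overdraft.
    private static final int MIN_OVERDRAFT_BUFFERS = 10;

    static int maxOverdraftBuffers(int numberOfSubpartitions, boolean unalignedCheckpointEnabled) {
        if (!unalignedCheckpointEnabled) {
            // Suggested above: no overdraft at all for the Aligned Checkpoint.
            return 0;
        }
        return Math.max(numberOfSubpartitions, MIN_OVERDRAFT_BUFFERS);
    }

    public static void main(String[] args) {
        System.out.println(maxOverdraftBuffers(4, true));    // 10
        System.out.println(maxOverdraftBuffers(200, true));  // 200
        System.out.println(maxOverdraftBuffers(200, false)); // 0
    }
}
```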

Please correct me if I'm wrong. Thanks a lot.

[1] https://issues.apache.org/jira/browse/FLINK-26759

Best wishes
fanrui

On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <kaa....@yandex.com>
wrote:

Hi fanrui,

Thanks for creating the FLIP.

In general, I think the overdraft is a good idea and it should help in
the cases described above. Here are my thoughts about configuration:

Please correct me if I am wrong, but as I understand it, right now we have
the following calculation.

maxBuffersNumber(per TaskManager) = Network memory(calculated via
taskmanager.memory.network.fraction, taskmanager.memory.network.min,
taskmanager.memory.network.max and total memory size) /
taskmanager.memory.segment-size.

requiredBuffersNumber(per TaskManager) = (exclusive buffers *
parallelism + floating buffers) * subtasks number in TaskManager

buffersInUseNumber = the actual number of buffers in use at the current
moment (always <= requiredBuffersNumber)

Ideally, requiredBuffersNumber should be equal to maxBuffersNumber, which
allows Flink to work predictably. But if requiredBuffersNumber >
maxBuffersNumber, sometimes it is also fine (but not good), since not all
required buffers are really mandatory (e.g. it is ok if Flink cannot
allocate floating buffers).

But if maxBuffersNumber > requiredBuffersNumber, as I understand it, Flink
just never uses these leftover buffers (maxBuffersNumber -
requiredBuffersNumber), which I propose to use. (We can actually even
use the difference 'requiredBuffersNumber - buffersInUseNumber', since
one TaskManager may contain several operators, including 'window', which
can temporarily borrow buffers from the global pool.)
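
To make the calculation above concrete, here is a small worked example. All
input values (128 MiB of network memory, 32 KiB segments, 2 exclusive buffers,
8 floating buffers, parallelism 100, 4 subtasks) are illustrative assumptions,
and the class and method names are hypothetical.

```java
// Hypothetical worked example of the buffer arithmetic; the numbers are made up.
final class BufferBudgetSketch {

    // maxBuffersNumber = network memory / segment size
    static long maxBuffersNumber(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }

    // requiredBuffersNumber = (exclusive * parallelism + floating) * subtasks
    static long requiredBuffersNumber(
            int exclusiveBuffers, int parallelism, int floatingBuffers, int subtasks) {
        return (long) (exclusiveBuffers * parallelism + floatingBuffers) * subtasks;
    }

    public static void main(String[] args) {
        long max = maxBuffersNumber(128L << 20, 32L << 10);  // 128 MiB / 32 KiB = 4096
        long required = requiredBuffersNumber(2, 100, 8, 4); // (2 * 100 + 8) * 4 = 832
        long leftover = Math.max(0, max - required);         // 4096 - 832 = 3264
        System.out.println("max=" + max + ", required=" + required + ", leftover=" + leftover);
    }
}
```

In this example 3264 buffers are never requested in normal operation, which is
the leftover pool the proposal wants to use for overdraft.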

My proposal, more specifically (it relates only to requesting buffers
while processing a single record; switching to unavailability between
records should stay the same as it is now):

* If one more buffer is requested but maxBuffersPerChannel has been reached,
then just ignore this limit and allocate the buffer from anywhere
(from LocalBufferPool if it still has something, otherwise from
NetworkBufferPool)

* If LocalBufferPool exceeds its limit, then temporarily allocate from
NetworkBufferPool while it has something to allocate
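
The two rules above can be sketched with a toy model. It is not Flink's real
LocalBufferPool: the class name, the plain counters, and the way availability
is computed are assumptions for illustration. The point is that the
availability check between records still honors the per-channel limit, while a
request made in the middle of a record does not.

```java
import java.util.Arrays;

// Hypothetical model of the proposal; not Flink's real LocalBufferPool.
final class PerRecordRequestSketch {
    private final int maxBuffersPerChannel;
    private final int[] inUsePerChannel;
    private int localFree;  // buffers still available in this local pool
    private int globalFree; // buffers left in the shared network pool

    PerRecordRequestSketch(int channels, int maxBuffersPerChannel, int localFree, int globalFree) {
        this.inUsePerChannel = new int[channels];
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        this.localFree = localFree;
        this.globalFree = globalFree;
    }

    // Checked between records: the per-channel limit still applies here.
    boolean isAvailable() {
        return localFree > 0
                && Arrays.stream(inUsePerChannel).allMatch(n -> n < maxBuffersPerChannel);
    }

    // Called while a record is being written: the per-channel limit is ignored.
    boolean requestBuffer(int channel) {
        if (localFree > 0) {
            localFree--;      // rule 1: take from the local pool if it has something
        } else if (globalFree > 0) {
            globalFree--;     // rule 2: otherwise borrow from the network pool
        } else {
            return false;     // nothing left anywhere
        }
        inUsePerChannel[channel]++;
        return true;
    }

    public static void main(String[] args) {
        PerRecordRequestSketch pool = new PerRecordRequestSketch(2, 2, 3, 2);
        pool.requestBuffer(0);
        pool.requestBuffer(0);
        System.out.println("available between records: " + pool.isAvailable());
        System.out.println("request inside a record:   " + pool.requestBuffer(0));
    }
}
```

A caller that never checks availability between records keeps receiving
buffers in this model until the shared pool is empty.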


Maybe I missed something and this solution won't work, but I like it
since, on the one hand, it works out of the box without any
configuration and, on the other hand, it can be configured by changing
the proportion of maxBuffersNumber and requiredBuffersNumber.

The last thing I want to say: I don't really want to introduce a new
configuration option, since even now it is not clear how to correctly
configure network buffers with the existing options, and I don't want to
complicate it, especially if it is possible to resolve the problem
automatically (as described above).


So is my understanding about network memory/buffers correct?

--

Best regards,
Anton Kalashnikov

27.04.2022 07:46, rui fan wrote:
Hi everyone,

Unaligned Checkpoint (FLIP-76 [1]) is a major feature of Flink. It
effectively solves the problem of checkpoint timeout or slow
checkpoint when backpressure is severe.

We found that UC(Unaligned Checkpoint) does not work well when the
back pressure is severe and multiple output buffers are required to
process a single record. FLINK-14396 [2] also mentioned this issue
before. So we propose the overdraft buffer to solve it.

I created FLINK-26762[3] and FLIP-227[4] to detail the overdraft
buffer mechanism. After discussing with Anton Kalashnikov, there are
still some points to discuss:

    * There are already a lot of buffer-related configurations. Do we
      need to add a new configuration for the overdraft buffer?
    * Where should the overdraft buffer use memory?
    * If the overdraft-buffer uses the memory remaining in the
      NetworkBufferPool, no new configuration needs to be added.
    * If adding a new configuration:
        o Should we set the overdraft-memory-size at the TM level or the
          Task level?
        o Or set overdraft-buffers to indicate the number of
          memory-segments that can be overdrawn.
        o What is the default value? How to set sensible defaults?

So far, I have implemented a POC [5] and verified it using
flink-benchmarks [6]. The POC sets overdraft-buffers at the Task level,
and the default value is 10. That is: each LocalBufferPool can overdraw up
to 10 memory-segments.

Looking forward to your feedback!

Thanks,
fanrui

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
[2] https://issues.apache.org/jira/browse/FLINK-14396
[3] https://issues.apache.org/jira/browse/FLINK-26762
[4] https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
[5] https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
[6] https://github.com/apache/flink-benchmarks/pull/54

--

Best regards,
Anton Kalashnikov
