Hi everyone,

Just wanted to chip in on the discussion of legacy sources: IMHO, we should
not focus too much on improving/adding capabilities for legacy sources. We
want to persuade and push users to use the new Source API. Yes, this means
that there's work required by the end users to port any custom source to
the new interface. The benefits of the new Source API should outweigh this.
Anything that we build to support multiple interfaces means adding more
complexity and more possibilities for bugs. Let's try to make our lives a
little bit easier.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser


On Tue, 3 May 2022 at 07:50, rui fan <1996fan...@gmail.com> wrote:

> Hi Piotrek
>
> > Do you mean to ignore it while processing records, but keep using
> > `maxBuffersPerChannel` when calculating the availability of the output?
>
> I think yes, and please Anton Kalashnikov to help double check.
>
> > +1 for just having this as a separate configuration. Is it a big problem
> > that legacy sources would be ignoring it? Note that we already have
> > effectively hardcoded a single overdraft buffer.
> > `LocalBufferPool#checkAvailability` checks if there is a single buffer
> > available and this works the same for all tasks (including legacy source
> > tasks). Would it be a big issue if we changed it to check if at least
> > "overdraft number of buffers are available", where "overdraft number" is
> > configurable, instead of the currently hardcoded value of "1"?
>
> Do you mean don't add the extra buffers? We just use (exclusive buffers *
> parallelism + floating buffers)? The LocalBufferPool will be available when
> (usedBuffers+overdraftBuffers <=
> exclusiveBuffers*parallelism+floatingBuffers)
> and all subpartitions don't reach the maxBuffersPerChannel, right?
>
> If yes, I think it can solve the problem of legacy source. There may be
> some impact. If overdraftBuffers is large and only one buffer is used to
> process a single record, exclusive buffers*parallelism + floating buffers
> cannot be used. It may only be possible to use (exclusive buffers *
> parallelism
> + floating buffers - overdraft buffers + 1). For throughput, if turn up the
> overdraft buffers, the flink user needs to turn up exclusive or floating
> buffers. And it also affects the InputChannel.
>
> If not, I don't think it can solve the problem of legacy source. The legacy
> source don't check isAvailable, If there are the extra buffers, legacy
> source
> will use them up until block in requestMemory.
>
>
> Thanks
> fanrui
>
> On Tue, May 3, 2022 at 3:39 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi,
> >
> > +1 for the general proposal from my side. It would be a nice workaround
> > flatMaps, WindowOperators and large records issues with unaligned
> > checkpoints.
> >
> > > The first task is about ignoring max buffers per channel. This means if
> > > we request a memory segment from LocalBufferPool and the
> > > maxBuffersPerChannel is reached for this channel, we just ignore that
> > > and continue to allocate buffer while LocalBufferPool has it(it is
> > > actually not a overdraft).
> >
> > Do you mean to ignore it while processing records, but keep using
> > `maxBuffersPerChannel` when calculating the availability of the output?
> >
> > > The second task is about the real overdraft. I am pretty convinced now
> > > that we, unfortunately, need configuration for limitation of overdraft
> > > number(because it is not ok if one subtask allocates all buffers of one
> > > TaskManager considering that several different jobs can be submitted on
> > > this TaskManager). So idea is to have
> > > maxOverdraftBuffersPerPartition(technically to say per
> LocalBufferPool).
> > > In this case, when a limit of buffers in LocalBufferPool is reached,
> > > LocalBufferPool can request additionally from NetworkBufferPool up to
> > > maxOverdraftBuffersPerPartition buffers.
> >
> > +1 for just having this as a separate configuration. Is it a big problem
> > that legacy sources would be ignoring it? Note that we already have
> > effectively hardcoded a single overdraft buffer.
> > `LocalBufferPool#checkAvailability` checks if there is a single buffer
> > available and this works the same for all tasks (including legacy source
> > tasks). Would it be a big issue if we changed it to check if at least
> > "overdraft number of buffers are available", where "overdraft number" is
> > configurable, instead of the currently hardcoded value of "1"?
> >
> > Best,
> > Piotrek
> >
> > pt., 29 kwi 2022 o 17:04 rui fan <1996fan...@gmail.com> napisał(a):
> >
> > > Let me add some information about the LegacySource.
> > >
> > > If we want to disable the overdraft buffer for LegacySource.
> > > Could we add the enableOverdraft in LocalBufferPool?
> > > The default value is false. If the getAvailableFuture is called,
> > > change enableOverdraft=true. It indicates whether there are
> > > checks isAvailable elsewhere.
> > >
> > > I don't think it is elegant, but it's safe. Please correct me if I'm
> > wrong.
> > >
> > > Thanks
> > > fanrui
> > >
> > > On Fri, Apr 29, 2022 at 10:23 PM rui fan <1996fan...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your quick response.
> > > >
> > > > For question 1/2/3, we think they are clear. We just need to discuss
> > the
> > > > default value in PR.
> > > >
> > > > For the legacy source, you are right. It's difficult for general
> > > > implementation.
> > > > Currently, we implement ensureRecordWriterIsAvailable() in
> > > > SourceFunction.SourceContext. And call it in our common LegacySource,
> > > > e.g: FlinkKafkaConsumer. Over 90% of our Flink jobs consume kafka, so
> > > > fixing FlinkKafkaConsumer solved most of our problems.
> > > >
> > > > Core code:
> > > > ```
> > > > public void ensureRecordWriterIsAvailable() {
> > > >      if (recordWriter == null
> > > >           ||
> > > >
> > !configuration.getBoolean(ExecutionCheckpointingOptions.ENABLE_UNALIGNED,
> > > > false)
> > > >           || recordWriter.isAvailable()) {
> > > >           return;
> > > >      }
> > > >
> > > >      CompletableFuture<?> resumeFuture =
> > > recordWriter.getAvailableFuture();
> > > >      try {
> > > >           resumeFuture.get();
> > > >      } catch (Throwable ignored) {
> > > >      }
> > > > }
> > > > ```
> > > >
> > > > LegacySource calls sourceContext.ensureRecordWriterIsAvailable()
> > > > before synchronized (checkpointLock) and collects records.
> > > > Please let me know if there is a better solution.
> > > >
> > > > Thanks
> > > > fanrui
> > > >
> > > > On Fri, Apr 29, 2022 at 9:45 PM Anton Kalashnikov <
> kaa....@yandex.com>
> > > > wrote:
> > > >
> > > >> Hi.
> > > >>
> > > >> -- 1. Do you mean split this into two JIRAs or two PRs or two
> commits
> > > in a
> > > >>     PR?
> > > >>
> > > >> Perhaps, the separated ticket will be better since this task has
> fewer
> > > >> questions but we should find a solution for LegacySource first.
> > > >>
> > > >> --  2. For the first task, if the flink user disables the Unaligned
> > > >>     Checkpoint, do we ignore max buffers per channel? Because the
> > > >> overdraft
> > > >>     isn't useful for the Aligned Checkpoint, it still needs to wait
> > for
> > > >>     downstream Task to consume.
> > > >>
> > > >> I think that the logic should be the same for AC and UC. As I
> > > understand,
> > > >> the overdraft maybe is not really helpful for AC but it doesn't make
> > it
> > > >> worse as well.
> > > >>
> > > >>   3. For the second task
> > > >> --      - The default value of maxOverdraftBuffersPerPartition may
> > also
> > > >> need
> > > >>        to be discussed.
> > > >>
> > > >> I think it should be a pretty small value or even 0 since it kind of
> > > >> optimization and user should understand what they do(especially if
> we
> > > >> implement the first task).
> > > >>
> > > >> --      - If the user disables the Unaligned Checkpoint, can we set
> > the
> > > >>        maxOverdraftBuffersPerPartition=0? Because the overdraft
> isn't
> > > >> useful for
> > > >>        the Aligned Checkpoint.
> > > >>
> > > >> The same answer that above, if the overdraft doesn't make
> degradation
> > > for
> > > >> the Aligned Checkpoint I don't think that we should make difference
> > > between
> > > >> AC and UC.
> > > >>
> > > >>     4. For the legacy source
> > > >> --      - If enabling the Unaligned Checkpoint, it uses up to
> > > >>        maxOverdraftBuffersPerPartition buffers.
> > > >>        - If disabling the UC, it doesn't use the overdraft buffer.
> > > >>        - Do you think it's ok?
> > > >>
> > > >> Ideally, I don't want to use overdraft for LegacySource at all since
> > it
> > > >> can lead to undesirable results especially if the limit is high. At
> > > least,
> > > >> as I understand, it will always work in overdraft mode and it will
> > > borrow
> > > >> maxOverdraftBuffersPerPartition buffers from the global pool which
> can
> > > lead
> > > >> to degradation of other subtasks on the same TaskManager.
> > > >>
> > > >> --      - Actually, we added the checkAvailable logic for
> LegacySource
> > > in
> > > >> our
> > > >>        internal version. It works well.
> > > >>
> > > >> I don't really understand how it is possible for general case
> > > considering
> > > >> that each user has their own implementation of LegacySourceOperator
> > > >>
> > > >> --   5. For the benchmark, do you have any suggestions? I submitted
> > the
> > > PR
> > > >>     [1].
> > > >>
> > > >> I haven't looked at it yet, but I'll try to do it soon.
> > > >>
> > > >>
> > > >> 29.04.2022 14:14, rui fan пишет:
> > > >> > Hi,
> > > >> >
> > > >> > Thanks for your feedback. I have a servel of questions.
> > > >> >
> > > >> >     1. Do you mean split this into two JIRAs or two PRs or two
> > commits
> > > >> in a
> > > >> >     PR?
> > > >> >     2. For the first task, if the flink user disables the
> Unaligned
> > > >> >     Checkpoint, do we ignore max buffers per channel? Because the
> > > >> overdraft
> > > >> >     isn't useful for the Aligned Checkpoint, it still needs to
> wait
> > > for
> > > >> >     downstream Task to consume.
> > > >> >     3. For the second task
> > > >> >        - The default value of maxOverdraftBuffersPerPartition may
> > also
> > > >> need
> > > >> >        to be discussed.
> > > >> >        - If the user disables the Unaligned Checkpoint, can we set
> > the
> > > >> >        maxOverdraftBuffersPerPartition=0? Because the overdraft
> > isn't
> > > >> useful for
> > > >> >        the Aligned Checkpoint.
> > > >> >     4. For the legacy source
> > > >> >        - If enabling the Unaligned Checkpoint, it uses up to
> > > >> >        maxOverdraftBuffersPerPartition buffers.
> > > >> >        - If disabling the UC, it doesn't use the overdraft buffer.
> > > >> >        - Do you think it's ok?
> > > >> >        - Actually, we added the checkAvailable logic for
> > LegacySource
> > > >> in our
> > > >> >        internal version. It works well.
> > > >> >     5. For the benchmark, do you have any suggestions? I submitted
> > the
> > > >> PR
> > > >> >     [1].
> > > >> >
> > > >> > [1] https://github.com/apache/flink-benchmarks/pull/54
> > > >> >
> > > >> > Thanks
> > > >> > fanrui
> > > >> >
> > > >> > On Fri, Apr 29, 2022 at 7:41 PM Anton Kalashnikov <
> > kaa....@yandex.com
> > > >
> > > >> > wrote:
> > > >> >
> > > >> >> Hi,
> > > >> >>
> > > >> >> We discuss about it a little with Dawid Wysakowicz. Here is some
> > > >> >> conclusion:
> > > >> >>
> > > >> >> First of all, let's split this into two tasks.
> > > >> >>
> > > >> >> The first task is about ignoring max buffers per channel. This
> > means
> > > if
> > > >> >> we request a memory segment from LocalBufferPool and the
> > > >> >> maxBuffersPerChannel is reached for this channel, we just ignore
> > that
> > > >> >> and continue to allocate buffer while LocalBufferPool has it(it
> is
> > > >> >> actually not a overdraft).
> > > >> >>
> > > >> >> The second task is about the real overdraft. I am pretty
> convinced
> > > now
> > > >> >> that we, unfortunately, need configuration for limitation of
> > > overdraft
> > > >> >> number(because it is not ok if one subtask allocates all buffers
> of
> > > one
> > > >> >> TaskManager considering that several different jobs can be
> > submitted
> > > on
> > > >> >> this TaskManager). So idea is to have
> > > >> >> maxOverdraftBuffersPerPartition(technically to say per
> > > >> LocalBufferPool).
> > > >> >> In this case, when a limit of buffers in LocalBufferPool is
> > reached,
> > > >> >> LocalBufferPool can request additionally from NetworkBufferPool
> up
> > to
> > > >> >> maxOverdraftBuffersPerPartition buffers.
> > > >> >>
> > > >> >>
> > > >> >> But it is still not clear how to handle LegacySource since it
> > > actually
> > > >> >> works as unlimited flatmap and it will always work in overdraft
> > mode
> > > >> >> which is not a target. So we still need to think about that.
> > > >> >>
> > > >> >>
> > > >> >>    29.04.2022 11:11, rui fan пишет:
> > > >> >>> Hi Anton Kalashnikov,
> > > >> >>>
> > > >> >>> I think you agree with we should limit the maximum number of
> > > overdraft
> > > >> >>> segments that each LocalBufferPool can apply for, right?
> > > >> >>>
> > > >> >>> I prefer to hard code the maxOverdraftBuffers due to don't add
> the
> > > new
> > > >> >>> configuration. And I hope to hear more from the community.
> > > >> >>>
> > > >> >>> Best wishes
> > > >> >>> fanrui
> > > >> >>>
> > > >> >>> On Thu, Apr 28, 2022 at 12:39 PM rui fan <1996fan...@gmail.com>
> > > >> wrote:
> > > >> >>>
> > > >> >>>> Hi Anton Kalashnikov,
> > > >> >>>>
> > > >> >>>> Thanks for your very clear reply, I think you are totally
> right.
> > > >> >>>>
> > > >> >>>> The 'maxBuffersNumber - buffersInUseNumber' can be used as the
> > > >> >>>> overdraft buffer, it won't need the new buffer
> > configuration.Flink
> > > >> users
> > > >> >>>> can turn up the maxBuffersNumber to control the overdraft
> buffer
> > > >> size.
> > > >> >>>>
> > > >> >>>> Also, I‘d like to add some information. For safety, we should
> > limit
> > > >> the
> > > >> >>>> maximum number of overdraft segments that each LocalBufferPool
> > > >> >>>> can apply for.
> > > >> >>>>
> > > >> >>>> Why do we limit it?
> > > >> >>>> Some operators don't check the `recordWriter.isAvailable`
> during
> > > >> >>>> processing records, such as LegacySource. I have mentioned it
> in
> > > >> >>>> FLINK-26759 [1]. I'm not sure if there are other cases.
> > > >> >>>>
> > > >> >>>> If don't add the limitation, the LegacySource will use up all
> > > >> remaining
> > > >> >>>> memory in the NetworkBufferPool when the backpressure is
> severe.
> > > >> >>>>
> > > >> >>>> How to limit it?
> > > >> >>>> I prefer to hard code the
> > > `maxOverdraftBuffers=numberOfSubpartitions`
> > > >> >>>> in the constructor of LocalBufferPool. The maxOverdraftBuffers
> is
> > > >> just
> > > >> >>>> for safety, and it should be enough for most flink jobs. Or we
> > can
> > > >> set
> > > >> >>>> `maxOverdraftBuffers=Math.max(numberOfSubpartitions, 10)` to
> > handle
> > > >> >>>> some jobs of low parallelism.
> > > >> >>>>
> > > >> >>>> Also if user don't enable the Unaligned Checkpoint, we can set
> > > >> >>>> maxOverdraftBuffers=0 in the constructor of LocalBufferPool.
> > > Because
> > > >> >>>> the overdraft isn't useful for the Aligned Checkpoint.
> > > >> >>>>
> > > >> >>>> Please correct me if I'm wrong. Thanks a lot.
> > > >> >>>>
> > > >> >>>> [1] https://issues.apache.org/jira/browse/FLINK-26759
> > > >> >>>>
> > > >> >>>> Best wishes
> > > >> >>>> fanrui
> > > >> >>>>
> > > >> >>>> On Thu, Apr 28, 2022 at 12:29 AM Anton Kalashnikov <
> > > >> kaa....@yandex.com>
> > > >> >>>> wrote:
> > > >> >>>>
> > > >> >>>>> Hi fanrui,
> > > >> >>>>>
> > > >> >>>>> Thanks for creating the FLIP.
> > > >> >>>>>
> > > >> >>>>> In general, I think the overdraft is good idea and it should
> > help
> > > in
> > > >> >>>>> described above cases. Here are my thoughts about
> configuration:
> > > >> >>>>>
> > > >> >>>>> Please, correct me if I am wrong but as I understand right now
> > we
> > > >> have
> > > >> >>>>> following calculation.
> > > >> >>>>>
> > > >> >>>>> maxBuffersNumber(per TaskManager) = Network memory(calculated
> > via
> > > >> >>>>> taskmanager.memory.network.fraction,
> > > taskmanager.memory.network.min,
> > > >> >>>>> taskmanager.memory.network.max and total memory size) /
> > > >> >>>>> taskmanager.memory.segment-size.
> > > >> >>>>>
> > > >> >>>>> requiredBuffersNumber(per TaskManager) = (exclusive buffers *
> > > >> >>>>> parallelism + floating buffers) * subtasks number in
> TaskManager
> > > >> >>>>>
> > > >> >>>>> buffersInUseNumber = real number of buffers which used at
> > current
> > > >> >>>>> moment(always <= requiredBuffersNumber)
> > > >> >>>>>
> > > >> >>>>> Ideally requiredBuffersNumber should be equal to
> > maxBuffersNumber
> > > >> which
> > > >> >>>>> allows Flink work predictibly. But if requiredBuffersNumber >
> > > >> >>>>> maxBuffersNumber sometimes it is also fine(but not good) since
> > not
> > > >> all
> > > >> >>>>> required buffers really mandatory(e.g. it is ok if Flink can
> not
> > > >> >>>>> allocate floating buffers)
> > > >> >>>>>
> > > >> >>>>> But if maxBuffersNumber > requiredBuffersNumber, as I
> understand
> > > >> Flink
> > > >> >>>>> just never use these leftovers buffers(maxBuffersNumber -
> > > >> >>>>> requiredBuffersNumber). Which I propose to use. ( we can
> actualy
> > > use
> > > >> >>>>> even difference 'requiredBuffersNumber - buffersInUseNumber'
> > since
> > > >> if
> > > >> >>>>> one TaskManager contains several operators including 'window'
> > > which
> > > >> can
> > > >> >>>>> temporally borrow buffers from the global pool).
> > > >> >>>>>
> > > >> >>>>> My proposal, more specificaly(it relates only to requesting
> > > buffers
> > > >> >>>>> during processing single record while switching to
> unavalability
> > > >> >> between
> > > >> >>>>> records should be the same as we have it now):
> > > >> >>>>>
> > > >> >>>>> * If one more buffer requested but maxBuffersPerChannel
> reached,
> > > >> then
> > > >> >>>>> just ignore this limitation and allocate this buffers from any
> > > >> >>>>> place(from LocalBufferPool if it has something yet otherwise
> > from
> > > >> >>>>> NetworkBufferPool)
> > > >> >>>>>
> > > >> >>>>> * If LocalBufferPool exceeds limit, then temporally allocate
> it
> > > from
> > > >> >>>>> NetworkBufferPool while it has something to allocate
> > > >> >>>>>
> > > >> >>>>>
> > > >> >>>>> Maybe I missed something and this solution won't work, but I
> > like
> > > it
> > > >> >>>>> since on the one hand, it work from the scratch without any
> > > >> >>>>> configuration, on the other hand, it can be configuration by
> > > >> changing
> > > >> >>>>> proportion of maxBuffersNumber and requiredBuffersNumber.
> > > >> >>>>>
> > > >> >>>>> The last thing that I want to say, I don't really want to
> > > implement
> > > >> new
> > > >> >>>>> configuration since even now it is not clear how to correctly
> > > >> configure
> > > >> >>>>> network buffers with existing configuration and I don't want
> to
> > > >> >>>>> complicate it, especially if it will be possible to resolve
> the
> > > >> problem
> > > >> >>>>> automatically(as described above).
> > > >> >>>>>
> > > >> >>>>>
> > > >> >>>>> So is my understanding about network memory/buffers correct?
> > > >> >>>>>
> > > >> >>>>> --
> > > >> >>>>>
> > > >> >>>>> Best regards,
> > > >> >>>>> Anton Kalashnikov
> > > >> >>>>>
> > > >> >>>>> 27.04.2022 07:46, rui fan пишет:
> > > >> >>>>>> Hi everyone,
> > > >> >>>>>>
> > > >> >>>>>> Unaligned Checkpoint (FLIP-76 [1]) is a major feature of
> Flink.
> > > It
> > > >> >>>>>> effectively solves the problem of checkpoint timeout or slow
> > > >> >>>>>> checkpoint when backpressure is severe.
> > > >> >>>>>>
> > > >> >>>>>> We found that UC(Unaligned Checkpoint) does not work well
> when
> > > the
> > > >> >>>>>> back pressure is severe and multiple output buffers are
> > required
> > > to
> > > >> >>>>>> process a single record. FLINK-14396 [2] also mentioned this
> > > issue
> > > >> >>>>>> before. So we propose the overdraft buffer to solve it.
> > > >> >>>>>>
> > > >> >>>>>> I created FLINK-26762[3] and FLIP-227[4] to detail the
> > overdraft
> > > >> >>>>>> buffer mechanism. After discussing with Anton Kalashnikov,
> > there
> > > >> are
> > > >> >>>>>> still some points to discuss:
> > > >> >>>>>>
> > > >> >>>>>>     * There are already a lot of buffer-related
> configurations.
> > > Do
> > > >> we
> > > >> >>>>>>       need to add a new configuration for the overdraft
> buffer?
> > > >> >>>>>>     * Where should the overdraft buffer use memory?
> > > >> >>>>>>     * If the overdraft-buffer uses the memory remaining in
> the
> > > >> >>>>>>       NetworkBufferPool, no new configuration needs to be
> > added.
> > > >> >>>>>>     * If adding a new configuration:
> > > >> >>>>>>         o Should we set the overdraft-memory-size at the TM
> > level
> > > >> or
> > > >> >> the
> > > >> >>>>>>           Task level?
> > > >> >>>>>>         o Or set overdraft-buffers to indicate the number of
> > > >> >>>>>>           memory-segments that can be overdrawn.
> > > >> >>>>>>         o What is the default value? How to set sensible
> > > defaults?
> > > >> >>>>>>
> > > >> >>>>>> Currently, I implemented a POC [5] and verified it using
> > > >> >>>>>> flink-benchmarks [6]. The POC sets overdraft-buffers at Task
> > > level,
> > > >> >>>>>> and default value is 10. That is: each LocalBufferPool can
> > > >> overdraw up
> > > >> >>>>>> to 10 memory-segments.
> > > >> >>>>>>
> > > >> >>>>>> Looking forward to your feedback!
> > > >> >>>>>>
> > > >> >>>>>> Thanks,
> > > >> >>>>>> fanrui
> > > >> >>>>>>
> > > >> >>>>>> [1]
> > > >> >>>>>>
> > > >> >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
> > > >> >>>>>> [2] https://issues.apache.org/jira/browse/FLINK-14396
> > > >> >>>>>> [3] https://issues.apache.org/jira/browse/FLINK-26762
> > > >> >>>>>> [4]
> > > >> >>>>>>
> > > >> >>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-227%3A+Support+overdraft+buffer
> > > >> >>>>>> [5]
> > > >> >>>>>>
> > > >> >>
> > > >>
> > >
> >
> https://github.com/1996fanrui/flink/commit/c7559d94767de97c24ea8c540878832138c8e8fe
> > > >> >>>>>> [6] https://github.com/apache/flink-benchmarks/pull/54
> > > >> >> --
> > > >> >>
> > > >> >> Best regards,
> > > >> >> Anton Kalashnikov
> > > >> >>
> > > >> >>
> > > >> --
> > > >>
> > > >> Best regards,
> > > >> Anton Kalashnikov
> > > >>
> > > >>
> > >
> >
>

Reply via email to