Thank you Leonard for the review!

Hi Piotr, do you have any comments on the latest proposal?

I am wondering if it is OK to start the voting thread this week.

On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote:

> Thanks Dong for driving this FLIP forward!
>
> Introducing the `backlog status` concept for Flink jobs makes sense to me for
> the following reasons:
>
> From the concept/API design perspective, it’s more general and natural than
> the proposals above, as it can be used in HybridSource for bounded records,
> in CDC sources for the history snapshot, and in general sources like
> KafkaSource for historical messages.
>
> From the use-case/requirements perspective, I’ve seen many users manually set
> a larger checkpoint interval during backfilling and then set a shorter
> checkpoint interval for real-time processing in their production environments
> as a Flink application optimization. Now the Flink framework can apply this
> optimization without requiring the user to set the checkpoint interval and
> restart the job multiple times.
>
> Building on the support for a larger checkpoint interval for jobs under
> backlog status in the current FLIP, we can explore supporting larger
> parallelism/memory/CPU for jobs under backlog status in the future.
>
> In short, the updated FLIP looks good to me.
>
>
> Best,
> Leonard
>
>
> > On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > Hi Piotr,
> >
> > Thanks again for proposing the isProcessingBacklog concept.
> >
> > After discussing with Becket Qin and thinking about this more, I agree it
> > is a better idea to add a top-level concept to all source operators to
> > address the target use-case.
> >
> > The main reason that changed my mind is that isProcessingBacklog can be
> > described as an inherent/natural attribute of every source instance, and
> > its semantics do not need to depend on any specific checkpointing policy.
> > Also, we can hardcode the isProcessingBacklog behavior for the sources we
> > have considered so far (e.g. HybridSource and MySQL CDC source) without
> > asking users to explicitly configure the per-source behavior, which indeed
> > provides a better user experience.
> >
> > I have updated the FLIP based on the latest suggestions. The latest FLIP no
> > longer introduces a per-source config that can be used by end-users. While I
> > agree with you that CheckpointTrigger can be a useful feature to address
> > additional use-cases, I am not sure it is necessary for the use-case
> > targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately
> > in another FLIP?
> >
> > Can you help take another look at the updated FLIP?
> >
> > Best,
> > Dong
> >
> >
> >
> > On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org>
> > wrote:
> >
> >> Hi Dong,
> >>
> >>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> >>> "backpressured" at a given time (due to random traffic spikes). Then at any
> >>> given time, the chance of the job
> >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> >>> backpressure metric once a second, the estimated time for the job
> >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> >>> sec = 6.4 hours.
> >>>
> >>> This means that the job will effectively always use the longer
> >>> checkpointing interval. It looks like a real concern, right?
> >>
> >> Sorry I don't understand where you are getting those numbers from.
> >> Instead of trying to find loophole after loophole, could you try to think
> >> how a given loophole could be improved/solved?
> >>
> >>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>> following reasons.
> >>
> >> Please propose something. I don't think it's needed.
> >>
> >>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> APIs
> >>> of this alternative approach be more or less usable?
> >>
> >> Everything that you originally wanted to achieve in FLIP-309, you could do
> >> as well in my proposal.
> >> See my many mentions of the "hacky solution".
> >>
> >>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>> checkpointing interval to change dynamically even during the unbounded
> >>> phase) as it claims?
> >>
> >> I don't see why not.
> >>
> >>> - Can these APIs be decoupled from the APIs currently proposed in
> >> FLIP-309?
> >>
> >> Yes
> >>
> >>> For example, if the APIs of this alternative approach can be decoupled
> >> from
> >>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>> work on this extra use-case with a more advanced/complicated design
> >>> separately in a followup work.
> >>
> >> As I voiced my concerns previously, the current design of FLIP-309 would
> >> clog the public API and in the long run confuse the users. IMO it's
> >> addressing the problem in the wrong place.
> >>
> >>> Hmm.. do you mean we can do the following:
> >>> - Have all source operators emit a metric named "processingBacklog".
> >>> - Add a job-level config that specifies "the checkpointing interval to be
> >>> used when any source is processing backlog".
> >>> - The JM collects the "processingBacklog" periodically from all source
> >>> operators and uses the newly added config value as appropriate.
> >>
> >> Yes.
> >>
> >>> The challenge with this approach is that we need to define the semantics of
> >>> this "processingBacklog" metric and have all source operators
> >>> implement this metric. I am not sure we are able to do this yet without
> >>> having users explicitly provide this information on a per-source basis.
> >>>
> >>> Suppose the job reads from a bounded Kafka source, should it emit
> >>> "processingBacklog=true"? If yes, then the job might use the long
> >>> checkpointing interval even
> >>> if the job is asked to process data starting from now to the next 1 hour.
> >>> If no, then the job might use the short checkpointing interval
> >>> even if the job is asked to re-process data starting from 7 days ago.
> >>
> >> Yes. The same can be said of your proposal. Your proposal has the very same
> >> issues: every source would have to implement it differently, most sources
> >> would have no idea how to properly calculate the new requested checkpoint
> >> interval, and for those that do know how to do that, the user would have to
> >> configure every source individually. Yet again we would end up with a system
> >> that works only partially in some special use cases (HybridSource), which
> >> confuses the users even more.
> >>
> >> That's why I think the more generic solution, working primarily on the same
> >> metrics that are used by various auto scaling solutions (like Flink K8s
> >> operator's autoscaler) would be better. I proposed the hacky solution to:
> >> 1. show you that the generic solution is simply a superset of your proposal
> >> 2. show that, if you are adamant that the busyness/backpressured/records
> >>    processing rate/pending records metrics wouldn't cover your use case
> >>    sufficiently (imo they can), you can very easily enhance this algorithm
> >>    with some hints from the sources, like "processingBacklog==true", to
> >>    short circuit the main algorithm if `processingBacklog` is available.
> >>
> >> Best,
> >> Piotrek
> >>
> >>
> >> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a):
> >>
> >>> Hi again Piotr,
> >>>
> >>> Thank you for the reply. Please see my reply inline.
> >>>
> >>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski <
> >> piotr.nowoj...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi again Dong,
> >>>>
> >>>>> I understand that JM will get the backpressure-related metrics every
> >>> time
> >>>>> the RestServerEndpoint receives the REST request to get these
> >> metrics.
> >>>> But
> >>>>> I am not sure if RestServerEndpoint is already always receiving the
> >>> REST
> >>>>> metrics at regular interval (suppose there is no human manually
> >>>>> opening/clicking the Flink Web UI). And if it does, what is the
> >>> interval?
> >>>>
> >>>> Good catch, I thought that metrics are pre-emptively sent to the JM every
> >>>> 10 seconds.
> >>>> Indeed that's not the case at the moment, and that would have to be
> >>>> improved.
> >>>>
> >>>>> I would be surprised if Flink is already paying this much overhead
> >> just
> >>>> for
> >>>>> metrics monitoring. That is the main reason I still doubt it is true.
> >>> Can
> >>>>> you show where this 100 ms is currently configured?
> >>>>>
> >>>>> Alternatively, maybe you mean that we should add extra code to invoke
> >>> the
> >>>>> REST API at 100 ms interval. Then that means we need to considerably
> >>>>> increase the network/cpu overhead at JM, where the overhead will
> >>> increase
> >>>>> as the number of TM/slots increase, which may pose risk to the
> >>>> scalability
> >>>>> of the proposed design. I am not sure we should do this. What do you
> >>>> think?
> >>>>
> >>>> Sorry. I didn't mean the metric should be reported every 100ms. I meant that
> >>>> "backPressuredTimeMsPerSecond (metric) would report (a value of) 100ms/s."
> >>>> once per metric interval (10s?).
> >>>>
> >>>
> >>> Suppose there are 1000 subtasks and each subtask has a 1% chance of being
> >>> "backpressured" at a given time (due to random traffic spikes). Then at any
> >>> given time, the chance of the job
> >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate the
> >>> backpressure metric once a second, the estimated time for the job
> >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = 23163
> >>> sec = 6.4 hours.
> >>>
> >>> This means that the job will effectively always use the longer
> >>> checkpointing interval. It looks like a real concern, right?
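A minimal sketch of the arithmetic in the estimate above, for readers who want to reproduce the numbers. It assumes that subtasks are backpressured independently of each other and that each once-per-second evaluation is an independent trial, so the waiting time follows a geometric distribution. The function name and parameters are illustrative only and are not part of Flink.

```python
def expected_wait_seconds(num_subtasks: int,
                          p_backpressured: float,
                          check_interval_s: float = 1.0) -> float:
    """Expected time until one evaluation finds no subtask backpressured.

    Assumption: subtasks are independent and every evaluation is an
    independent Bernoulli trial, so the expected number of evaluations
    is 1 / P(all subtasks are not backpressured).
    """
    p_all_clear = (1.0 - p_backpressured) ** num_subtasks
    return check_interval_s / p_all_clear


# 1000 subtasks, each backpressured 1% of the time, evaluated once a second.
wait_s = expected_wait_seconds(1000, 0.01)
print(f"{wait_s:.0f} s, about {wait_s / 3600:.1f} hours")
```

Under these assumptions the result is on the order of 23,000 seconds, which matches the roughly 6.4 hours quoted above. Real traffic spikes are usually correlated across subtasks, so this is an illustration of the concern, not a prediction.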
> >>>
> >>>
> >>>>> - What is the interface of this CheckpointTrigger? For example, are
> >> we
> >>>>> going to give CheckpointTrigger a context that it can use to fetch
> >>>>> arbitrary metric values? This can help us understand what information
> >>>> this
> >>>>> user-defined CheckpointTrigger can use to make the checkpoint
> >> decision.
> >>>>
> >>>> I honestly don't think this is important at this stage of the
> >> discussion.
> >>>> It could have
> >>>> whatever interface we would deem to be best. Required things:
> >>>>
> >>>> - access to at least a subset of metrics that the given
> >>> `CheckpointTrigger`
> >>>> requests,
> >>>>  for example via some registration mechanism, so we don't have to
> >> fetch
> >>>> all of the
> >>>>  metrics all the time from TMs.
> >>>> - some way to influence `CheckpointCoordinator`. Either via manually
> >>>> triggering
> >>>>  checkpoints, and/or ability to change the checkpointing interval.
> >>>>
> >>>
> >>> Hmm... I honestly think it will be useful to know the APIs due to the
> >>> following reasons.
> >>>
> >>> We would need to know the concrete APIs to gauge the following:
> >>> - For the use-case mentioned in FLIP-309 motivation section, would the
> >> APIs
> >>> of this alternative approach be more or less usable?
> >>> - Can these APIs reliably address the extra use-case (e.g. allow
> >>> checkpointing interval to change dynamically even during the unbounded
> >>> phase) as it claims?
> >>> - Can these APIs be decoupled from the APIs currently proposed in
> >> FLIP-309?
> >>>
> >>> For example, if the APIs of this alternative approach can be decoupled
> >> from
> >>> the APIs currently proposed in FLIP-309, then it might be reasonable to
> >>> work on this extra use-case with a more advanced/complicated design
> >>> separately in a followup work.
> >>>
> >>>
> >>>>> - Where is this CheckpointTrigger running? For example, is it going
> >> to
> >>>> run
> >>>>> on the subtask of every source operator? Or is it going to run on the
> >>> JM?
> >>>>
> >>>> IMO on the JM.
> >>>>
> >>>>> - Are we going to provide a default implementation of this
> >>>>> CheckpointTrigger in Flink that implements the algorithm described
> >>> below,
> >>>>> or do we expect each source operator developer to implement their own
> >>>>> CheckpointTrigger?
> >>>>
> >>>> As I mentioned before, I think we should provide at the very least the
> >>>> implementation
> >>>> that replaces the current triggering mechanism (statically configured
> >>>> checkpointing interval)
> >>>> and it would be great to provide the backpressure monitoring trigger
> as
> >>>> well.
> >>>>
> >>>
> >>> I agree that if there is a good use-case that can be addressed by the
> >>> proposed CheckpointTrigger, then it is reasonable
> >>> to add CheckpointTrigger and replace the current triggering mechanism
> >> with
> >>> it.
> >>>
> >>> I also agree that we will likely find such a use-case. For example,
> >> suppose
> >>> the source records have event timestamps, then it is likely
> >>> that we can use the trigger to dynamically control the checkpointing
> >>> interval based on the difference between the watermark and current
> system
> >>> time.
> >>>
> >>> But I am not sure the addition of this CheckpointTrigger should be
> >> coupled
> >>> with FLIP-309. Whether or not it is coupled probably depends on the
> >>> concrete API design around CheckpointTrigger.
> >>>
> >>>> If you would be adamant that the backpressure monitoring doesn't cover
> >>>> your use case well enough, I would be ok to provide the hacky version that I
> >>>> also mentioned before:
> >>>
> >>>
> >>>> """
> >>>> Especially that if my proposed algorithm wouldn't work well enough, there
> >>>> is an obvious solution, that any source could add a metric, like let's say
> >>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> >>>> could use this as an override to always switch to the
> >>>> "slowCheckpointInterval". I don't think we need it, but that's always an
> >>>> option that would be basically equivalent to your original proposal.
> >>>> """
> >>>>
> >>>
> >>> Hmm.. do you mean we can do the following:
> >>> - Have all source operators emit a metric named "processingBacklog".
> >>> - Add a job-level config that specifies "the checkpointing interval to be
> >>> used when any source is processing backlog".
> >>> - The JM collects the "processingBacklog" periodically from all source
> >>> operators and uses the newly added config value as appropriate.
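The three steps listed above can be sketched as a single decision function. This is only an illustration of the idea under discussion, not Flink code: the function name, the parameter names, and the choice to treat "no sources reporting" as "no backlog" are all assumptions made for the example.

```python
from typing import Iterable


def select_checkpoint_interval(processing_backlog: Iterable[bool],
                               regular_interval_s: float,
                               backlog_interval_s: float) -> float:
    """Pick the checkpointing interval from per-source backlog flags.

    processing_backlog: the latest "processingBacklog" value collected
    from every source operator (hypothetical metric from the thread).
    The backlog interval is used while ANY source reports backlog.
    """
    if any(processing_backlog):
        return backlog_interval_s
    return regular_interval_s


# One of three sources is still reading historical data.
print(select_checkpoint_interval([False, True, False], 1.0, 30.0))
```

The sketch leaves open exactly the question raised in the thread: how each source decides what value to report.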
> >>>
> >>> The challenge with this approach is that we need to define the semantics of
> >>> this "processingBacklog" metric and have all source operators
> >>> implement this metric. I am not sure we are able to do this yet without
> >>> having users explicitly provide this information on a per-source basis.
> >>>
> >>> Suppose the job reads from a bounded Kafka source, should it emit
> >>> "processingBacklog=true"? If yes, then the job might use the long
> >>> checkpointing interval even
> >>> if the job is asked to process data starting from now to the next 1 hour.
> >>> If no, then the job might use the short checkpointing interval
> >>> even if the job is asked to re-process data starting from 7 days ago.
> >>>
> >>>
> >>>>
> >>>>> - How can users specify the
> >>>> fastCheckpointInterval/slowCheckpointInterval?
> >>>>> For example, will we provide APIs on the CheckpointTrigger that
> >>> end-users
> >>>>> can use to specify the checkpointing interval? What would that look
> >>> like?
> >>>>
> >>>> Also as I mentioned before, just like metric reporters are configured:
> >>>>
> >>>> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/
> >>>>
> >>>> Every CheckpointTrigger could have its own custom configuration.
> >>>>
> >>>>> Overall, my gut feel is that the alternative approach based on
> >>>>> CheckpointTrigger is more complicated
> >>>>
> >>>> Yes, as usual, more generic things are more complicated, but often
> more
> >>>> useful in the long run.
> >>>>
> >>>>> and harder to use.
> >>>>
> >>>> I don't agree. Why setting in config
> >>>>
> >>>> execution.checkpointing.trigger: BackPressureMonitoringCheckpointTrigger
> >>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: 1s
> >>>> execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: 30s
> >>>>
> >>>> that we could even provide a shortcut to the above construct via:
> >>>>
> >>>> execution.checkpointing.fast-interval: 1s
> >>>> execution.checkpointing.slow-interval: 30s
> >>>>
> >>>> is harder compared to setting two/three checkpoint intervals, one in the
> >>>> config/or via `env.enableCheckpointing(x)`,
> >>>> secondly passing one/two (fast/slow) values on the source itself?
> >>>>
> >>>
> >>> If we can address the use-case by providing just the two job-level
> config
> >>> as described above, I agree it will indeed be simpler.
> >>>
> >>> I have tried to achieve this goal. But the caveat is that it requires
> >> much
> >>> more work than described above in order to give the configs
> well-defined
> >>> semantics. So I find it simpler to just use the approach in FLIP-309.
> >>>
> >>> Let me explain my concern below. It will be great if you or someone
> else
> >>> can help provide a solution.
> >>>
> >>> 1) We need to clearly document when the fast-interval and slow-interval
> >>> will be used so that users can derive the expected behavior of the job and
> >>> be able to configure these values.
> >>>
> >>> 2) The trigger of fast/slow interval depends on the behavior of the source
> >>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source
> >>> operator (e.g. boundedness) can describe the target behavior. For example,
> >>> MySQL CDC internally has two phases, namely snapshot phase and binlog
> >>> phase, which are not explicitly exposed to its users via source operator
> >>> API. And we probably should not enumerate all internal phases of all source
> >>> operators that are affected by fast/slow interval.
> >>>
> >>> 3) An alternative approach might be to define a new concept (e.g.
> >>> processingBacklog) that is applied to all source operators. Then the
> >>> fast/slow interval's documentation can depend on this concept. That means
> >>> we have to add a top-level concept (similar to source boundedness) and
> >>> require all source operators to specify how they enforce this concept (e.g.
> >>> FileSystemSource always emits processingBacklog=true). And there might be
> >>> cases where the source itself (e.g. a bounded Kafka Source) can not
> >>> automatically derive the value of this concept, in which case we need to
> >>> provide an option for users to explicitly specify the value for this concept
> >>> on a per-source basis.
> >>>
> >>>
> >>>
> >>>>> And it probably also has the issues of "having two places to
> >> configure
> >>>> checkpointing
> >>>>> interval" and "giving flexibility for every source to implement a
> >>>> different
> >>>>> API" (as mentioned below).
> >>>>
> >>>> No, it doesn't.
> >>>>
> >>>>> IMO, it is a hard-requirement for the user-facing API to be
> >>>>> clearly defined and users should be able to use the API without
> >> concern
> >>>> of
> >>>>> regression. And this requirement is more important than the other
> >> goals
> >>>>> discussed above because it is related to the stability/performance of
> >>> the
> >>>>> production job. What do you think?
> >>>>
> >>>> I don't agree with this. There are many things that work something in
> >>>> between perfectly and well enough
> >>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%),
> >>> while
> >>>> still being very useful.
> >>>> Good examples are: selection of state backend, unaligned checkpoints,
> >>>> buffer debloating but frankly if I go
> >>>> through list of currently available config options, something like
> half
> >>> of
> >>>> them can cause regressions. Heck,
> >>>> even Flink itself doesn't work perfectly in 100% of the use cases, due
> >>> to a
> >>>> variety of design choices. Of
> >>>> course, the more use cases are fine with said feature, the better, but
> >> we
> >>>> shouldn't fixate to perfectly cover
> >>>> 100% of the cases, as that's impossible.
> >>>>
> >>>> In this particular case, if back pressure monitoring  trigger can work
> >>> well
> >>>> enough in 95% of cases, I would
> >>>> say that's already better than the originally proposed alternative,
> >> which
> >>>> doesn't work at all if user has a large
> >>>> backlog to reprocess from Kafka, including when using HybridSource
> >> AFTER
> >>>> the switch to Kafka has
> >>>> happened. For the remaining 5%, we should try to improve the behaviour
> >>> over
> >>>> time, but ultimately, users can
> >>>> decide to just run a fixed checkpoint interval (or at worst use the
> >> hacky
> >>>> checkpoint trigger that I mentioned
> >>>> before a couple of times).
> >>>>
> >>>> Also to be pedantic, if a user naively sets slow-interval in your
> >>>> proposal to 30 minutes, when that user's
> >>>> job fails on average every 15-20 minutes, the job can end up in a state
> >>>> in which it cannot make any progress;
> >>>> this arguably is quite a serious regression.
> >>>>
> >>>
> >>> I probably should not say it is "hard requirement". After all there are
> >>> pros/cons. We will need to consider implementation complexity,
> usability,
> >>> extensibility etc.
> >>>
> >>> I just don't think we should take it for granted to introduce
> regression
> >>> for one use-case in order to support another use-case. If we can not
> find
> >>> an algorithm/solution that addresses
> >>> both use-case well, I hope we can be open to tackle them separately so
> >> that
> >>> users can choose the option that best fits their needs.
> >>>
> >>> All else being equal, I think it is preferable for the user-facing API to
> >>> be clearly defined and for users to be able to use the API without
> >>> concern of regression.
> >>>
> >>> Maybe we can list pros/cons for the alternative approaches we have been
> >>> discussing and choose the best approach. And maybe we will end up
> >>> finding that the use-case
> >>> which needs CheckpointTrigger can be tackled separately from the use-case
> >>> in FLIP-309.
> >>>
> >>>
> >>>>> I am not sure if there is a typo. Because if backPressuredTimeMsPerSecond =
> >>>>> 0, then maxRecordsConsumedWithoutBackpressure = numRecordsInPerSecond /
> >>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> >>>>>
> >>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond
> >>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval"?
> >>>>
> >>>> It looks like there is indeed some mistake in my proposal above. Yours looks
> >>>> more correct; it probably
> >>>> still needs some safeguard/special handling if
> >>>> `backPressuredTimeMsPerSecond > 950`
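A minimal sketch of the corrected formula, with one possible form of the safeguard mentioned above. Two things are assumptions made for this example and are not stated in the thread: the metric update interval is taken to be in seconds, and the safeguard is implemented by clamping the backpressured time at 950 ms/s so the denominator cannot approach zero. Other safeguards are equally plausible.

```python
def max_records_without_backpressure(num_records_in_per_second: float,
                                     back_pressured_time_ms_per_second: float,
                                     metrics_update_interval_s: float,
                                     max_back_pressured_ms: float = 950.0) -> float:
    """Estimate records consumable in one interval if there were no backpressure.

    Implements (numRecordsInPerSecond / (1 - backPressuredTimeMsPerSecond / 1000))
    * metricsUpdateInterval, with the backpressured time clamped to
    [0, max_back_pressured_ms] (assumed safeguard, not from the thread).
    """
    clamped_ms = min(max(back_pressured_time_ms_per_second, 0.0),
                     max_back_pressured_ms)
    not_back_pressured_fraction = 1.0 - clamped_ms / 1000.0
    return (num_records_in_per_second / not_back_pressured_fraction
            * metrics_update_interval_s)


# A subtask reading 1000 records/s while backpressured half of the time.
print(max_records_without_backpressure(1000.0, 500.0, 10.0))
```

With no backpressure the estimate reduces to the observed rate times the interval; as backpressure grows, the estimate grows until the clamp takes over.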
> >>>>
> >>>>> The only information it can access is the backlog.
> >>>>
> >>>> Again no. It can access whatever we want to provide to it.
> >>>>
> >>>> Regarding the rest of your concerns. It's a matter of tweaking the
> >>>> parameters and the algorithm itself,
> >>>> and how much safety-net do we want to have. Ultimately, I'm pretty
> sure
> >>>> that's a (for 95-99% of cases)
> >>>> solvable problem. If not, there is always the hacky solution, that
> >> could
> >>> be
> >>>> even integrated into this above
> >>>> mentioned algorithm as a short circuit to always reach
> `slow-interval`.
> >>>>
> >>>> Apart of that, you picked 3 minutes as the checkpointing interval in
> >> your
> >>>> counter example. In most cases
> >>>> any interval above 1 minute would inflict pretty negligible overheads,
> >> so
> >>>> all in all, I would doubt there is
> >>>> a significant benefit (in most cases) of increasing 3 minute
> checkpoint
> >>>> interval to anything more, let alone
> >>>> 30 minutes.
> >>>>
> >>>
> >>> I am not sure we should design the algorithm with the assumption that
> the
> >>> short checkpointing interval will always be higher than 1 minute etc.
> >>>
> >>> I agree the proposed algorithm can solve most cases where the resource is
> >>> sufficient and there is always no backlog in source subtasks. On the other
> >>> hand, what makes SRE
> >>> life hard is probably the remaining 1-5% cases where the traffic is spiky
> >>> and the cluster is reaching its capacity limit. The ability to predict and
> >>> control Flink job's behavior (including checkpointing interval) can
> >>> considerably reduce the burden of managing Flink jobs.
> >>>
> >>> Best,
> >>> Dong
> >>>
> >>>
> >>>>
> >>>> Best,
> >>>> Piotrek
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a):
> >>>>
> >>>>> Hi Piotr,
> >>>>>
> >>>>> Thanks for the explanations. I have some followup questions below.
> >>>>>
> >>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org
> >>>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> Thanks for chipping in the discussion Ahmed!
> >>>>>>
> >>>>>> Regarding using the REST API. Currently I'm leaning towards
> >>>> implementing
> >>>>>> this feature inside the Flink itself, via some pluggable interface.
> >>>>>> REST API solution would be tempting, but I guess not everyone is
> >>> using
> >>>>>> Flink Kubernetes Operator.
> >>>>>>
> >>>>>> @Dong
> >>>>>>
> >>>>>>> I am not sure metrics such as isBackPressured are already sent to
> >>> JM.
> >>>>>>
> >>>>>> Fetching code path on the JM:
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture
> >>>>>>
> >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add
> >>>>>>
> >>>>>> Example code path accessing Task level metrics via JM using the
> >>>>>> `MetricStore`:
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler
> >>>>>>
> >>>>>
> >>>>> Thanks for the code reference. I checked the code that invoked these
> >>> two
> >>>>> classes and found the following information:
> >>>>>
> >>>>> - AggregatingSubtasksMetricsHandler#getStores is currently invoked only
> >>>>> when AggregatingJobsMetricsHandler is invoked.
> >>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by
> >>>>> WebMonitorEndpoint#initializeHandlers
> >>>>> - WebMonitorEndpoint#initializeHandlers is only used by
> >>>> RestServerEndpoint.
> >>>>> And RestServerEndpoint invokes these handlers in response to external
> >>>> REST
> >>>>> request.
> >>>>>
> >>>>> I understand that JM will get the backpressure-related metrics every
> >>> time
> >>>>> the RestServerEndpoint receives the REST request to get these
> >> metrics.
> >>>> But
> >>>>> I am not sure if RestServerEndpoint is already always receiving the
> >>> REST
> >>>>> metrics at regular interval (suppose there is no human manually
> >>>>> opening/clicking the Flink Web UI). And if it does, what is the
> >>> interval?
> >>>>>
> >>>>>
> >>>>>
> >>>>>>> For example, let's say every source operator subtask reports this metric to
> >>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each subtask
> >>>>>>> is backpressured roughly 10% of the total time due to traffic spikes (and
> >>>>>>> limited buffer). Then at any given time, there is a 1 - 0.9^100 = 99.997%
> >>>>>>> chance that there is at least one subtask that is backpressured. Then we
> >>>>>>> have to wait for at least 10 seconds to check again.
> >>>>>>
> >>>>>> backPressuredTimeMsPerSecond and other related metrics (like
> >>>>>> busyTimeMsPerSecond) are not subject to that problem.
> >>>>>> They are recalculated once every metric fetching interval, and they report
> >>>>>> accurately how much time on average the given subtask spent
> >>>>>> busy/idling/backpressured.
> >>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s.
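To make the difference between the two views in this exchange concrete, here is a small sketch that contrasts an instantaneous "is any subtask backpressured right now" check with a time-averaged metric. It assumes independent subtasks for the first function and a simple average over one metric window for the second. Both function names are illustrative and do not correspond to Flink APIs.

```python
def chance_any_subtask_backpressured(num_subtasks: int, p_backpressured: float) -> float:
    """Probability that at least one subtask is backpressured at a given instant.

    Assumption: subtasks are backpressured independently of each other.
    """
    return 1.0 - (1.0 - p_backpressured) ** num_subtasks


def averaged_back_pressured_ms_per_second(back_pressured_ms: float,
                                          window_ms: float) -> float:
    """Time-averaged backpressure over one metric window, in ms per second."""
    return back_pressured_ms / window_ms * 1000.0


# 100 subtasks, each backpressured 10% of the time, 10 s metric window.
print(chance_any_subtask_backpressured(100, 0.10))
print(averaged_back_pressured_ms_per_second(1_000.0, 10_000.0))
```

The instantaneous check is almost always true for a large job, while the averaged value stays near 100 ms/s per subtask, which is the distinction being drawn above.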
> >>>>>
> >>>>>
> >>>>> Suppose every subtask is already reporting backPressuredTimeMsPerSecond to
> >>>>> JM once every 100 ms. If a job has 10 operators (that are not chained) and
> >>>>> each operator has 100 subtasks, then JM would need to handle 10000 requests
> >>>>> per second to receive metrics from these 1000 subtasks. It seems like a
> >>>>> non-trivial overhead for medium-to-large sized jobs and can make JM the
> >>>>> performance bottleneck during job execution.
> >>>>>
> >>>>> I would be surprised if Flink is already paying this much overhead
> >> just
> >>>> for
> >>>>> metrics monitoring. That is the main reason I still doubt it is true.
> >>> Can
> >>>>> you show where this 100 ms is currently configured?
> >>>>>
> >>>>> Alternatively, maybe you mean that we should add extra code to invoke
> >>> the
> >>>>> REST API at 100 ms interval. Then that means we need to considerably
> >>>>> increase the network/cpu overhead at JM, where the overhead will
> >>> increase
> >>>>> as the number of TM/slots increase, which may pose risk to the
> >>>> scalability
> >>>>> of the proposed design. I am not sure we should do this. What do you
> >>>> think?
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>> While it will be nice to support additional use-cases
> >>>>>>> with one proposal, it is probably also reasonable to make
> >>> incremental
> >>>>>>> progress and support the low-hanging-fruit use-case first. The
> >>> choice
> >>>>>>> really depends on the complexity and the importance of supporting
> >>> the
> >>>>>> extra
> >>>>>>> use-cases.
> >>>>>>
> >>>>>> That would be true, if that was a private implementation detail or
> >> if
> >>>> the
> >>>>>> low-hanging-fruit-solution would be on the direct path to the final
> >>>>>> solution.
> >>>>>> That's unfortunately not the case here. This will add public facing
> >>>> API,
> >>>>>> that we will later need to maintain, no matter what the final
> >>> solution
> >>>>> will
> >>>>>> be,
> >>>>>> and at the moment at least I don't see it being related to a
> >>> "perfect"
> >>>>>> solution.
> >>>>>
> >>>>>
> >>>>> Sure. Then let's decide the final solution first.
> >>>>>
> >>>>>
> >>>>>>> I guess the point is that the suggested approach, which
> >> dynamically
> >>>>>>> determines the checkpointing interval based on the backpressure,
> >>> may
> >>>>>> cause
> >>>>>>> regression when the checkpointing interval is relatively low.
> >> This
> >>>>> makes
> >>>>>> it
> >>>>>>> hard for users to enable this feature in production. It is like
> >> an
> >>>>>>> auto-driving system that is not guaranteed to work
> >>>>>>
> >>>>>> Yes, creating a more generic solution that would require less configuration
> >>>>>> is usually more difficult than static configurations.
> >>>>>> It doesn't mean we shouldn't try to do that. Especially that if my
> >>>>> proposed
> >>>>>> algorithm wouldn't work good enough, there is
> >>>>>> an obvious solution, that any source could add a metric, like let
> >> say
> >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger`
> >>>>>> could use this as an override to always switch to the
> >>>>>> "slowCheckpointInterval". I don't think we need it, but that's
> >> always
> >>>> an
> >>>>>> option
> >>>>>> that would be basically equivalent to your original proposal. Or
> >> even
> >>>>>> source could add "suggestedCheckpointInterval : int", and
> >>>>>> `CheckpointTrigger` could use that value if present as a hint in
> >> one
> >>>> way
> >>>>> or
> >>>>>> another.
> >>>>>>
> >>>>>
> >>>>> So far we have talked about the possibility of using CheckpointTrigger and
> >>>>> mentioned that the CheckpointTrigger
> >>>>> can read metric values.
> >>>>>
> >>>>> Can you help answer the following questions so that I can understand
> >>> the
> >>>>> alternative solution more concretely:
> >>>>>
> >>>>> - What is the interface of this CheckpointTrigger? For example, are
> >> we
> >>>>> going to give CheckpointTrigger a context that it can use to fetch
> >>>>> arbitrary metric values? This can help us understand what information
> >>>> this
> >>>>> user-defined CheckpointTrigger can use to make the checkpoint
> >> decision.
> >>>>> - Where is this CheckpointTrigger running? For example, is it going
> >> to
> >>>> run
> >>>>> on the subtask of every source operator? Or is it going to run on the
> >>> JM?
> >>>>> - Are we going to provide a default implementation of this
> >>>>> CheckpointTrigger in Flink that implements the algorithm described
> >>> below,
> >>>>> or do we expect each source operator developer to implement their own
> >>>>> CheckpointTrigger?
> >>>>> - How can users specify the
> >>>> fastCheckpointInterval/slowCheckpointInterval?
> >>>>> For example, will we provide APIs on the CheckpointTrigger that
> >>> end-users
> >>>>> can use to specify the checkpointing interval? What would that look
> >>> like?
> >>>>>
> >>>>> Overall, my gut feel is that the alternative approach based on
> >>>>> CheckpointTrigger is more complicated and harder to use. And it
> >>> probably
> >>>>> also has the issues of "having two places to configure checkpointing
> >>>>> interval" and "giving flexibility for every source to implement a
> >>>> different
> >>>>> API" (as mentioned below).
> >>>>>
> >>>>> Maybe we can evaluate it more after knowing the answers to the above
> >>>>> questions.
> >>>>>
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>> On the other hand, the approach currently proposed in the FLIP is
> >>>> much
> >>>>>>> simpler as it does not depend on backpressure. Users specify the
> >>>> extra
> >>>>>>> interval requirement on the specific sources (e.g. HybridSource,
> >>>> MySQL
> >>>>>> CDC
> >>>>>>> Source) and can easily know the checkpointing interval will be
> >> used
> >>>> on
> >>>>>> the
> >>>>>>> continuous phase of the corresponding source. This is pretty much the
> >>>> same
> >>>>> as
> >>>>>>> how users use the existing execution.checkpointing.interval
> >> config.
> >>>> So
> >>>>>>> there is no extra concern of regression caused by this approach.
> >>>>>>
> >>>>>> To an extent, but as I have already previously mentioned I really
> >>>> really
> >>>>> do
> >>>>>> not like the idea of:
> >>>>>>  - having two places to configure checkpointing interval (config
> >>> file
> >>>>> and
> >>>>>> in the Source builders)
> >>>>>>  - giving flexibility for every source to implement a different
> >> API
> >>>> for
> >>>>>> that purpose
> >>>>>>  - creating a solution that is not generic enough, so that we will
> >>>> need
> >>>>> a
> >>>>>> completely different mechanism in the future anyway
> >>>>>>
> >>>>>
> >>>>> Yeah, I understand different developers might have different
> >>>>> concerns/tastes for these APIs. Ultimately, there might not be a
> >>> perfect
> >>>>> solution and we have to choose based on the pros/cons of these
> >>> solutions.
> >>>>>
> >>>>> I agree with you that, all things being equal, it is preferable to 1)
> >>>> have
> >>>>> one place to configure checkpointing intervals, 2) have all source
> >>>>> operators use the same API, and 3) create a solution that is generic
> >>> and
> >>>>> long lasting. Note that these three goals affect the usability and
> >>>>> extensibility of the API, but not necessarily the
> >> stability/performance
> >>>> of
> >>>>> the production job.
> >>>>>
> >>>>> BTW, there are also other preferable goals. For example, it is very
> >>>> useful
> >>>>> for the job's behavior to be predictable and interpretable so that
> >> SRE
> >>>> can
> >>>>> operate/debug the Flink job more easily. We can list these
> >> pros/cons
> >>>>> altogether later.
> >>>>>
> >>>>> I am wondering if we can first agree on the priority of goals we want
> >>> to
> >>>>> achieve. IMO, it is a hard requirement for the user-facing API to be
> >>>>> clearly defined and users should be able to use the API without
> >> concern
> >>>> of
> >>>>> regression. And this requirement is more important than the other
> >> goals
> >>>>> discussed above because it is related to the stability/performance of
> >>> the
> >>>>> production job. What do you think?
> >>>>>
> >>>>>
> >>>>>>
> >>>>>>> Sounds good. Looking forward to learning more ideas.
> >>>>>>
> >>>>>> I have thought about this a bit more, and I think we don't need to
> >>>> check
> >>>>>> for the backpressure status, or how much overloaded all of the
> >>>> operators
> >>>>>> are.
> >>>>>> We could just check three things for source operators:
> >>>>>> 1. pendingRecords (backlog length)
> >>>>>> 2. numRecordsInPerSecond
> >>>>>> 3. backPressuredTimeMsPerSecond
> >>>>>>
> >>>>>> // int metricsUpdateInterval = 10s // obtained from config
> >>>>>> // Next line calculates how many records we can consume from the
> >>>> backlog,
> >>>>>> assuming
> >>>>>> // that magically the reason behind a backpressure vanishes. We
> >> will
> >>>> use
> >>>>>> this only as
> >>>>>> // a safeguard against scenarios like for example if backpressure
> >>> was
> >>>>>> caused by some
> >>>>>> // intermittent failure/performance degradation.
> >>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond /
> >>> (1000
> >>>>>> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval
> >>>>>>
> >>>>>
> >>>>> I am not sure if there is a typo. Because if
> >>>> backPressuredTimeMsPerSecond =
> >>>>> 0, then maxRecordsConsumedWithoutBackpressure =
> >> numRecordsInPerSecond /
> >>>>> 1000 * metricsUpdateInterval according to the above algorithm.
> >>>>>
> >>>>> Do you mean "maxRecordsConsumedWithoutBackpressure =
> >>>> (numRecordsInPerSecond
> >>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) *
> >> metricsUpdateInterval"?
> >>>>>
> >>>>>
> >>>>>>
> >>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the
> >>>>> backlog
> >>>>>> as
> >>>>>> // a safeguard against intermittent backpressure problems, so
> >>> that
> >>>> we
> >>>>>> don't
> >>>>>> // calculate next checkpoint interval far far in the future, while
> >>> the
> >>>>>> backpressure
> >>>>>> // goes away before we will recalculate metrics and new
> >> checkpointing
> >>>>>> interval
> >>>>>> timeToConsumeBacklog = (pendingRecords -
> >>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond
> >>>>>>
> >>>>>>
> >>>>>> Then we can use those numbers to calculate the desired checkpointing
> >>>> interval
> >>>>>> for example like this:
> >>>>>>
> >>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10;
> >> //this
> >>>> may
> >>>>>> need some refining
> >>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval,
> >>>>>> calculatedCheckpointInterval), slowCheckpointInterval);
> >>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval;
> >>>>>>
> >>>>>> WDYT?
> >>>>>
> >>>>>
> >>>>> I think the idea of the above algorithm is to lean towards using the
> >>>>> fastCheckpointInterval unless we are very sure the backlog will take
> >> a
> >>>> long
> >>>>> time to process. This can alleviate the concern of regression during
> >>> the
> >>>>> continuous_unbounded phase since we are more likely to use the
> >>>>> fastCheckpointInterval. However, it can cause regression during the
> >>>> bounded
> >>>>> phase.
> >>>>>
> >>>>> I will use a concrete example to explain the risk of regression:
> >>>>> - The user is using HybridSource to read from HDFS followed by Kafka.
> >>> The
> >>>>> data in HDFS is old and there is no need for data freshness for the
> >>> data
> >>>> in
> >>>>> HDFS.
> >>>>> - The user configures the job as below:
> >>>>>  - fastCheckpointInterval = 3 minutes
> >>>>>  - slowCheckpointInterval = 30 minutes
> >>>>>  - metricsUpdateInterval = 100 ms
> >>>>>
> >>>>> Using the above formula, we can know that once pendingRecords
> >>>>> <= numRecordsInPerSecond * 30-minutes, then
> >>> calculatedCheckpointInterval
> >>>> <=
> >>>>> 3 minutes, meaning that we will use fastCheckpointInterval as the
> >>>>> checkpointing interval. Then in the last 30 minutes of the bounded
> >>> phase,
> >>>>> the checkpointing frequency will be 10X higher than what the user
> >>> wants.
> >>>>>
> >>>>> Also note that the same issue would also considerably limit the
> >>> benefits
> >>>> of
> >>>>> the algorithm. For example, during the continuous phase, the
> >> algorithm
> >>>> will
> >>>>> only be better than the approach in FLIP-309 when there is at least
> >>>>> 30-minutes worth of backlog in the source.
> >>>>>
> >>>>> Sure, having a slower checkpointing interval in this extreme case
> >>> (where
> >>>>> there is a 30-minute backlog in the continuous-unbounded phase) is
> >> still
> >>>>> useful when this happens. But since this is the uncommon case, and
> >> the
> >>>>> right solution is probably to do capacity planning to prevent this from
> >>>>> happening in the first place, I am not sure it is worth optimizing
> >> for
> >>>> this
> >>>>> case at the cost of regression in the bounded phase and the reduced
> >>>>> operational predictability for users (e.g. what checkpointing
> >> interval
> >>>>> should I expect at this stage of the job).
> >>>>>
> >>>>> I think the fundamental issue with this algorithm is that it is
> >> applied
> >>>> to
> >>>>> both the bounded phases and the continuous_unbounded phases without
> >>>> knowing
> >>>>> which phase the job is running in. The only information it can access
> >>> is
> >>>>> the backlog. But two sources with the same amount of backlog do not
> >>>>> necessarily mean they have the same data freshness requirement.
> >>>>>
> >>>>> In this particular example, users know that the data in HDFS is very
> >>> old
> >>>>> and there is no need for data freshness. Users can express this signal
> >> via
> >>>> the
> >>>>> per-source API proposed in the FLIP. This is why the current approach
> >>> in
> >>>>> FLIP-309 can be better in this case.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> Best,
> >>>>> Dong
> >>>>>
> >>>>>
> >>>>>>
> >>>>>> Best,
> >>>>>> Piotrek
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
>
>
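
For reference, here is a minimal Java sketch of the interval heuristic quoted
above (based on pendingRecords, numRecordsInPerSecond and
backPressuredTimeMsPerSecond). It uses the divisor
(1 - backPressuredTimeMsPerSecond / 1000) suggested in the reply rather than
the originally posted one. The class name, the method names, and the fallback
to fastCheckpointInterval when the source reports no throughput or is fully
backpressured are assumptions made for illustration only; none of this is an
existing Flink API.

```java
/** Hypothetical sketch of the backlog-based interval heuristic; not a Flink API. */
public final class BacklogCheckpointIntervalSketch {

    private BacklogCheckpointIntervalSketch() {}

    /** Records consumable in one metrics interval if the backpressure vanished. */
    public static double maxRecordsConsumedWithoutBackpressure(
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds) {
        // Fraction of each second in which the source was not backpressured.
        double unblockedFraction = 1.0 - backPressuredTimeMsPerSecond / 1000.0;
        return numRecordsInPerSecond / unblockedFraction * metricsUpdateIntervalSeconds;
    }

    /** Returns the next checkpoint interval in ms, clamped to [fast, slow]. */
    public static long nextCheckpointIntervalMs(
            long pendingRecords,
            double numRecordsInPerSecond,
            double backPressuredTimeMsPerSecond,
            double metricsUpdateIntervalSeconds,
            long fastCheckpointIntervalMs,
            long slowCheckpointIntervalMs) {
        // Assumption: with no measurable throughput, or a fully backpressured
        // source, the estimate is undefined, so fall back to the fast interval.
        if (numRecordsInPerSecond <= 0.0 || backPressuredTimeMsPerSecond >= 1000.0) {
            return fastCheckpointIntervalMs;
        }
        double safeguard = maxRecordsConsumedWithoutBackpressure(
                numRecordsInPerSecond,
                backPressuredTimeMsPerSecond,
                metricsUpdateIntervalSeconds);
        // Exclude the safeguard amount from the backlog; never go below zero.
        double timeToConsumeBacklogSeconds =
                Math.max(0.0, (pendingRecords - safeguard) / numRecordsInPerSecond);
        // "timeToConsumeBacklog / 10" from the proposal, converted to ms.
        long calculatedCheckpointIntervalMs =
                (long) (timeToConsumeBacklogSeconds * 1000.0 / 10.0);
        return Math.min(
                Math.max(fastCheckpointIntervalMs, calculatedCheckpointIntervalMs),
                slowCheckpointIntervalMs);
    }

    public static void main(String[] args) {
        long fastMs = 3 * 60 * 1000L;   // fastCheckpointInterval = 3 minutes
        long slowMs = 30 * 60 * 1000L;  // slowCheckpointInterval = 30 minutes
        double rate = 1000.0;           // illustrative records per second
        long thirtyMinutesOfBacklog = (long) (rate * 30 * 60);
        // metricsUpdateInterval = 100 ms, no backpressure.
        System.out.println(nextCheckpointIntervalMs(
                thirtyMinutesOfBacklog, rate, 0.0, 0.1, fastMs, slowMs));
    }
}
```

With 30 minutes of backlog at the current rate, the calculated interval is
just under 3 minutes, so the result is clamped to fastCheckpointInterval. This
is the bounded-phase regression described in the HDFS example above. The
result only reaches slowCheckpointInterval once the backlog is worth roughly
10x slowCheckpointInterval at the current rate.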
