Thank you Leonard for the review! Hi Piotr, do you have any comments on the latest proposal?
I am wondering if it is OK to start the voting thread this week. On Mon, Jun 26, 2023 at 4:10 PM Leonard Xu <xbjt...@gmail.com> wrote: > Thanks Dong for driving this FLIP forward! > > Introducing `backlog status` concept for flink job makes sense to me as > following reasons: > > From concept/API design perspective, it’s more general and natural than > above proposals as it can be used in HybridSource for bounded records, CDC > Source for history snapshot and general sources like KafkaSource for > historical messages. > > From user cases/requirements, I’ve seen many users manually to set larger > checkpoint interval during backfilling and then set a shorter checkpoint > interval for real-time processing in their production environments as a > flink application optimization. Now, the flink framework can make this > optimization no longer require the user to set the checkpoint interval and > restart the job multiple times. > > Following supporting using larger checkpoint for job under backlog status > in current FLIP, we can explore supporting larger parallelism/memory/cpu > for job under backlog status in the future. > > In short, the updated FLIP looks good to me. > > > Best, > Leonard > > > > On Jun 22, 2023, at 12:07 PM, Dong Lin <lindon...@gmail.com> wrote: > > > > Hi Piotr, > > > > Thanks again for proposing the isProcessingBacklog concept. > > > > After discussing with Becket Qin and thinking about this more, I agree it > > is a better idea to add a top-level concept to all source operators to > > address the target use-case. > > > > The main reason that changed my mind is that isProcessingBacklog can be > > described as an inherent/nature attribute of every source instance and > its > > semantics does not need to depend on any specific checkpointing policy. > > Also, we can hardcode the isProcessingBacklog behavior for the sources we > > have considered so far (e.g. HybridSource and MySQL CDC source) without > > asking users to explicitly configure the per-source behavior, which > indeed > > provides better user experience. > > > > I have updated the FLIP based on the latest suggestions. The latest FLIP > no > > longer introduces per-source config that can be used by end-users. While > I > > agree with you that CheckpointTrigger can be a useful feature to address > > additional use-cases, I am not sure it is necessary for the use-case > > targeted by FLIP-309. Maybe we can introduce CheckpointTrigger separately > > in another FLIP? > > > > Can you help take another look at the updated FLIP? > > > > Best, > > Dong > > > > > > > > On Fri, Jun 16, 2023 at 11:59 PM Piotr Nowojski <pnowoj...@apache.org> > > wrote: > > > >> Hi Dong, > >> > >>> Suppose there are 1000 subtask and each subtask has 1% chance of being > >>> "backpressured" at a given time (due to random traffic spikes). Then at > >> any > >>> given time, the chance of the job > >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate > the > >>> backpressure metric once a second, the estimated time for the job > >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = > 23163 > >>> sec = 6.4 hours. > >>> > >>> This means that the job will effectively always use the longer > >>> checkpointing interval. It looks like a real concern, right? > >> > >> Sorry I don't understand where you are getting those numbers from. > >> Instead of trying to find loophole after loophole, could you try to > think > >> how a given loophole could be improved/solved? > >> > >>> Hmm... 
I honestly think it will be useful to know the APIs due to the > >>> following reasons. > >> > >> Please propose something. I don't think it's needed. > >> > >>> - For the use-case mentioned in FLIP-309 motivation section, would the > >> APIs > >>> of this alternative approach be more or less usable? > >> > >> Everything that you originally wanted to achieve in FLIP-309, you could > do > >> as well in my proposal. > >> Vide my many mentions of the "hacky solution". > >> > >>> - Can these APIs reliably address the extra use-case (e.g. allow > >>> checkpointing interval to change dynamically even during the unbounded > >>> phase) as it claims? > >> > >> I don't see why not. > >> > >>> - Can these APIs be decoupled from the APIs currently proposed in > >> FLIP-309? > >> > >> Yes > >> > >>> For example, if the APIs of this alternative approach can be decoupled > >> from > >>> the APIs currently proposed in FLIP-309, then it might be reasonable to > >>> work on this extra use-case with a more advanced/complicated design > >>> separately in a followup work. > >> > >> As I voiced my concerns previously, the current design of FLIP-309 would > >> clog the public API and in the long run confuse the users. IMO It's > >> addressing the > >> problem in the wrong place. > >> > >>> Hmm.. do you mean we can do the following: > >>> - Have all source operators emit a metric named "processingBacklog". > >>> - Add a job-level config that specifies "the checkpointing interval to > be > >>> used when any source is processing backlog". > >>> - The JM collects the "processingBacklog" periodically from all source > >>> operators and uses the newly added config value as appropriate. > >> > >> Yes. > >> > >>> The challenge with this approach is that we need to define the > semantics > >> of > >>> this "processingBacklog" metric and have all source operators > >>> implement this metric. I am not sure we are able to do this yet without > >>> having users explicitly provide this information on a per-source basis. > >>> > >>> Suppose the job read from a bounded Kafka source, should it emit > >>> "processingBacklog=true"? If yes, then the job might use long > >> checkpointing > >>> interval even > >>> if the job is asked to process data starting from now to the next 1 > hour. > >>> If no, then the job might use the short checkpointing interval > >>> even if the job is asked to re-process data starting from 7 days ago. > >> > >> Yes. The same can be said of your proposal. Your proposal has the very > same > >> issues > >> that every source would have to implement it differently, most sources > >> would > >> have no idea how to properly calculate the new requested checkpoint > >> interval, > >> for those that do know how to do that, user would have to configure > every > >> source > >> individually and yet again we would end up with a system, that works > only > >> partially in > >> some special use cases (HybridSource), that's confusing the users even > >> more. > >> > >> That's why I think the more generic solution, working primarily on the > same > >> metrics that are used by various auto scaling solutions (like Flink K8s > >> operator's > >> autosaler) would be better. The hacky solution I proposed to: > >> 1. show you that the generic solution is simply a superset of your > proposal > >> 2. 
if you are adamant that busyness/backpressured/records processing > >> rate/pending records > >> metrics wouldn't cover your use case sufficiently (imo they can), > then > >> you can very easily > >> enhance this algorithm with using some hints from the sources. Like > >> "processingBacklog==true" > >> to short circuit the main algorithm, if `processingBacklog` is > >> available. > >> > >> Best, > >> Piotrek > >> > >> > >> pt., 16 cze 2023 o 04:45 Dong Lin <lindon...@gmail.com> napisał(a): > >> > >>> Hi again Piotr, > >>> > >>> Thank you for the reply. Please see my reply inline. > >>> > >>> On Fri, Jun 16, 2023 at 12:11 AM Piotr Nowojski < > >> piotr.nowoj...@gmail.com> > >>> wrote: > >>> > >>>> Hi again Dong, > >>>> > >>>>> I understand that JM will get the backpressure-related metrics every > >>> time > >>>>> the RestServerEndpoint receives the REST request to get these > >> metrics. > >>>> But > >>>>> I am not sure if RestServerEndpoint is already always receiving the > >>> REST > >>>>> metrics at regular interval (suppose there is no human manually > >>>>> opening/clicking the Flink Web UI). And if it does, what is the > >>> interval? > >>>> > >>>> Good catch, I've thought that metrics are pre-emptively sent to JM > >> every > >>> 10 > >>>> seconds. > >>>> Indeed that's not the case at the moment, and that would have to be > >>>> improved. > >>>> > >>>>> I would be surprised if Flink is already paying this much overhead > >> just > >>>> for > >>>>> metrics monitoring. That is the main reason I still doubt it is true. > >>> Can > >>>>> you show where this 100 ms is currently configured? > >>>>> > >>>>> Alternatively, maybe you mean that we should add extra code to invoke > >>> the > >>>>> REST API at 100 ms interval. Then that means we need to considerably > >>>>> increase the network/cpu overhead at JM, where the overhead will > >>> increase > >>>>> as the number of TM/slots increase, which may pose risk to the > >>>> scalability > >>>>> of the proposed design. I am not sure we should do this. What do you > >>>> think? > >>>> > >>>> Sorry. I didn't mean metric should be reported every 100ms. I meant > >> that > >>>> "backPressuredTimeMsPerSecond (metric) would report (a value of) > >>> 100ms/s." > >>>> once per metric interval (10s?). > >>>> > >>> > >>> Suppose there are 1000 subtask and each subtask has 1% chance of being > >>> "backpressured" at a given time (due to random traffic spikes). Then at > >> any > >>> given time, the chance of the job > >>> being considered not-backpressured = (1-0.01)^1000. Since we evaluate > the > >>> backpressure metric once a second, the estimated time for the job > >>> to be considered not-backpressured is roughly 1 / ((1-0.01)^1000) = > 23163 > >>> sec = 6.4 hours. > >>> > >>> This means that the job will effectively always use the longer > >>> checkpointing interval. It looks like a real concern, right? > >>> > >>> > >>>>> - What is the interface of this CheckpointTrigger? For example, are > >> we > >>>>> going to give CheckpointTrigger a context that it can use to fetch > >>>>> arbitrary metric values? This can help us understand what information > >>>> this > >>>>> user-defined CheckpointTrigger can use to make the checkpoint > >> decision. > >>>> > >>>> I honestly don't think this is important at this stage of the > >> discussion. > >>>> It could have > >>>> whatever interface we would deem to be best. 
Required things: > >>>> > >>>> - access to at least a subset of metrics that the given > >>> `CheckpointTrigger` > >>>> requests, > >>>> for example via some registration mechanism, so we don't have to > >> fetch > >>>> all of the > >>>> metrics all the time from TMs. > >>>> - some way to influence `CheckpointCoordinator`. Either via manually > >>>> triggering > >>>> checkpoints, and/or ability to change the checkpointing interval. > >>>> > >>> > >>> Hmm... I honestly think it will be useful to know the APIs due to the > >>> following reasons. > >>> > >>> We would need to know the concrete APIs to gauge the following: > >>> - For the use-case mentioned in FLIP-309 motivation section, would the > >> APIs > >>> of this alternative approach be more or less usable? > >>> - Can these APIs reliably address the extra use-case (e.g. allow > >>> checkpointing interval to change dynamically even during the unbounded > >>> phase) as it claims? > >>> - Can these APIs be decoupled from the APIs currently proposed in > >> FLIP-309? > >>> > >>> For example, if the APIs of this alternative approach can be decoupled > >> from > >>> the APIs currently proposed in FLIP-309, then it might be reasonable to > >>> work on this extra use-case with a more advanced/complicated design > >>> separately in a followup work. > >>> > >>> > >>>>> - Where is this CheckpointTrigger running? For example, is it going > >> to > >>>> run > >>>>> on the subtask of every source operator? Or is it going to run on the > >>> JM? > >>>> > >>>> IMO on the JM. > >>>> > >>>>> - Are we going to provide a default implementation of this > >>>>> CheckpointTrigger in Flink that implements the algorithm described > >>> below, > >>>>> or do we expect each source operator developer to implement their own > >>>>> CheckpointTrigger? > >>>> > >>>> As I mentioned before, I think we should provide at the very least the > >>>> implementation > >>>> that replaces the current triggering mechanism (statically configured > >>>> checkpointing interval) > >>>> and it would be great to provide the backpressure monitoring trigger > as > >>>> well. > >>>> > >>> > >>> I agree that if there is a good use-case that can be addressed by the > >>> proposed CheckpointTrigger, then it is reasonable > >>> to add CheckpointTrigger and replace the current triggering mechanism > >> with > >>> it. > >>> > >>> I also agree that we will likely find such a use-case. For example, > >> suppose > >>> the source records have event timestamps, then it is likely > >>> that we can use the trigger to dynamically control the checkpointing > >>> interval based on the difference between the watermark and current > system > >>> time. > >>> > >>> But I am not sure the addition of this CheckpointTrigger should be > >> coupled > >>> with FLIP-309. Whether or not it is coupled probably depends on the > >>> concrete API design around CheckpointTrigger. > >>> > >>> If you would be adamant that the backpressure monitoring doesn't cover > >> well > >>>> enough your use case, I would be ok to provide the hacky version that > I > >>>> also mentioned > >>>> before: > >>> > >>> > >>>> """ > >>>> Especially that if my proposed algorithm wouldn't work good enough, > >> there > >>>> is > >>>> an obvious solution, that any source could add a metric, like let say > >>>> "processingBacklog: true/false", and the `CheckpointTrigger` > >>>> could use this as an override to always switch to the > >>>> "slowCheckpointInterval". 
I don't think we need it, but that's always > >> an > >>>> option > >>>> that would be basically equivalent to your original proposal. > >>>> """ > >>>> > >>> > >>> Hmm.. do you mean we can do the following: > >>> - Have all source operators emit a metric named "processingBacklog". > >>> - Add a job-level config that specifies "the checkpointing interval to > be > >>> used when any source is processing backlog". > >>> - The JM collects the "processingBacklog" periodically from all source > >>> operators and uses the newly added config value as appropriate. > >>> > >>> The challenge with this approach is that we need to define the > semantics > >> of > >>> this "processingBacklog" metric and have all source operators > >>> implement this metric. I am not sure we are able to do this yet without > >>> having users explicitly provide this information on a per-source basis. > >>> > >>> Suppose the job read from a bounded Kafka source, should it emit > >>> "processingBacklog=true"? If yes, then the job might use long > >> checkpointing > >>> interval even > >>> if the job is asked to process data starting from now to the next 1 > hour. > >>> If no, then the job might use the short checkpointing interval > >>> even if the job is asked to re-process data starting from 7 days ago. > >>> > >>> > >>>> > >>>>> - How can users specify the > >>>> fastCheckpointInterval/slowCheckpointInterval? > >>>>> For example, will we provide APIs on the CheckpointTrigger that > >>> end-users > >>>>> can use to specify the checkpointing interval? What would that look > >>> like? > >>>> > >>>> Also as I mentioned before, just like metric reporters are configured: > >>>> > >>>> > >>> > >> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/metric_reporters/ > >>>> Every CheckpointTrigger could have its own custom configuration. > >>>> > >>>>> Overall, my gut feel is that the alternative approach based on > >>>>> CheckpointTrigger is more complicated > >>>> > >>>> Yes, as usual, more generic things are more complicated, but often > more > >>>> useful in the long run. > >>>> > >>>>> and harder to use. > >>>> > >>>> I don't agree. Why setting in config > >>>> > >>>> execution.checkpointing.trigger: > >> BackPressureMonitoringCheckpointTrigger > >>>> > >>>> > >>> > >> > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.fast-interval: > >>>> 1s > >>>> > >>>> > >>> > >> > execution.checkpointing.BackPressureMonitoringCheckpointTrigger.slow-interval: > >>>> 30s > >>>> > >>>> that we could even provide a shortcut to the above construct via: > >>>> > >>>> execution.checkpointing.fast-interval: 1s > >>>> execution.checkpointing.slow-interval: 30s > >>>> > >>>> is harder compared to setting two/three checkpoint intervals, one in > >> the > >>>> config/or via `env.enableCheckpointing(x)`, > >>>> secondly passing one/two (fast/slow) values on the source itself? > >>>> > >>> > >>> If we can address the use-case by providing just the two job-level > config > >>> as described above, I agree it will indeed be simpler. > >>> > >>> I have tried to achieve this goal. But the caveat is that it requires > >> much > >>> more work than described above in order to give the configs > well-defined > >>> semantics. So I find it simpler to just use the approach in FLIP-309. > >>> > >>> Let me explain my concern below. It will be great if you or someone > else > >>> can help provide a solution. 
> >>> > >>> 1) We need to clearly document when the fast-interval and slow-interval > >>> will be used so that users can derive the expected behavior of the job > >> and > >>> be able to config these values. > >>> > >>> 2) The trigger of fast/slow interval depends on the behavior of the > >> source > >>> (e.g. MySQL CDC, HybridSource). However, no existing concepts of source > >>> operator (e.g. boundedness) can describe the target behavior. For > >> example, > >>> MySQL CDC internally has two phases, namely snapshot phase and binlog > >>> phase, which are not explicitly exposed to its users via source > operator > >>> API. And we probably should not enumerate all internal phases of all > >> source > >>> operators that are affected by fast/slow interval. > >>> > >>> 3) An alternative approach might be to define a new concept (e.g. > >>> processingBacklog) that is applied to all source operators. Then the > >>> fast/slow interval's documentation can depend on this concept. That > means > >>> we have to add a top-level concept (similar to source boundedness) and > >>> require all source operators to specify how they enforce this concept > >> (e.g. > >>> FileSystemSource always emits processingBacklog=true). And there might > be > >>> cases where the source itself (e.g. a bounded Kafka Source) can not > >>> automatically derive the value of this concept, in which case we need > to > >>> provide option for users to explicitly specify the value for this > concept > >>> on a per-source basis. > >>> > >>> > >>> > >>>>> And it probably also has the issues of "having two places to > >> configure > >>>> checkpointing > >>>>> interval" and "giving flexibility for every source to implement a > >>>> different > >>>>> API" (as mentioned below). > >>>> > >>>> No, it doesn't. > >>>> > >>>>> IMO, it is a hard-requirement for the user-facing API to be > >>>>> clearly defined and users should be able to use the API without > >> concern > >>>> of > >>>>> regression. And this requirement is more important than the other > >> goals > >>>>> discussed above because it is related to the stability/performance of > >>> the > >>>>> production job. What do you think? > >>>> > >>>> I don't agree with this. There are many things that work something in > >>>> between perfectly and well enough > >>>> in some fraction of use cases (maybe in 99%, maybe 95% or maybe 60%), > >>> while > >>>> still being very useful. > >>>> Good examples are: selection of state backend, unaligned checkpoints, > >>>> buffer debloating but frankly if I go > >>>> through list of currently available config options, something like > half > >>> of > >>>> them can cause regressions. Heck, > >>>> even Flink itself doesn't work perfectly in 100% of the use cases, due > >>> to a > >>>> variety of design choices. Of > >>>> course, the more use cases are fine with said feature, the better, but > >> we > >>>> shouldn't fixate to perfectly cover > >>>> 100% of the cases, as that's impossible. > >>>> > >>>> In this particular case, if back pressure monitoring trigger can work > >>> well > >>>> enough in 95% of cases, I would > >>>> say that's already better than the originally proposed alternative, > >> which > >>>> doesn't work at all if user has a large > >>>> backlog to reprocess from Kafka, including when using HybridSource > >> AFTER > >>>> the switch to Kafka has > >>>> happened. 
For the remaining 5%, we should try to improve the behaviour > >>> over > >>>> time, but ultimately, users can > >>>> decide to just run a fixed checkpoint interval (or at worst use the > >> hacky > >>>> checkpoint trigger that I mentioned > >>>> before a couple of times). > >>>> > >>>> Also to be pedantic, if a user naively selects slow-interval in your > >>>> proposal to 30 minutes, when that user's > >>>> job fails on average every 15-20minutes, his job can end up in a state > >>> that > >>>> it can not make any progress, > >>>> this arguably is quite serious regression. > >>>> > >>> > >>> I probably should not say it is "hard requirement". After all there are > >>> pros/cons. We will need to consider implementation complexity, > usability, > >>> extensibility etc. > >>> > >>> I just don't think we should take it for granted to introduce > regression > >>> for one use-case in order to support another use-case. If we can not > find > >>> an algorithm/solution that addresses > >>> both use-case well, I hope we can be open to tackle them separately so > >> that > >>> users can choose the option that best fits their needs. > >>> > >>> All things else being equal, I think it is preferred for user-facing > API > >> to > >>> be clearly defined and let users should be able to use the API without > >>> concern of regression. > >>> > >>> Maybe we can list pros/cons for the alternative approaches we have been > >>> discussing and see choose the best approach. And maybe we will end up > >>> finding that use-case > >>> which needs CheckpointTrigger can be tackled separately from the > use-case > >>> in FLIP-309. > >>> > >>> > >>>>> I am not sure if there is a typo. Because if > >>> backPressuredTimeMsPerSecond > >>>> = > >>>>> 0, then maxRecordsConsumedWithoutBackpressure = > >> numRecordsInPerSecond / > >>>>> 1000 * metricsUpdateInterval according to the above algorithm. > >>>>> > >>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = > >>>> (numRecordsInPerSecond > >>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * > >> metricsUpdateInterval"? > >>>> > >>>> It looks like there is indeed some mistake in my proposal above. Yours > >>> look > >>>> more correct, it probably > >>>> still needs some safeguard/special handling if > >>>> `backPressuredTimeMsPerSecond > 950` > >>>> > >>>>> The only information it can access is the backlog. > >>>> > >>>> Again no. It can access whatever we want to provide to it. > >>>> > >>>> Regarding the rest of your concerns. It's a matter of tweaking the > >>>> parameters and the algorithm itself, > >>>> and how much safety-net do we want to have. Ultimately, I'm pretty > sure > >>>> that's a (for 95-99% of cases) > >>>> solvable problem. If not, there is always the hacky solution, that > >> could > >>> be > >>>> even integrated into this above > >>>> mentioned algorithm as a short circuit to always reach > `slow-interval`. > >>>> > >>>> Apart of that, you picked 3 minutes as the checkpointing interval in > >> your > >>>> counter example. In most cases > >>>> any interval above 1 minute would inflict pretty negligible overheads, > >> so > >>>> all in all, I would doubt there is > >>>> a significant benefit (in most cases) of increasing 3 minute > checkpoint > >>>> interval to anything more, let alone > >>>> 30 minutes. > >>>> > >>> > >>> I am not sure we should design the algorithm with the assumption that > the > >>> short checkpointing interval will always be higher than 1 minute etc. 
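Putting the corrected formula and the >950 ms safeguard together, the heuristic under discussion could look roughly like the sketch below. Nothing in it is an existing Flink API; the names, the /10 factor and the min/max clamping simply follow Piotr's pseudocode quoted further down in this thread.

import java.time.Duration;

/**
 * Illustrative sketch only: backlog-based checkpoint interval heuristic with the
 * corrected maxRecordsConsumedWithoutBackpressure formula and a guard for
 * backPressuredTimeMsPerSecond values close to 1000 ms/s.
 */
public final class BacklogCheckpointIntervalSketch {

    static Duration nextCheckpointInterval(
            double pendingRecords,               // backlog reported by the source
            double numRecordsInPerSecond,        // observed source throughput
            double backPressuredTimeMsPerSecond, // 0..1000, from task metrics
            double metricsUpdateIntervalSec,     // e.g. a 10s metric fetching interval
            Duration fastInterval,
            Duration slowInterval) {

        // Fraction of each second the source is NOT back-pressured; floor it so that
        // values above ~950 ms/s do not blow up the estimate (the safeguard above).
        double notBackPressured =
                Math.max(1.0 - backPressuredTimeMsPerSecond / 1000.0, 0.05);

        // Records that could be consumed in one metric interval if the back pressure
        // magically vanished (the corrected formula discussed above).
        double maxRecordsConsumedWithoutBackpressure =
                (numRecordsInPerSecond / notBackPressured) * metricsUpdateIntervalSec;

        // Estimated time to drain the remaining backlog at the observed rate.
        double timeToConsumeBacklogSec =
                Math.max(pendingRecords - maxRecordsConsumedWithoutBackpressure, 0.0)
                        / Math.max(numRecordsInPerSecond, 1.0);

        // One tenth of the drain time, clamped between the fast and slow intervals.
        long calculatedSec = (long) (timeToConsumeBacklogSec / 10.0);
        long nextSec = Math.min(
                Math.max(fastInterval.toSeconds(), calculatedSec),
                slowInterval.toSeconds());
        return Duration.ofSeconds(nextSec);
    }

    public static void main(String[] args) {
        // Dong's HybridSource example: roughly 30 minutes of backlog left at 1000
        // records/s, fast interval 3 minutes, slow interval 30 minutes. The clamp
        // picks the 3 minute fast interval, which is exactly the bounded-phase
        // concern raised further down in the thread.
        System.out.println(nextCheckpointInterval(
                1_800_000, 1_000, 100, 10,
                Duration.ofMinutes(3), Duration.ofMinutes(30))); // prints PT3M
    }
}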
> >>> > >>> I agree the proposed algorithm can solve most cases where the resource > is > >>> sufficient and there is always no backlog in source subtasks. On the > >> other > >>> hand, what makes SRE > >>> life hard is probably the remaining 1-5% cases where the traffic is > spiky > >>> and the cluster is reaching its capacity limit. The ability to predict > >> and > >>> control Flink job's behavior (including checkpointing interval) can > >>> considerably reduce the burden of manging Flink jobs. > >>> > >>> Best, > >>> Dong > >>> > >>> > >>>> > >>>> Best, > >>>> Piotrek > >>>> > >>>> > >>>> > >>>> > >>>> > >>>> sob., 3 cze 2023 o 05:44 Dong Lin <lindon...@gmail.com> napisał(a): > >>>> > >>>>> Hi Piotr, > >>>>> > >>>>> Thanks for the explanations. I have some followup questions below. > >>>>> > >>>>> On Fri, Jun 2, 2023 at 10:55 PM Piotr Nowojski <pnowoj...@apache.org > >>> > >>>>> wrote: > >>>>> > >>>>>> Hi All, > >>>>>> > >>>>>> Thanks for chipping in the discussion Ahmed! > >>>>>> > >>>>>> Regarding using the REST API. Currently I'm leaning towards > >>>> implementing > >>>>>> this feature inside the Flink itself, via some pluggable interface. > >>>>>> REST API solution would be tempting, but I guess not everyone is > >>> using > >>>>>> Flink Kubernetes Operator. > >>>>>> > >>>>>> @Dong > >>>>>> > >>>>>>> I am not sure metrics such as isBackPressured are already sent to > >>> JM. > >>>>>> > >>>>>> Fetching code path on the JM: > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl#queryTmMetricsFuture > >>>>>> > >> org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore#add > >>>>>> > >>>>>> Example code path accessing Task level metrics via JM using the > >>>>>> `MetricStore`: > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > org.apache.flink.runtime.rest.handler.job.metrics.AggregatingSubtasksMetricsHandler > >>>>>> > >>>>> > >>>>> Thanks for the code reference. I checked the code that invoked these > >>> two > >>>>> classes and found the following information: > >>>>> > >>>>> - AggregatingSubtasksMetricsHandler#getStoresis currently invoked > >> only > >>>>> when AggregatingJobsMetricsHandler is invoked. > >>>>> - AggregatingJobsMetricsHandler is only instantiated and returned by > >>>>> WebMonitorEndpoint#initializeHandlers > >>>>> - WebMonitorEndpoint#initializeHandlers is only used by > >>>> RestServerEndpoint. > >>>>> And RestServerEndpoint invokes these handlers in response to external > >>>> REST > >>>>> request. > >>>>> > >>>>> I understand that JM will get the backpressure-related metrics every > >>> time > >>>>> the RestServerEndpoint receives the REST request to get these > >> metrics. > >>>> But > >>>>> I am not sure if RestServerEndpoint is already always receiving the > >>> REST > >>>>> metrics at regular interval (suppose there is no human manually > >>>>> opening/clicking the Flink Web UI). And if it does, what is the > >>> interval? > >>>>> > >>>>> > >>>>> > >>>>>>> For example, let's say every source operator subtask reports this > >>>>> metric > >>>>>> to > >>>>>>> JM once every 10 seconds. There are 100 source subtasks. And each > >>>>> subtask > >>>>>>> is backpressured roughly 10% of the total time due to traffic > >>> spikes > >>>>> (and > >>>>>>> limited buffer). Then at any given time, there are 1 - 0.9^100 = > >>>>> 99.997% > >>>>>>> chance that there is at least one subtask that is backpressured. > >>> Then > >>>>> we > >>>>>>> have to wait for at least 10 seconds to check again. 
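As a quick sanity check of the sampling arithmetic quoted here (and of the 1000-subtask / 6.4-hour variant earlier in the thread), a few lines of Java reproduce both numbers:

public class BackpressureSamplingOdds {
    public static void main(String[] args) {
        // 100 subtasks, each back-pressured 10% of the time: probability that at
        // least one of them samples as back-pressured at any given instant.
        System.out.printf("%.5f%n", 1.0 - Math.pow(0.9, 100));     // ~0.99997

        // 1000 subtasks, each back-pressured 1% of the time: expected number of
        // one-second samples until none of them is back-pressured.
        System.out.printf("%.0f s%n", 1.0 / Math.pow(0.99, 1000)); // ~23164 s, ~6.4 h
    }
}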
> >>>>>> > >>>>>> backPressuredTimeMsPerSecond and other related metrics (like > >>>>>> busyTimeMsPerSecond) are not subject to that problem. > >>>>>> They are recalculated once every metric fetching interval, and they > >>>>> report > >>>>>> accurately on average the given subtask spent > >>>> busy/idling/backpressured. > >>>>>> In your example, backPressuredTimeMsPerSecond would report 100ms/s. > >>>>> > >>>>> > >>>>> Suppose every subtask is already reporting > >> backPressuredTimeMsPerSecond > >>>> to > >>>>> JM once every 100 ms. If a job has 10 operators (that are not > >> chained) > >>>> and > >>>>> each operator has 100 subtasks, then JM would need to handle 10000 > >>>> requests > >>>>> per second to receive metrics from these 1000 subtasks. It seems > >> like a > >>>>> non-trivial overhead for medium-to-large sized jobs and can make JM > >> the > >>>>> performance bottleneck during job execution. > >>>>> > >>>>> I would be surprised if Flink is already paying this much overhead > >> just > >>>> for > >>>>> metrics monitoring. That is the main reason I still doubt it is true. > >>> Can > >>>>> you show where this 100 ms is currently configured? > >>>>> > >>>>> Alternatively, maybe you mean that we should add extra code to invoke > >>> the > >>>>> REST API at 100 ms interval. Then that means we need to considerably > >>>>> increase the network/cpu overhead at JM, where the overhead will > >>> increase > >>>>> as the number of TM/slots increase, which may pose risk to the > >>>> scalability > >>>>> of the proposed design. I am not sure we should do this. What do you > >>>> think? > >>>>> > >>>>> > >>>>>> > >>>>>>> While it will be nice to support additional use-cases > >>>>>>> with one proposal, it is probably also reasonable to make > >>> incremental > >>>>>>> progress and support the low-hanging-fruit use-case first. The > >>> choice > >>>>>>> really depends on the complexity and the importance of supporting > >>> the > >>>>>> extra > >>>>>>> use-cases. > >>>>>> > >>>>>> That would be true, if that was a private implementation detail or > >> if > >>>> the > >>>>>> low-hanging-fruit-solution would be on the direct path to the final > >>>>>> solution. > >>>>>> That's unfortunately not the case here. This will add public facing > >>>> API, > >>>>>> that we will later need to maintain, no matter what the final > >>> solution > >>>>> will > >>>>>> be, > >>>>>> and at the moment at least I don't see it being related to a > >>> "perfect" > >>>>>> solution. > >>>>> > >>>>> > >>>>> Sure. Then let's decide the final solution first. > >>>>> > >>>>> > >>>>>>> I guess the point is that the suggested approach, which > >> dynamically > >>>>>>> determines the checkpointing interval based on the backpressure, > >>> may > >>>>>> cause > >>>>>>> regression when the checkpointing interval is relatively low. > >> This > >>>>> makes > >>>>>> it > >>>>>>> hard for users to enable this feature in production. It is like > >> an > >>>>>>> auto-driving system that is not guaranteed to work > >>>>>> > >>>>>> Yes, creating a more generic solution that would require less > >>>>> configuration > >>>>>> is usually more difficult then static configurations. > >>>>>> It doesn't mean we shouldn't try to do that. 
Especially that if my > >>>>> proposed > >>>>>> algorithm wouldn't work good enough, there is > >>>>>> an obvious solution, that any source could add a metric, like let > >> say > >>>>>> "processingBacklog: true/false", and the `CheckpointTrigger` > >>>>>> could use this as an override to always switch to the > >>>>>> "slowCheckpointInterval". I don't think we need it, but that's > >> always > >>>> an > >>>>>> option > >>>>>> that would be basically equivalent to your original proposal. Or > >> even > >>>>>> source could add "suggestedCheckpointInterval : int", and > >>>>>> `CheckpointTrigger` could use that value if present as a hint in > >> one > >>>> way > >>>>> or > >>>>>> another. > >>>>>> > >>>>> > >>>>> So far we have talked about the possibility of using > >> CheckpointTrigger > >>>> and > >>>>> mentioned the CheckpointTrigger > >>>>> and read metric values. > >>>>> > >>>>> Can you help answer the following questions so that I can understand > >>> the > >>>>> alternative solution more concretely: > >>>>> > >>>>> - What is the interface of this CheckpointTrigger? For example, are > >> we > >>>>> going to give CheckpointTrigger a context that it can use to fetch > >>>>> arbitrary metric values? This can help us understand what information > >>>> this > >>>>> user-defined CheckpointTrigger can use to make the checkpoint > >> decision. > >>>>> - Where is this CheckpointTrigger running? For example, is it going > >> to > >>>> run > >>>>> on the subtask of every source operator? Or is it going to run on the > >>> JM? > >>>>> - Are we going to provide a default implementation of this > >>>>> CheckpointTrigger in Flink that implements the algorithm described > >>> below, > >>>>> or do we expect each source operator developer to implement their own > >>>>> CheckpointTrigger? > >>>>> - How can users specify the > >>>> fastCheckpointInterval/slowCheckpointInterval? > >>>>> For example, will we provide APIs on the CheckpointTrigger that > >>> end-users > >>>>> can use to specify the checkpointing interval? What would that look > >>> like? > >>>>> > >>>>> Overall, my gut feel is that the alternative approach based on > >>>>> CheckpointTrigger is more complicated and harder to use. And it > >>> probably > >>>>> also has the issues of "having two places to configure checkpointing > >>>>> interval" and "giving flexibility for every source to implement a > >>>> different > >>>>> API" (as mentioned below). > >>>>> > >>>>> Maybe we can evaluate it more after knowing the answers to the above > >>>>> questions. > >>>>> > >>>>> > >>>>> > >>>>>> > >>>>>>> On the other hand, the approach currently proposed in the FLIP is > >>>> much > >>>>>>> simpler as it does not depend on backpressure. Users specify the > >>>> extra > >>>>>>> interval requirement on the specific sources (e.g. HybridSource, > >>>> MySQL > >>>>>> CDC > >>>>>>> Source) and can easily know the checkpointing interval will be > >> used > >>>> on > >>>>>> the > >>>>>>> continuous phase of the corresponding source. This is pretty much > >>>> same > >>>>> as > >>>>>>> how users use the existing execution.checkpointing.interval > >> config. > >>>> So > >>>>>>> there is no extra concern of regression caused by this approach. 
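To make the interface questions above a bit more concrete: a minimal sketch of what such a trigger could look like, assuming (per the answers elsewhere in this thread) that it runs on the JM, registers the metrics it needs instead of fetching everything from the TMs, and is configured the same way metric reporters are. No such interface exists in Flink today; every name below is hypothetical.

import java.time.Duration;
import java.util.Map;
import java.util.Set;

/** Hypothetical plugin interface, evaluated periodically by the JM. */
public interface CheckpointTrigger {

    /** Metric names this trigger asks the JM to fetch from the TMs. */
    Set<String> requestedMetrics();

    /**
     * Called once per metric-fetching interval with the latest aggregated values.
     * The returned duration is the checkpointing interval the CheckpointCoordinator
     * should use until the next evaluation (it could equally well trigger a
     * checkpoint directly, the other option mentioned above).
     */
    Duration evaluate(Map<String, Double> latestMetricValues);
}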
> >>>>>> > >>>>>> To an extent, but as I have already previously mentioned I really > >>>> really > >>>>> do > >>>>>> not like idea of: > >>>>>> - having two places to configure checkpointing interval (config > >>> file > >>>>> and > >>>>>> in the Source builders) > >>>>>> - giving flexibility for every source to implement a different > >> API > >>>> for > >>>>>> that purpose > >>>>>> - creating a solution that is not generic enough, so that we will > >>>> need > >>>>> a > >>>>>> completely different mechanism in the future anyway > >>>>>> > >>>>> > >>>>> Yeah, I understand different developers might have different > >>>>> concerns/tastes for these APIs. Ultimately, there might not be a > >>> perfect > >>>>> solution and we have to choose based on the pros/cons of these > >>> solutions. > >>>>> > >>>>> I agree with you that, all things being equal, it is preferable to 1) > >>>> have > >>>>> one place to configure checkpointing intervals, 2) have all source > >>>>> operators use the same API, and 3) create a solution that is generic > >>> and > >>>>> last lasting. Note that these three goals affects the usability and > >>>>> extensibility of the API, but not necessarily the > >> stability/performance > >>>> of > >>>>> the production job. > >>>>> > >>>>> BTW, there are also other preferrable goals. For example, it is very > >>>> useful > >>>>> for the job's behavior to be predictable and interpretable so that > >> SRE > >>>> can > >>>>> operator/debug the Flink in an easier way. We can list these > >> pros/cons > >>>>> altogether later. > >>>>> > >>>>> I am wondering if we can first agree on the priority of goals we want > >>> to > >>>>> achieve. IMO, it is a hard-requirement for the user-facing API to be > >>>>> clearly defined and users should be able to use the API without > >> concern > >>>> of > >>>>> regression. And this requirement is more important than the other > >> goals > >>>>> discussed above because it is related to the stability/performance of > >>> the > >>>>> production job. What do you think? > >>>>> > >>>>> > >>>>>> > >>>>>>> Sounds good. Looking forward to learning more ideas. > >>>>>> > >>>>>> I have thought about this a bit more, and I think we don't need to > >>>> check > >>>>>> for the backpressure status, or how much overloaded all of the > >>>> operators > >>>>>> are. > >>>>>> We could just check three things for source operators: > >>>>>> 1. pendingRecords (backlog length) > >>>>>> 2. numRecordsInPerSecond > >>>>>> 3. backPressuredTimeMsPerSecond > >>>>>> > >>>>>> // int metricsUpdateInterval = 10s // obtained from config > >>>>>> // Next line calculates how many records can we consume from the > >>>> backlog, > >>>>>> assuming > >>>>>> // that magically the reason behind a backpressure vanishes. We > >> will > >>>> use > >>>>>> this only as > >>>>>> // a safeguard against scenarios like for example if backpressure > >>> was > >>>>>> caused by some > >>>>>> // intermittent failure/performance degradation. > >>>>>> maxRecordsConsumedWithoutBackpressure = (numRecordsInPerSecond / > >>> (1000 > >>>>>> - backPressuredTimeMsPerSecond / 1000)) * metricsUpdateInterval > >>>>>> > >>>>> > >>>>> I am not sure if there is a typo. Because if > >>>> backPressuredTimeMsPerSecond = > >>>>> 0, then maxRecordsConsumedWithoutBackpressure = > >> numRecordsInPerSecond / > >>>>> 1000 * metricsUpdateInterval according to the above algorithm. 
> >>>>> > >>>>> Do you mean "maxRecordsConsumedWithoutBackpressure = > >>>> (numRecordsInPerSecond > >>>>> / (1 - backPressuredTimeMsPerSecond / 1000)) * > >> metricsUpdateInterval"? > >>>>> > >>>>> > >>>>>> > >>>>>> // we are excluding maxRecordsConsumedWithoutBackpressure from the > >>>>> backlog > >>>>>> as > >>>>>> // a safeguard against an intermittent back pressure problems, so > >>> that > >>>> we > >>>>>> don't > >>>>>> // calculate next checkpoint interval far far in the future, while > >>> the > >>>>>> backpressure > >>>>>> // goes away before we will recalculate metrics and new > >> checkpointing > >>>>>> interval > >>>>>> timeToConsumeBacklog = (pendingRecords - > >>>>>> maxRecordsConsumedWithoutBackpressure) / numRecordsInPerSecond > >>>>>> > >>>>>> > >>>>>> Then we can use those numbers to calculate desired checkpointed > >>>> interval > >>>>>> for example like this: > >>>>>> > >>>>>> long calculatedCheckpointInterval = timeToConsumeBacklog / 10; > >> //this > >>>> may > >>>>>> need some refining > >>>>>> long nextCheckpointInterval = min(max(fastCheckpointInterval, > >>>>>> calculatedCheckpointInterval), slowCheckpointInterval); > >>>>>> long nextCheckpointTs = lastCheckpointTs + nextCheckpointInterval; > >>>>>> > >>>>>> WDYT? > >>>>> > >>>>> > >>>>> I think the idea of the above algorithm is to incline to use the > >>>>> fastCheckpointInterval unless we are very sure the backlog will take > >> a > >>>> long > >>>>> time to process. This can alleviate the concern of regression during > >>> the > >>>>> continuous_bounded phase since we are more likely to use the > >>>>> fastCheckpointInterval. However, it can cause regression during the > >>>> bounded > >>>>> phase. > >>>>> > >>>>> I will use a concrete example to explain the risk of regression: > >>>>> - The user is using HybridSource to read from HDFS followed by Kafka. > >>> The > >>>>> data in HDFS is old and there is no need for data freshness for the > >>> data > >>>> in > >>>>> HDFS. > >>>>> - The user configures the job as below: > >>>>> - fastCheckpointInterval = 3 minutes > >>>>> - slowCheckpointInterval = 30 minutes > >>>>> - metricsUpdateInterval = 100 ms > >>>>> > >>>>> Using the above formulate, we can know that once pendingRecords > >>>>> <= numRecordsInPerSecond * 30-minutes, then > >>> calculatedCheckpointInterval > >>>> <= > >>>>> 3 minutes, meaning that we will use slowCheckpointInterval as the > >>>>> checkpointing interval. Then in the last 30 minutes of the bounded > >>> phase, > >>>>> the checkpointing frequency will be 10X higher than what the user > >>> wants. > >>>>> > >>>>> Also note that the same issue would also considerably limit the > >>> benefits > >>>> of > >>>>> the algorithm. For example, during the continuous phase, the > >> algorithm > >>>> will > >>>>> only be better than the approach in FLIP-309 when there is at least > >>>>> 30-minutes worth of backlog in the source. > >>>>> > >>>>> Sure, having a slower checkpointing interval in this extreme case > >>> (where > >>>>> there is 30-minutes backlog in the continous-unbounded phase) is > >> still > >>>>> useful when this happens. But since this is the un-common case, and > >> the > >>>>> right solution is probably to do capacity planning to avoid this from > >>>>> happening in the first place, I am not sure it is worth optimizing > >> for > >>>> this > >>>>> case at the cost of regression in the bounded phase and the reduced > >>>>> operational predictability for users (e.g. 
what checkpointing > >> interval > >>>>> should I expect at this stage of the job). > >>>>> > >>>>> I think the fundamental issue with this algorithm is that it is > >> applied > >>>> to > >>>>> both the bounded phases and the continous_unbounded phases without > >>>> knowing > >>>>> which phase the job is running at. The only information it can access > >>> is > >>>>> the backlog. But two sources with the same amount of backlog do not > >>>>> necessarily mean they have the same data freshness requirement. > >>>>> > >>>>> In this particular example, users know that the data in HDFS is very > >>> old > >>>>> and there is no need for data freshness. Users can express signals > >> via > >>>> the > >>>>> per-source API proposed in the FLIP. This is why the current approach > >>> in > >>>>> FLIP-309 can be better in this case. > >>>>> > >>>>> What do you think? > >>>>> > >>>>> Best, > >>>>> Dong > >>>>> > >>>>> > >>>>>> > >>>>>> Best, > >>>>>> Piotrek > >>>>>> > >>>>>> > >>>>> > >>>> > >>> > >> > >