Could you please elaborate a little on the scenarios in which you find that
the pendingRecords metric does not track Kafka lag well?

Thanks
Gyula

On Mon, Dec 11, 2023 at 4:26 PM Yang LI <yang.hunter...@gmail.com> wrote:

> Hello,
>
> Following our recent discussion, I've successfully deployed a build of the
> Flink operator featuring a "memory protection" patch. However, in the
> course of my testing I've encountered an issue: the Flink operator relies
> on the 'pendingRecords' metric to gauge backlog, and this metric doesn't
> seem to accurately represent the lag on the Kafka topic in certain
> scenarios.
>
> Could you advise whether there are any configurations I might have
> overlooked that could improve the autoscaler's ability to scale up in
> response to lag on Kafka topics?
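>
> For reference, the autoscaler settings I'm currently experimenting with
> are roughly the following (key names as in the operator 1.7 docs, where
> the options moved to the job.autoscaler.* prefix; older releases use the
> kubernetes.operator.job.autoscaler.* prefix instead, so please
> double-check against the version you run):
>
>   job.autoscaler.enabled: "true"
>   job.autoscaler.stabilization.interval: "5m"
>   job.autoscaler.metrics.window: "10m"
>   job.autoscaler.target.utilization: "0.7"
>   job.autoscaler.target.utilization.boundary: "0.2"
>   job.autoscaler.catch-up.duration: "15m"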
>
> Regards,
> Yang LI
>
> On Wed, 15 Nov 2023 at 20:39, Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Thanks Maximilian and Gyula, I'll keep you updated.
>>
>> Best,
>> Yang
>>
>> On Sat, 11 Nov 2023 at 16:18, Maximilian Michels <m...@apache.org> wrote:
>>
>>> Hi Yang,
>>>
>>> We're always open to changes / additions to the autoscaler logic and
>>> metric collection. Preferably, we change these directly in the
>>> autoscaler implementation, without adding additional processes or
>>> controllers. Let us know how your experiments go! If you want to
>>> contribute, a JIRA with a description of the changes would be the
>>> first step. We can take it from there.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Nov 7, 2023 at 9:04 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>> >
>>> > Hi Gyula,
>>> >
>>> > Thank you for the feedback! With your permission, I plan to integrate
>>> the implementation into the flink-kubernetes-operator-autoscaler module and
>>> test it in my environment. I would then hope to contribute the changes back
>>> to the community with a pull request to the GitHub repository in the coming
>>> months.
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > On Tue, 7 Nov 2023 at 19:08, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> >>
>>> >> Sounds like a lot of work for very little gain to me. If you really
>>> feel that there is room for improvement in the current implementation, it
>>> may be simpler to fix that directly.
>>> >>
>>> >> Gyula
>>> >>
>>> >> On Tue, 7 Nov 2023 at 01:20, Yang LI <yang.hunter...@gmail.com>
>>> wrote:
>>> >>>
>>> >>> Thanks for the information!
>>> >>>
>>> >>> I haven't tested the Kubernetes operator's built-in rollback mechanism
>>> yet. I feel like I could create another, independent operator which watches
>>> the Flink application's JVM memory and triggers a rollback.
>>> >>>
>>> >>> Another solution I would like to discuss is to implement an
>>> independent operator. This operator would do the following:
>>> >>>
>>> >>> 1. Retrieve the state size metrics for the Flink applications from
>>> Prometheus.
>>> >>> 2. Gather the current and recommended parallelism metrics from the
>>> Flink operator, also reported in Prometheus.
>>> >>> 3. If a downscale is advised, calculate whether the new cluster
>>> configuration, considering state size and JVM max heap size, can hold the
>>> entire state; if not, halt the operation.
>>> >>> 4. If it is feasible, manage the rescaling process similarly to the
>>> Flink operator, either by making API requests or by applying a kubectl
>>> patch to the FlinkDeployment custom resource.
>>> >>>
>>> >>> By doing this we could achieve something similar to a plugin system
>>> (see the rough sketch below). Of course, in this case I would disable
>>> scaling in the Flink operator. Do you think it could work?
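>>> >>>
>>> >>> To make the idea concrete, here is a minimal, untested sketch of the
>>> >>> guard logic in Java. The Prometheus address, the metric and label
>>> >>> names (in particular the recommended-parallelism metric) and the
>>> >>> "state must fit in heap with a margin" heuristic are all assumptions
>>> >>> on my side:
>>> >>>
>>> >>> import java.net.URI;
>>> >>> import java.net.URLEncoder;
>>> >>> import java.net.http.HttpClient;
>>> >>> import java.net.http.HttpRequest;
>>> >>> import java.net.http.HttpResponse;
>>> >>> import java.nio.charset.StandardCharsets;
>>> >>> import java.util.regex.Matcher;
>>> >>> import java.util.regex.Pattern;
>>> >>>
>>> >>> public class DownscaleGuard {
>>> >>>
>>> >>>     private static final HttpClient HTTP = HttpClient.newHttpClient();
>>> >>>     private static final String PROM = "http://prometheus:9090/api/v1/query";
>>> >>>     // Crude extraction of the first sample value from an instant-query
>>> >>>     // response; a real implementation should use a JSON parser.
>>> >>>     private static final Pattern VALUE =
>>> >>>         Pattern.compile("\"value\":\\[[^,]+,\"([^\"]+)\"\\]");
>>> >>>
>>> >>>     static double queryScalar(String promQl) throws Exception {
>>> >>>         String url = PROM + "?query="
>>> >>>             + URLEncoder.encode(promQl, StandardCharsets.UTF_8);
>>> >>>         HttpResponse<String> resp = HTTP.send(
>>> >>>             HttpRequest.newBuilder(URI.create(url)).GET().build(),
>>> >>>             HttpResponse.BodyHandlers.ofString());
>>> >>>         Matcher m = VALUE.matcher(resp.body());
>>> >>>         if (!m.find()) throw new IllegalStateException("no sample: " + promQl);
>>> >>>         return Double.parseDouble(m.group(1));
>>> >>>     }
>>> >>>
>>> >>>     // True if the recommended (lower) parallelism should still leave
>>> >>>     // enough heap to hold the current state plus a 30% safety margin
>>> >>>     // (roughly assuming one slot per task manager).
>>> >>>     static boolean downscaleLooksSafe(String job) throws Exception {
>>> >>>         double stateBytes = queryScalar(
>>> >>>             "sum(flink_jobmanager_job_lastCheckpointSize{job_name=\"" + job + "\"})");
>>> >>>         double heapMaxPerTm = queryScalar(
>>> >>>             "max(flink_taskmanager_Status_JVM_Memory_Heap_Max{job_name=\"" + job + "\"})");
>>> >>>         double recommended = queryScalar(
>>> >>>             "autoscaler_recommended_parallelism{job_name=\"" + job + "\"}");
>>> >>>         return stateBytes * 1.3 <= heapMaxPerTm * recommended;
>>> >>>     }
>>> >>> }
>>> >>>
>>> >>> If the check passes, the operator would apply the recommended
>>> >>> parallelism by patching the FlinkDeployment resource; otherwise it
>>> >>> would simply wait for the next evaluation.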
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com>
>>> wrote:
>>> >>>>
>>> >>>> Hey!
>>> >>>>
>>> >>>> Bit of a tricky problem, as in some cases it's not really possible
>>> to know whether the job will be able to start with the lower parallelism.
>>> Custom plugins may work, but that would be an extremely complex solution at
>>> this point.
>>> >>>>
>>> >>>> The Kubernetes operator has a built-in rollback mechanism that can
>>> help with rolling back these broken scale operations; have you tried that?
>>> Furthermore, we are planning to introduce some heap/GC related metrics soon
>>> (probably after the next release, for 1.8.0) that may help us catch these
>>> issues.
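>>> >>>>
>>> >>>> For reference, the rollback behaviour is disabled by default; it is
>>> >>>> controlled by kubernetes.operator.deployment.rollback.enabled together
>>> >>>> with the readiness timeout that decides when a deployment counts as
>>> >>>> broken (kubernetes.operator.deployment.readiness.timeout). Please
>>> >>>> double-check the exact keys in the docs for the version you run.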
>>> >>>>
>>> >>>> Cheers,
>>> >>>> Gyula
>>> >>>>
>>> >>>> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com>
>>> wrote:
>>> >>>>>
>>> >>>>> Dear Flink Community,
>>> >>>>>
>>> >>>>> I am currently working on implementing auto-scaling for my Flink
>>> application using the Flink operator's autoscaler. During testing, I
>>> encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
>>> the autoscaler attempted to scale down. This issue arises when the incoming
>>> record rate decreases while the state size has not yet shrunk
>>> correspondingly. Despite numerous tests, managing this issue has been
>>> difficult due to the lack of a parameter for specifying a cooldown period
>>> (essential for processing and reducing the state size) prior to actually
>>> scaling down. Moreover, determining an optimal duration for such a cooldown
>>> period is also not straightforward. I believe that enhancing the autoscaler
>>> with memory checks, or more broadly with stability conditions, could
>>> significantly address this issue. Here are some potential solutions that,
>>> in my opinion, could improve the situation:
>>> >>>>>
>>> >>>>> Integrate heap memory-related metrics into the metric collection,
>>> coupled with a memory safety margin check within the autoscaler's algorithm.
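>>> >>>>>
>>> >>>>> As a purely illustrative sketch of what such a check could look like
>>> >>>>> (in practice the numbers would come from the task manager metrics the
>>> >>>>> operator already collects, not from a local JVM as below):
>>> >>>>>
>>> >>>>> import java.lang.management.ManagementFactory;
>>> >>>>> import java.lang.management.MemoryUsage;
>>> >>>>>
>>> >>>>> public class HeapGuard {
>>> >>>>>     // Allow a scale-down only while at least `safetyMargin` of the
>>> >>>>>     // max heap (e.g. 0.2 for 20%) is still free.
>>> >>>>>     static boolean scaleDownAllowed(double safetyMargin) {
>>> >>>>>         MemoryUsage heap =
>>> >>>>>             ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
>>> >>>>>         double free =
>>> >>>>>             (double) (heap.getMax() - heap.getUsed()) / heap.getMax();
>>> >>>>>         return free >= safetyMargin;
>>> >>>>>     }
>>> >>>>> }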
>>> >>>>>
>>> >>>>> Introduce a plugin system and a pre-rescaling step in the Flink
>>> operator's autoscaler, allowing users to implement custom plugins. These
>>> plugins could host listeners that run during the pre-rescale hook, adding
>>> an extra validation step before the algorithm executes. That way, scaling
>>> down would stay blocked until the custom checks pass and it is safe to
>>> proceed.
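>>> >>>>>
>>> >>>>> No such SPI exists in the operator today; purely to illustrate the
>>> >>>>> shape of the idea, the hook could look something like this:
>>> >>>>>
>>> >>>>> import java.util.Map;
>>> >>>>>
>>> >>>>> // Hypothetical pre-rescale hook, discovered e.g. via the standard
>>> >>>>> // Java ServiceLoader mechanism.
>>> >>>>> public interface RescaleValidator {
>>> >>>>>     // Called after the new parallelisms are computed but before they
>>> >>>>>     // are applied; returning false blocks the rescale until the next
>>> >>>>>     // evaluation.
>>> >>>>>     boolean preScale(String jobId,
>>> >>>>>                      Map<String, Integer> currentParallelism,
>>> >>>>>                      Map<String, Integer> proposedParallelism);
>>> >>>>> }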
>>> >>>>>
>>> >>>>> Implement a parameter that establishes a stability threshold on the
>>> heap memory usage percentage or on JVM old-generation GC (duration or
>>> count). If the threshold is exceeded, the system would revert to the last
>>> stable scale in the scaling history. The stabilization interval would then
>>> kick in, giving the Flink cluster additional time to process and reduce the
>>> state size.
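>>> >>>>>
>>> >>>>> Configuration-wise I imagine something like the following (key names
>>> >>>>> entirely made up, just to show the intent):
>>> >>>>>
>>> >>>>>   job.autoscaler.stability.heap-usage.threshold: "0.9"
>>> >>>>>   job.autoscaler.stability.old-gc-time.threshold: "5s"
>>> >>>>>   job.autoscaler.stability.revert.enabled: "true"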
>>> >>>>>
>>> >>>>> Let me know what you think about it! Thanks!
>>> >>>>>
>>> >>>>> Best,
>>> >>>>>
>>> >>>>> Yang LI
>>>
>>
