Could you please elaborate a little on the scenarios in which you find that the pending-records metric is not a good way to track Kafka lag?
Thanks,
Gyula

On Mon, Dec 11, 2023 at 4:26 PM Yang LI <yang.hunter...@gmail.com> wrote:

> Hello,
>
> Following our recent discussion, I've successfully implemented a Flink
> operator featuring a "memory protection" patch. However, in the course of
> my testing, I've encountered an issue: the Flink operator relies on the
> 'pending_record' metric to gauge backlog. Unfortunately, this metric
> doesn't seem to accurately represent the lag in the Kafka topic in certain
> scenarios.
>
> Could you advise if there are any configurations I might have overlooked
> that could enhance the autoscaler's ability to scale up in response to
> lag in Kafka topics?
>
> Regards,
> Yang LI
>
> On Wed, 15 Nov 2023 at 20:39, Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Thanks Maximilian and Gyula, I'll keep you updated.
>>
>> Best,
>> Yang
>>
>> On Sat, 11 Nov 2023 at 16:18, Maximilian Michels <m...@apache.org> wrote:
>>
>>> Hi Yang,
>>>
>>> We're always open to changes / additions to the autoscaler logic and
>>> metric collection. Preferably, we change these directly in the
>>> autoscaler implementation, without adding additional processes or
>>> controllers. Let us know how your experiments go! If you want to
>>> contribute, a JIRA with a description of the changes would be the
>>> first step. We can take it from there.
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Nov 7, 2023 at 9:04 PM Yang LI <yang.hunter...@gmail.com> wrote:
>>> >
>>> > Hi Gyula,
>>> >
>>> > Thank you for the feedback! With your permission, I plan to integrate
>>> > the implementation into the flink-kubernetes-operator-autoscaler
>>> > module and test it in my environment, and then perhaps contribute the
>>> > changes back to the community by submitting a pull request to the
>>> > GitHub repository in the coming months.
>>> >
>>> > Best,
>>> > Yang
>>> >
>>> > On Tue, 7 Nov 2023 at 19:08, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> >>
>>> >> Sounds like a lot of work for very little gain to me.
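For context on the backlog question above: Kafka consumer lag is conventionally the per-partition difference between the log-end offset and the consumer group's committed offset, summed over partitions. A minimal sketch of that calculation (plain Python over hypothetical offset maps — this is not the Flink operator's pending-records implementation):

```python
def total_consumer_lag(end_offsets, committed_offsets):
    """Sum of (log-end offset - committed offset) over all partitions.

    Both arguments are hypothetical {partition: offset} maps. A partition
    with no committed offset is treated as fully unconsumed (offset 0).
    """
    lag = 0
    for partition, end_offset in end_offsets.items():
        committed = committed_offsets.get(partition, 0)
        lag += max(0, end_offset - committed)
    return lag
```

For example, `total_consumer_lag({0: 100, 1: 50}, {0: 90, 1: 50})` yields a lag of 10. The pending-records metric a source reports can diverge from this broker-side view, e.g. when records are already buffered in the source but not yet processed.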
>>> >> If you really feel that there is some room for improvement with the
>>> >> current implementation, it may be simpler to fix that.
>>> >>
>>> >> Gyula
>>> >>
>>> >> On Tue, 7 Nov 2023 at 01:20, Yang LI <yang.hunter...@gmail.com> wrote:
>>> >>>
>>> >>> Thanks for the information!
>>> >>>
>>> >>> I haven't tested the Kubernetes operator's built-in rollback
>>> >>> mechanism yet. I feel like I could create another independent
>>> >>> operator that detects the Flink application's JVM memory usage and
>>> >>> triggers a rollback.
>>> >>>
>>> >>> Another solution I would like to discuss is to implement an
>>> >>> independent operator that does the following:
>>> >>>
>>> >>> 1. Retrieve the state size metrics for Flink applications from
>>> >>> Prometheus.
>>> >>> 2. Gather current and recommended parallelism metrics from the Flink
>>> >>> operator, also reported in Prometheus.
>>> >>> 3. If a downscale is advised, calculate whether the new cluster
>>> >>> configuration, considering state size and JVM max heap size, can
>>> >>> hold the entire state; if not, halt the operation.
>>> >>> 4. If feasible, manage the rescaling process similarly to the Flink
>>> >>> operator, either by making API requests or by applying a kubectl
>>> >>> patch to the FlinkDeployment CRD.
>>> >>>
>>> >>> By doing this we could achieve something similar to what a plugin
>>> >>> system would allow. Of course, in this case I'd disable scaling in
>>> >>> the Flink operator. Do you think it could work?
>>> >>>
>>> >>> Best,
>>> >>> Yang
>>> >>>
>>> >>> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>> >>>>
>>> >>>> Hey!
>>> >>>>
>>> >>>> Bit of a tricky problem, as in some cases it's not really possible
>>> >>>> to know whether the job will be able to start with lower
>>> >>>> parallelism. Custom plugins may work, but that would be an
>>> >>>> extremely complex solution at this point.
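The feasibility check described in the proposal above (can the downscaled cluster hold the current state?) could be sketched roughly as follows. This is only a sketch under assumptions: the function name, the safety margin, and the assumption that state is redistributed evenly across the remaining task managers are all hypothetical, not part of any Flink operator API. Note also that with RocksDB, state largely lives off-heap, so a heap-only check is a simplification:

```python
def downscale_is_feasible(state_size_bytes, heap_per_tm_bytes,
                          new_tm_count, safety_margin=0.7):
    """Rough check: can the downscaled cluster hold the current state?

    Assumes state is redistributed evenly across the remaining task
    managers and that at most `safety_margin` of each TM's max heap
    may be occupied by state. All names and defaults are hypothetical.
    """
    if new_tm_count <= 0:
        return False
    state_per_tm = state_size_bytes / new_tm_count
    return state_per_tm <= safety_margin * heap_per_tm_bytes
```

For instance, 10 GiB of total state spread over 4 task managers with 8 GiB heaps each passes the check, while the same state over 2 task managers with 4 GiB heaps each does not.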
>>> >>>>
>>> >>>> The Kubernetes operator has a built-in rollback mechanism that can
>>> >>>> help with rolling back these broken scale operations; have you
>>> >>>> tried that? Furthermore, we are planning to introduce some heap/GC
>>> >>>> related metrics soon (probably after the next release, for 1.8.0)
>>> >>>> that may help us catch these issues.
>>> >>>>
>>> >>>> Cheers,
>>> >>>> Gyula
>>> >>>>
>>> >>>> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com> wrote:
>>> >>>>>
>>> >>>>> Dear Flink Community,
>>> >>>>>
>>> >>>>> I am currently working on implementing auto-scaling for my Flink
>>> >>>>> application using the Flink operator's autoscaler. During testing,
>>> >>>>> I encountered a "java.lang.OutOfMemoryError: Java heap space"
>>> >>>>> exception when the autoscaler attempted to scale down. This issue
>>> >>>>> arises when the incoming record rate decreases while the state
>>> >>>>> size has not yet reduced correspondingly. Despite numerous tests,
>>> >>>>> managing this issue has been difficult due to the lack of a
>>> >>>>> parameter for specifying a cooldown period (essential for
>>> >>>>> processing and reducing state size) prior to actually scaling
>>> >>>>> down. Moreover, determining an optimal duration for this cooldown
>>> >>>>> period is also not straightforward. I believe that enhancing the
>>> >>>>> autoscaler with a focus on memory checks, or more broadly on
>>> >>>>> stability conditions, could significantly address this issue. Here
>>> >>>>> are some potential solutions that, in my opinion, could improve
>>> >>>>> the situation:
>>> >>>>>
>>> >>>>> Integrate heap memory-related metrics into the metric collection,
>>> >>>>> coupled with a memory safety margin check within the autoscaler's
>>> >>>>> algorithm.
>>> >>>>>
>>> >>>>> Introduce a plugin system and a pre-rescaling step in the Flink
>>> >>>>> operator's autoscaler, which would allow users to implement custom
>>> >>>>> plugins. These plugins could host listeners that activate during
>>> >>>>> the pre-hook step, adding an additional checkpoint before the
>>> >>>>> algorithm executes.
>>> >>>>> That way we could keep blocking a scale-down until the custom
>>> >>>>> checks pass, ensuring it is safe to proceed.
>>> >>>>>
>>> >>>>> Implement a parameter that establishes a stability threshold for
>>> >>>>> heap memory usage percentage or JVM old-generation GC (duration or
>>> >>>>> count). In the event that the threshold is exceeded, the system
>>> >>>>> would revert to the last stable scale in the scaling history. The
>>> >>>>> stabilization interval would then start to apply, giving the Flink
>>> >>>>> cluster additional time to process and reduce the state size.
>>> >>>>>
>>> >>>>> Let me know what you think about it! Thanks!
>>> >>>>>
>>> >>>>> Best,
>>> >>>>>
>>> >>>>> Yang LI
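The revert-on-instability idea proposed above could be sketched like this. Everything here is hypothetical — the thresholds, the metric names, and the shape of the scaling history are illustrative only, not existing autoscaler configuration or APIs:

```python
def decide_scale_action(heap_usage_ratio, old_gc_count_per_min,
                        scaling_history,
                        max_heap_ratio=0.9, max_old_gc_per_min=10):
    """Return a parallelism to roll back to, or None to keep the current scale.

    `scaling_history` is a hypothetical list of previously stable
    parallelisms, most recent last. If either stability threshold is
    exceeded, revert to the most recent stable entry; the autoscaler's
    stabilization interval would then give the job time to recover.
    """
    unstable = (heap_usage_ratio > max_heap_ratio
                or old_gc_count_per_min > max_old_gc_per_min)
    if unstable and scaling_history:
        return scaling_history[-1]  # roll back to the last stable scale
    return None  # within thresholds (or no history): leave scale unchanged
```

For example, with 95% heap usage and a history of stable parallelisms `[8, 6]`, the sketch would revert to 6, while a healthy job (say 50% heap, few old-gen GCs) keeps its current scale.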