Hi Gyula,

Thank you for the feedback! With your permission, I plan to integrate the
implementation into the flink-kubernetes-operator-autoscaler module and test
it in my environment. If it works well, I would then contribute the changes
back to the community with a pull request to the GitHub repository in the
coming months.

Best,
Yang

On Tue, 7 Nov 2023 at 19:08, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Sounds like a lot of work for very little gain to me. If you really feel
> that there is some room for improvement with the current implementation, it
> may be simpler to fix that.
>
> Gyula
>
> On Tue, 7 Nov 2023 at 01:20, Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Thanks for the information!
>>
>> I haven't tested Kubernetes's built-in rollback mechanism yet. I feel like
>> I could create another independent operator that monitors the Flink
>> application's JVM memory and triggers a rollback.
>>
>> Another solution I would like to discuss is to implement an independent
>> operator. This operator would do the following:
>>
>>    - Retrieve the state size metrics for Flink applications from
>>    Prometheus.
>>    - Gather current and recommended parallelism metrics from the Flink
>>    operator, also reported in Prometheus.
>>    - If a downscale is advised, I would calculate whether the new
>>    cluster configuration, considering state size and JVM heap max size, can
>>    support the entire state; if not, the operation would be halted.
>>    - If feasible, this operator would manage the rescaling process
>>    similarly to the Flink operator, either by making API requests or by
>>    applying a kubectl patch to the FlinkDeployment CRD.
>>
>> By doing this, we could achieve something similar to what a plugin system
>> would give us. Of course, in this case I would disable scaling in the Flink
>> operator. A rough sketch of such an operator loop is below. Do you think it
>> could work?
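>>
>> To make this concrete, here is a minimal, illustrative sketch (in Java) of
>> what such an external operator loop could look like. The Prometheus URL, the
>> metric queries, the safety-margin factor and the kubectl-based patch are all
>> assumptions made for the sketch, not existing operator APIs or metric names:
>>
>> import java.net.URI;
>> import java.net.URLEncoder;
>> import java.net.http.HttpClient;
>> import java.net.http.HttpRequest;
>> import java.net.http.HttpResponse;
>> import java.nio.charset.StandardCharsets;
>>
>> public class DownscaleGuard {
>>
>>     private static final HttpClient HTTP = HttpClient.newHttpClient();
>>     // Assumed Prometheus endpoint; adjust to the actual environment.
>>     private static final String PROM = "http://prometheus:9090";
>>
>>     // Run one PromQL query against the Prometheus HTTP API and return the
>>     // raw JSON response (value extraction is left out of this sketch).
>>     static String promQuery(String query) throws Exception {
>>         String url = PROM + "/api/v1/query?query="
>>                 + URLEncoder.encode(query, StandardCharsets.UTF_8);
>>         HttpRequest req = HttpRequest.newBuilder(URI.create(url)).GET().build();
>>         return HTTP.send(req, HttpResponse.BodyHandlers.ofString()).body();
>>     }
>>
>>     // Decide whether the recommended (lower) parallelism still leaves
>>     // enough heap to hold the current state, with a safety margin < 1.0.
>>     static boolean downscaleIsSafe(double stateSizeBytes, int newParallelism,
>>                                    double heapPerTaskSlotBytes, double margin) {
>>         return stateSizeBytes < newParallelism * heapPerTaskSlotBytes * margin;
>>     }
>>
>>     // Apply the new parallelism by patching the FlinkDeployment CR, which
>>     // is what a manual "kubectl patch" of spec.job.parallelism would do.
>>     static void patchParallelism(String deployment, int parallelism)
>>             throws Exception {
>>         String patch =
>>                 "{\"spec\":{\"job\":{\"parallelism\":" + parallelism + "}}}";
>>         new ProcessBuilder("kubectl", "patch", "flinkdeployment", deployment,
>>                 "--type=merge", "-p", patch)
>>                 .inheritIO().start().waitFor();
>>     }
>> }
>>
>> In a real operator the patch would more likely go through a Kubernetes
>> client library than by shelling out to kubectl, but the control flow would
>> be the same: read the metrics, check the margin, and only then apply the
>> new parallelism.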
>>
>> Best,
>> Yang
>>
>> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey!
>>>
>>> Bit of a tricky problem, as it's not really possible to know that the
>>> job will be able to start with lower parallelism in some cases. Custom
>>> plugins may work but that would be an extremely complex solution at this
>>> point.
>>>
>>> The Kubernetes operator has a built-in rollback mechanism that can help
>>> with rolling back these broken scale operations, have you tried that?
>>> Furthermore, we are planning to introduce some heap/GC related metrics soon
>>> (probably after the next release, for 1.8.0) that may help us catch these
>>> issues.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com> wrote:
>>>
>>>> Dear Flink Community,
>>>>
>>>> I am currently working on auto-scaling my Flink application using the
>>>> Flink operator's autoscaler. During testing, I encountered a
>>>> "java.lang.OutOfMemoryError: Java heap space" exception when the
>>>> autoscaler attempted to scale down. This issue arises when the incoming
>>>> record rate decreases while the state size has not yet shrunk
>>>> correspondingly. Despite numerous tests, managing this issue has been
>>>> difficult because there is no parameter for specifying a cooldown period
>>>> (essential for processing and reducing state size) prior to the actual
>>>> scale-down. Moreover, determining an optimal duration for this cooldown
>>>> period is not straightforward. I believe that enhancing the autoscaler
>>>> with memory checks, or more broadly with stability conditions, could
>>>> significantly address this issue. Here are some potential solutions that,
>>>> in my opinion, could improve the situation:
>>>>
>>>>    1. Integrate heap memory-related metrics into the metric collection,
>>>>    coupled with a memory safety margin check within the autoscaler's
>>>>    algorithm (a rough sketch of such a check follows after this list).
>>>>
>>>>    2. Introduce a plugin system and a pre-rescaling step in the Flink
>>>>    operator's autoscaler, which would allow users to implement custom
>>>>    plugins. These plugins could host listeners that activate during the
>>>>    pre-hook step, adding an additional check before the algorithm
>>>>    executes. That way, scale-down stays blocked until the custom checks
>>>>    pass and it is safe to proceed (see the second sketch after this list).
>>>>
>>>>    3. Implement a parameter that establishes a stability threshold for
>>>>    heap memory usage percentage or JVM old GC (duration or count). If the
>>>>    threshold is exceeded, the system would revert to the last stable scale
>>>>    in the scaling history. The stabilization interval would then kick in,
>>>>    giving the Flink cluster additional time to process and reduce the
>>>>    state size.
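>>>>
>>>> To illustrate point 1 (and the thresholds from point 3), here is a minimal
>>>> sketch of what such a memory safety-margin check could look like in Java.
>>>> The threshold names and example values are made up for illustration, not
>>>> existing autoscaler options:
>>>>
>>>> public class MemoryGuard {
>>>>
>>>>     private final double maxHeapUsageRatio;  // e.g. 0.85
>>>>     private final long maxOldGcMillisPerMin; // e.g. 5000
>>>>
>>>>     public MemoryGuard(double maxHeapUsageRatio, long maxOldGcMillisPerMin) {
>>>>         this.maxHeapUsageRatio = maxHeapUsageRatio;
>>>>         this.maxOldGcMillisPerMin = maxOldGcMillisPerMin;
>>>>     }
>>>>
>>>>     // Returns true if heap usage and old-GC pressure are below the
>>>>     // configured thresholds, i.e. it looks safe to apply a scale-down.
>>>>     public boolean allowScaleDown(double heapUsedBytes, double heapMaxBytes,
>>>>                                   long oldGcMillisLastMinute) {
>>>>         boolean heapOk = heapUsedBytes / heapMaxBytes < maxHeapUsageRatio;
>>>>         boolean gcOk = oldGcMillisLastMinute < maxOldGcMillisPerMin;
>>>>         return heapOk && gcOk;
>>>>     }
>>>> }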
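>>>>
>>>> And for point 2, a sketch of what the plugin / pre-hook extension point
>>>> could look like. No such interface exists in the operator today; the name
>>>> and signature are purely illustrative:
>>>>
>>>> import java.util.Map;
>>>>
>>>> public interface PreScaleDownHook {
>>>>
>>>>     // Called with the proposed per-vertex parallelism before it is
>>>>     // applied. Returning false blocks the scale-down until the next
>>>>     // evaluation of the autoscaler.
>>>>     boolean allowScaleDown(String jobId, Map<String, Integer> proposed);
>>>> }
>>>>
>>>> // Example plugin: block the scale-down while heap usage is above 85%.
>>>> // (A real plugin would read the job's JVM metrics, not the local runtime.)
>>>> class HeapUsageHook implements PreScaleDownHook {
>>>>
>>>>     @Override
>>>>     public boolean allowScaleDown(String jobId, Map<String, Integer> proposed) {
>>>>         Runtime rt = Runtime.getRuntime();
>>>>         double used = rt.totalMemory() - rt.freeMemory();
>>>>         return used / rt.maxMemory() < 0.85;
>>>>     }
>>>> }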
>>>>
>>>>
>>>>
>>>> Let me know what you think about it! Thanks!
>>>>
>>>> Best,
>>>>
>>>> Yang LI
>>>>
>>>
