[ https://issues.apache.org/jira/browse/FLINK-34152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813772#comment-17813772 ]

Emanuele Pirro commented on FLINK-34152:
----------------------------------------

Thanks, Maximilian, for reporting this. It's indeed an issue, especially the 
`OutOfMemoryErrors` on scale-in.

> Tune memory of autoscaled jobs
> ------------------------------
>
>                 Key: FLINK-34152
>                 URL: https://issues.apache.org/jira/browse/FLINK-34152
>             Project: Flink
>          Issue Type: New Feature
>          Components: Autoscaler, Kubernetes Operator
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: kubernetes-operator-1.8.0
>
>
> The current autoscaling algorithm adjusts the parallelism of the job's task 
> vertices according to the processing needs. By adjusting the parallelism, we 
> systematically scale the amount of CPU available to a task. At the same time, 
> we also indirectly change the amount of memory tasks have at their disposal. 
> However, there are some problems with this:
>  # Memory is overprovisioned: on scale-up we may add more memory than we 
> actually need, and even on scale-down the memory/CPU ratio can be off, 
> leaving too much memory allocated.
>  # Memory is underprovisioned: for stateful jobs, we risk running into 
> OutOfMemoryErrors on scale-down. Even before running out of memory, too 
> little memory can negatively impact the effectiveness of the scaling.
> We lack the capability to tune memory proportionally to the processing needs. 
> In the same way that we measure CPU usage and size the tasks accordingly, we 
> need to evaluate memory usage and adjust the heap memory size.
> A tuning algorithm could look like this:
> h2. 1. Establish a memory baseline
> We observe the average heap memory usage at task managers.
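> A minimal sketch of this step (illustrative names; the samples would come 
> from a heap-usage metric such as Status.JVM.Memory.Heap.Used):
> {noformat}
> import java.util.List;
>
> // Hypothetical helper: average the observed heap usage across samples
> // collected over the metrics window for one task manager.
> static double averageHeapUsage(List<Double> heapUsedBytesSamples) {
>     return heapUsedBytesSamples.stream()
>             .mapToDouble(Double::doubleValue)
>             .average()
>             .orElse(0.0);
> }
> {noformat}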
> h2. 2. Calculate memory usage per record
> The memory requirements per record can be estimated by calculating this ratio:
> {noformat}
> memory_per_rec = sum(heap_usage) / sum(records_processed)
> {noformat}
> Based on empirical data, this ratio is surprisingly constant.
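> As a minimal sketch (illustrative names, not actual autoscaler code):
> {noformat}
> // Hypothetical helper: estimate heap bytes needed per processed record.
> static double memoryPerRecord(double sumHeapUsageBytes, long sumRecordsProcessed) {
>     if (sumRecordsProcessed <= 0) {
>         return 0.0; // no traffic observed; fall back to configured defaults
>     }
>     return sumHeapUsageBytes / sumRecordsProcessed;
> }
> {noformat}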
> h2. 3. Scale memory proportionally to the per-record memory needs
> {noformat}
> memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers 
> {noformat}
> A minimum memory limit needs to be enforced to avoid scaling memory down too 
> far. The maximum memory per TM should equal the user-specified limit 
> initially defined in the ResourceSpec.
> {noformat}
> memory_per_tm = max(min_limit, memory_per_tm)
> memory_per_tm = min(max_limit, memory_per_tm)
> {noformat}
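> Putting the sizing formula and the clamp together, a minimal sketch 
> (illustrative names; max_limit would come from the user-specified 
> ResourceSpec):
> {noformat}
> // Hypothetical helper: per-TM memory target, clamped to [minLimit, maxLimit].
> static long targetMemoryPerTm(
>         double expectedRecordsPerSec,
>         double memoryPerRecordBytes,
>         int numTaskManagers,
>         long minLimitBytes,
>         long maxLimitBytes) {
>     double raw = expectedRecordsPerSec * memoryPerRecordBytes / numTaskManagers;
>     return (long) Math.min(maxLimitBytes, Math.max(minLimitBytes, raw));
> }
> {noformat}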


