Maximilian Michels created FLINK-34152:
------------------------------------------

             Summary: Tune memory of autoscaled jobs
                 Key: FLINK-34152
                 URL: https://issues.apache.org/jira/browse/FLINK-34152
             Project: Flink
          Issue Type: New Feature
          Components: Autoscaler, Kubernetes Operator
            Reporter: Maximilian Michels
            Assignee: Maximilian Michels
             Fix For: kubernetes-operator-1.8.0


The current autoscaling algorithm adjusts the parallelism of the job task vertices according to the processing needs. By adjusting the parallelism, we systematically scale the amount of CPU for a task. At the same time, we also indirectly change the amount of memory tasks have at their disposal. However, there are some problems with this.

# Memory is overprovisioned: On scale up we may add more memory than we actually need. Even on scale down, the memory / CPU ratio can still be off and too much memory is used.
# Memory is underprovisioned: For stateful jobs, we risk running into OutOfMemoryErrors on scale down. Even before running out of memory, too little memory can have a negative impact on the effectiveness of the scaling.

We lack the capability to tune memory proportionally to the processing needs. In the same way that we measure CPU usage and size the tasks accordingly, we need to evaluate memory usage and adjust the heap memory size.

A tuning algorithm could look like this:

h2. 1. Establish a memory baseline

We observe the average heap memory usage at task managers.

h2. 2. Calculate memory usage per record

The memory requirements per record can be estimated by calculating this ratio:
{noformat}
memory_per_rec = sum(heap_usage) / sum(records_processed)
{noformat}
Based on empirical data, this ratio is surprisingly constant.

h2. 3. Scale memory proportionally to the per-record memory needs

{noformat}
memory_per_tm = expected_records_per_sec * memory_per_rec / num_task_managers
{noformat}
A minimum memory limit needs to be added to avoid scaling down memory too much. The maximum memory per TM should be equal to the initial user-specified limit from the ResourceSpec.
{noformat}
memory_per_tm = max(min_limit, memory_per_tm)
memory_per_tm = min(max_limit, memory_per_tm)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
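
For illustration, the three steps of the tuning algorithm proposed above could be sketched in Python roughly as follows. This is only a sketch under stated assumptions, not the autoscaler's implementation: the `TaskManagerSample` type, the function names, and the use of bytes as the unit are all hypothetical and chosen for this example. The formulas themselves are taken verbatim from the issue description.

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass(frozen=True)
class TaskManagerSample:
    """One observation per task manager (hypothetical shape, not a Flink type)."""
    heap_usage_bytes: float
    records_processed: float


def memory_per_record(samples: Sequence[TaskManagerSample]) -> float:
    """Step 2: memory_per_rec = sum(heap_usage) / sum(records_processed)."""
    total_records = sum(s.records_processed for s in samples)
    if total_records <= 0:
        raise ValueError("no records processed; the ratio is undefined")
    total_heap = sum(s.heap_usage_bytes for s in samples)
    return total_heap / total_records


def tune_memory_per_tm(
    samples: Sequence[TaskManagerSample],
    expected_records_per_sec: float,
    num_task_managers: int,
    min_limit_bytes: float,
    max_limit_bytes: float,
) -> float:
    """Step 3: scale memory with the expected load, then clamp to the limits."""
    if num_task_managers <= 0:
        raise ValueError("num_task_managers must be positive")
    if min_limit_bytes > max_limit_bytes:
        raise ValueError("min_limit_bytes must not exceed max_limit_bytes")

    per_rec = memory_per_record(samples)
    memory_per_tm = expected_records_per_sec * per_rec / num_task_managers

    # Lower bound: avoid scaling memory down too much.
    memory_per_tm = max(min_limit_bytes, memory_per_tm)
    # Upper bound: assumed here to be the user-specified limit from the ResourceSpec.
    memory_per_tm = min(max_limit_bytes, memory_per_tm)
    return memory_per_tm
```

The clamp is applied in the same order as in the description (minimum first, then maximum), so the user-specified maximum always wins if the two limits were ever to conflict; the sketch rejects that case up front instead.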