Devika Sudheer created FLINK-39743:
--------------------------------------

             Summary: Incorrect Expected Processing Rate Computation
                 Key: FLINK-39743
                 URL: https://issues.apache.org/jira/browse/FLINK-39743
             Project: Flink
          Issue Type: Bug
          Components: Autoscaler, Kubernetes Operator
    Affects Versions: kubernetes-operator-1.16.0
            Reporter: Devika Sudheer


h1. Overview

The Flink Autoscaler computes the expected processing rate 
(EXPECTED_PROCESSING_RATE) from cappedTargetCapacity. That value is derived 
from the scaling factor the algorithm initially proposes, after the factor has 
been bounded by MAX_SCALE_DOWN_FACTOR and MAX_SCALE_UP_FACTOR.

[https://github.com/apache/flink-kubernetes-operator/blame/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L255]
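The bounding step described above can be sketched as follows. This is a minimal sketch and not the autoscaler's code. It assumes the bounds are interpreted as 1 - MAX_SCALE_DOWN_FACTOR and 1 + MAX_SCALE_UP_FACTOR. The class, the method names and the targetCapacity parameter are hypothetical. The sketch shows that this step only clamps a factor and never sees max parallelism or other vertex-level constraints.

```java
/**
 * Hypothetical sketch of deriving cappedTargetCapacity from a bounded scale factor.
 * Assumption: the factor is clamped to [1 - maxScaleDownFactor, 1 + maxScaleUpFactor].
 */
final class ScaleFactorBoundingSketch {

    // Clamp the proposed factor into the assumed [min, max] range.
    static double boundScaleFactor(
            double proposedFactor, double maxScaleDownFactor, double maxScaleUpFactor) {
        double minFactor = 1.0 - maxScaleDownFactor; // assumed lower bound
        double maxFactor = 1.0 + maxScaleUpFactor;   // assumed upper bound
        return Math.max(minFactor, Math.min(maxFactor, proposedFactor));
    }

    // Capacity implied by the bounded factor. Max parallelism plays no role here.
    static double cappedTargetCapacity(
            double averageTrueProcessingRate,
            double targetCapacity,
            double maxScaleDownFactor,
            double maxScaleUpFactor) {
        double proposedFactor = targetCapacity / averageTrueProcessingRate;
        return averageTrueProcessingRate
                * boundScaleFactor(proposedFactor, maxScaleDownFactor, maxScaleUpFactor);
    }

    public static void main(String[] args) {
        // Factor 3 lies inside [0.4, 101], so the capacity is left at 300.
        System.out.println(cappedTargetCapacity(100.0, 300.0, 0.6, 100.0));
        // With maxScaleUpFactor = 1.0 the upper bound is 2, so factor 3 is clamped to 2.
        System.out.println(cappedTargetCapacity(100.0, 300.0, 0.6, 1.0));
    }
}
```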
 

However, the recommended parallelism can later be modified inside the scale() 
method due to additional constraints such as max parallelism, vertex-level 
min/max limits, and input partition counts. As a result, the final applied 
scaling factor may differ from the factor originally used to compute 
cappedTargetCapacity. This mismatch can cause effective scaling actions to be 
incorrectly classified as ineffective.

For example, consider a vertex with:
 * Current parallelism = 100
 * Current average true processing rate = 100 messages/sec
 * Recommended scaling factor = 3

Based on the initial scaling factor, the expected processing rate is computed 
as 300 messages/sec. However, if the vertex max parallelism is 200, the final 
recommended parallelism is capped at 200, meaning the effective scaling factor 
becomes 2 instead of 3. In this case, the expected processing rate should be 
200 messages/sec, not 300 messages/sec.
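The example can be reproduced with a short sketch. It is a simplification: the only constraint modeled is a max-parallelism cap, whereas the real scale() method applies further constraints. All names are hypothetical. It compares the expected rate taken from the initial factor with the rate taken from the parallelism that is actually applied.

```java
/** Hypothetical sketch contrasting the two ways of computing the expected rate. */
final class ExpectedRateExample {

    // Simplified constraint handling: scale up, then cap at max parallelism only.
    static int finalParallelism(int currentParallelism, double scaleFactor, int maxParallelism) {
        int proposed = (int) Math.ceil(currentParallelism * scaleFactor);
        return Math.min(proposed, maxParallelism);
    }

    // Expected rate based on the initial (pre-constraint) scaling factor.
    static double expectedRateFromFactor(double averageTrueProcessingRate, double scaleFactor) {
        return averageTrueProcessingRate * scaleFactor;
    }

    // Expected rate based on the parallelism that is actually applied.
    static double expectedRateFromParallelism(
            double averageTrueProcessingRate, int currentParallelism, int newParallelism) {
        return averageTrueProcessingRate * ((double) newParallelism / currentParallelism);
    }

    public static void main(String[] args) {
        int currentParallelism = 100;
        double averageTrueProcessingRate = 100.0;
        double recommendedFactor = 3.0;
        int maxParallelism = 200;

        int applied = finalParallelism(currentParallelism, recommendedFactor, maxParallelism);
        System.out.println("applied parallelism: " + applied);
        System.out.println("expected (initial factor): "
                + expectedRateFromFactor(averageTrueProcessingRate, recommendedFactor));
        System.out.println("expected (applied parallelism): "
                + expectedRateFromParallelism(averageTrueProcessingRate, currentParallelism, applied));
    }
}
```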

With an ineffective scale-out detection threshold of 10%, the processing rate 
must rise by at least 10% of the expected increase over the previous rate of 
100 messages/sec. The minimum acceptable rate would therefore incorrectly be 
evaluated against the uncapped expectation of 300 messages/sec (threshold = 
100 + 0.1 × (300 - 100) = 120 messages/sec) rather than the actual capped 
expectation of 200 messages/sec (threshold = 100 + 0.1 × (200 - 100) = 110 
messages/sec). As a result, a valid post-scale processing rate such as 115 
messages/sec would be incorrectly treated as ineffective, causing subsequent 
scale-out actions to be unnecessarily blocked.
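The thresholds above can be reproduced with a small check. This sketch assumes the effectiveness test compares the fraction of the expected increase actually achieved against the threshold, i.e. (current - last) / (expected - last) >= 0.1. That formula is inferred from the 120 and 110 messages/sec figures and is not copied from the autoscaler. The class and method names are hypothetical.

```java
/** Hypothetical sketch of an ineffective scale-up check; the formula is an assumption. */
final class IneffectiveScaleUpCheckSketch {

    // True if at least `threshold` of the expected increase was achieved.
    // Assumes expectedProcessingRate > lastProcessingRate (a scale-up).
    static boolean isEffective(
            double lastProcessingRate,
            double expectedProcessingRate,
            double currentProcessingRate,
            double threshold) {
        double expectedIncrease = expectedProcessingRate - lastProcessingRate;
        double actualIncrease = currentProcessingRate - lastProcessingRate;
        return actualIncrease / expectedIncrease >= threshold;
    }

    // Lowest post-scale rate that passes the same check.
    static double minimumEffectiveRate(
            double lastProcessingRate, double expectedProcessingRate, double threshold) {
        return lastProcessingRate + threshold * (expectedProcessingRate - lastProcessingRate);
    }

    public static void main(String[] args) {
        double last = 100.0;
        double observed = 115.0;
        double threshold = 0.1;

        // Uncapped expectation (300): 15 / 200 = 0.075, below the threshold.
        System.out.println("vs 300: " + isEffective(last, 300.0, observed, threshold));
        // Capped expectation (200): 15 / 100 = 0.15, at or above the threshold.
        System.out.println("vs 200: " + isEffective(last, 200.0, observed, threshold));
    }
}
```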
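
h1. Proposed Solution

Compute the expected processing rate only after the scale() method has 
finalized newParallelism, i.e. after it has been adjusted for constraints such 
as max parallelism, vertex-level min/max limits and input partition counts. 
The expected rate is then the current average true processing rate multiplied 
by the effective scaling factor newParallelism / currentParallelism:
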
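{code:java}
// Use the final parallelism, after all constraints have been applied.
// Cast to double so that an int / int division does not truncate
// (e.g. 150 / 100 would otherwise evaluate to 1).
double cappedTargetCapacity =
        averageTrueProcessingRate * ((double) newParallelism / currentParallelism);
evaluatedMetrics.put(
        ScalingMetric.EXPECTED_PROCESSING_RATE,
        EvaluatedScalingMetric.of(cappedTargetCapacity));
{code}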
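If newParallelism and currentParallelism are int values, the division in the snippet has to be done in floating point. Otherwise Java's integer division truncates the effective scaling factor. The int type is an assumption about the variables in JobVertexScaler, and the class and method names here are hypothetical. The sketch contrasts the two variants.

```java
/** Hypothetical sketch showing why the parallelism ratio must be computed as a double. */
final class ExpectedRateDivisionSketch {

    // Truncating variant: int / int is evaluated first, dropping the fraction.
    static double truncated(double averageTrueProcessingRate, int currentParallelism, int newParallelism) {
        return averageTrueProcessingRate * (newParallelism / currentParallelism);
    }

    // Non-truncating variant: promote to double before dividing.
    static double exact(double averageTrueProcessingRate, int currentParallelism, int newParallelism) {
        return averageTrueProcessingRate * ((double) newParallelism / currentParallelism);
    }

    public static void main(String[] args) {
        // Scale-out from 100 to 150: the truncated ratio is 1, the exact ratio is 1.5.
        System.out.println(truncated(100.0, 100, 150)); // 100.0
        System.out.println(exact(100.0, 100, 150));     // 150.0
        // Scale-in from 100 to 50: the truncated ratio collapses to 0.
        System.out.println(truncated(100.0, 100, 50));  // 0.0
        System.out.println(exact(100.0, 100, 50));      // 50.0
    }
}
```

With the truncating variant, any non-integer scaling factor skews the expected rate. A scale-in can even yield an expected rate of zero.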



