[
https://issues.apache.org/jira/browse/FLINK-39743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Devika Sudheer updated FLINK-39743:
-----------------------------------
Description:
h1. Overview
The Flink Autoscaler computes the expected processing rate
(EXPECTED_PROCESSING_RATE) using cappedTargetCapacity, which is derived from
the initial scaling factor proposed by the algorithm after bounding it within
MAX_SCALE_DOWN_FACTOR and MAX_SCALE_UP_FACTOR.
[https://github.com/apache/flink-kubernetes-operator/blame/main/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java#L255]
However, the recommended parallelism can later be modified inside the scale()
method due to additional constraints such as max parallelism, vertex-level
min/max limits, and input partition counts. As a result, the final applied
scaling factor may differ from the factor originally used to compute
cappedTargetCapacity. This mismatch can cause effective scaling actions to be
incorrectly classified as ineffective.
For example, consider a vertex with:
* Current parallelism = 100
* Current average true processing rate = 100 messages/sec
* Recommended scaling factor = 3
Based on the initial scaling factor, the expected processing rate is computed
as 300 messages/sec. However, if the vertex max parallelism is 200, the final
recommended parallelism is capped at 200, meaning the effective scaling factor
becomes 2 instead of 3. In this case, the expected processing rate should be
200 messages/sec, not 300 messages/sec.
With an ineffective scale-out detection threshold of 10%, the post-scale
processing rate must reach at least the previous rate plus 10% of the expected
increase. Against the incorrect expectation of 300 messages/sec, that minimum
is 120 messages/sec (100 + 10% of 200). Against the actual capped expectation
of 200 messages/sec, it is 110 messages/sec (100 + 10% of 100). As a result, a
valid post-scale processing rate such as 115 messages/sec is incorrectly
treated as ineffective, and subsequent scale-out actions are unnecessarily
blocked.
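The arithmetic in the example above can be reproduced with a small standalone sketch. It is illustrative only: the class and method names are hypothetical and are not taken from JobVertexScaler, and the effectiveness check (previous rate plus the threshold fraction of the expected increase) is an assumption inferred from the numbers in this description.

```java
// Hypothetical sketch; names are illustrative and not part of the Flink autoscaler API.
final class ExpectedRateSketch {

    // Expected rate, assuming throughput scales linearly with parallelism.
    static double expectedRate(double currentRate, int currentParallelism, int newParallelism) {
        // Cast to double so the ratio is not truncated by integer division.
        return currentRate * ((double) newParallelism / currentParallelism);
    }

    // Minimum post-scale rate for a scale-out to count as effective (assumed rule):
    // the previous rate plus `threshold` times the expected increase.
    static double minEffectiveRate(double currentRate, double expectedRate, double threshold) {
        return currentRate + threshold * (expectedRate - currentRate);
    }

    static boolean isEffective(
            double observedRate, double currentRate, double expectedRate, double threshold) {
        return observedRate >= minEffectiveRate(currentRate, expectedRate, threshold);
    }

    public static void main(String[] args) {
        double currentRate = 100.0;     // messages/sec before scaling
        int currentParallelism = 100;
        int proposedParallelism = 300;  // scaling factor 3 proposed by the algorithm
        int maxParallelism = 200;       // vertex max parallelism
        int finalParallelism = Math.min(proposedParallelism, maxParallelism);
        double threshold = 0.10;        // ineffective scale-out detection threshold
        double observedRate = 115.0;    // messages/sec after scaling

        double uncapped = expectedRate(currentRate, currentParallelism, proposedParallelism);
        double capped = expectedRate(currentRate, currentParallelism, finalParallelism);

        System.out.println("Expected rate from proposed factor: " + uncapped);
        System.out.println("Expected rate from final parallelism: " + capped);
        System.out.println("Effective vs. proposed expectation: "
                + isEffective(observedRate, currentRate, uncapped, threshold));
        System.out.println("Effective vs. final expectation: "
                + isEffective(observedRate, currentRate, capped, threshold));
    }
}
```

Under these assumptions, the same observed rate of 115 messages/sec fails the check when measured against the expectation derived from the proposed factor, and passes it when measured against the expectation derived from the final parallelism.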
h1. Proposed Solution
Compute the expected processing rate only after the scale() method has
finalized newParallelism, that is, after constraints such as max parallelism,
vertex-level min/max limits, and input partition counts have been applied.
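{code:java}
// Cast to double so the ratio is not truncated by integer division
// (assuming newParallelism and currentParallelism are ints).
double cappedTargetCapacity =
        averageTrueProcessingRate * ((double) newParallelism / currentParallelism);
evaluatedMetrics.put(
        ScalingMetric.EXPECTED_PROCESSING_RATE,
        EvaluatedScalingMetric.of(cappedTargetCapacity));
{code}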
> Incorrect Expected Processing Rate Computation
> ----------------------------------------------
>
> Key: FLINK-39743
> URL: https://issues.apache.org/jira/browse/FLINK-39743
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: kubernetes-operator-1.16.0
> Reporter: Devika Sudheer
> Priority: Minor