[ https://issues.apache.org/jira/browse/FLINK-31977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17805650#comment-17805650 ]

Maximilian Michels commented on FLINK-31977:
--------------------------------------------

I think this is related to FLINK-33993. The name of the configuration option is 
a bit misleading: effectiveness detection always runs, but scalings are only 
blocked when the option is set to {{true}}.
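In other words, the current control flow computes the detection result unconditionally and only consults the option when deciding whether to block. A minimal standalone sketch of that pattern (hypothetical class and method names, not the actual operator code):

```java
public class GatingSketch {

    // Stand-in for the ineffectiveness computation: the scale-up is
    // ineffective when the achieved fraction of the expected processing-rate
    // increase falls below the threshold.
    static boolean computeIneffective(double achievedFraction, double threshold) {
        return achievedFraction < threshold;
    }

    // Current behavior: detection always runs; the option only gates
    // whether an ineffective result actually blocks the scaling.
    static boolean blockScaling(
            boolean detectionEnabled, double achievedFraction, double threshold) {
        boolean ineffective = computeIneffective(achievedFraction, threshold); // always computed
        return detectionEnabled && ineffective; // only blocks when enabled
    }

    public static void main(String[] args) {
        System.out.println(blockScaling(true, 0.05, 0.1));  // ineffective and enabled -> blocks
        System.out.println(blockScaling(false, 0.05, 0.1)); // ineffective but disabled -> no block
    }
}
```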

> If scaling.effectiveness.detection.enabled is false, the call to the detectIneffectiveScaleUp() function is unnecessary
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-31977
>                 URL: https://issues.apache.org/jira/browse/FLINK-31977
>             Project: Flink
>          Issue Type: Improvement
>          Components: Autoscaler
>    Affects Versions: 1.17.0
>            Reporter: Tan Kim
>            Priority: Minor
>
> The code below is the function that detects ineffective scale-ups.
> It performs all of the computations needed for detection and only then checks 
> whether SCALING_EFFECTIVENESS_DETECTION_ENABLED 
> (scaling.effectiveness.detection.enabled) is true, so when the option is false 
> all of that computation is wasted.
> {code:java}
> // JobVertexScaler.java #175
> private boolean detectIneffectiveScaleUp(
>         AbstractFlinkResource<?, ?> resource,
>         JobVertexID vertex,
>         Configuration conf,
>         Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
>         ScalingSummary lastSummary) {
>     double lastProcRate =
>             lastSummary.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(); // 22569.315633422066
>     double lastExpectedProcRate =
>             lastSummary.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(); // 37340.0
>     var currentProcRate = evaluatedMetrics.get(TRUE_PROCESSING_RATE).getAverage();
>     // To judge the effectiveness of the scale up operation we compute how much of the expected
>     // increase actually happened. For example if we expect a 100 increase in proc rate and only
>     // got an increase of 10 we only accomplished 10% of the desired increase. If this number is
>     // below the threshold, we mark the scaling ineffective.
>     double expectedIncrease = lastExpectedProcRate - lastProcRate;
>     double actualIncrease = currentProcRate - lastProcRate;
>     boolean withinEffectiveThreshold =
>             (actualIncrease / expectedIncrease)
>                     >= conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD);
>     if (withinEffectiveThreshold) {
>         return false;
>     }
>     var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
>     eventRecorder.triggerEvent(
>             resource,
>             EventRecorder.Type.Normal,
>             EventRecorder.Reason.IneffectiveScaling,
>             EventRecorder.Component.Operator,
>             message);
>     if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>         LOG.info(message);
>         return true;
>     } else {
>         return false;
>     }
> }
> {code}
> At the call site of detectIneffectiveScaleUp(), I would suggest checking 
> SCALING_EFFECTIVENESS_DETECTION_ENABLED first, as follows.
> {code:java}
> // JobVertexScaler.java #150
> if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) {
>     if (scaledUp) {
>         if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
>             return detectIneffectiveScaleUp(resource, vertex, conf, evaluatedMetrics, lastSummary);
>         } else {
>             return true;
>         }
>     } else {
>         return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs);
>     }
> }
> {code}
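For reference, the ratio that detection computes can be sketched standalone. A minimal sketch (hypothetical class name; the sample rates come from the inline comments in the quoted code, and the 0.1 threshold is illustrative):

```java
public class EffectivenessCheck {

    // Returns true when the scale-up achieved at least `threshold`
    // of the expected processing-rate increase.
    static boolean withinEffectiveThreshold(
            double lastProcRate,
            double lastExpectedProcRate,
            double currentProcRate,
            double threshold) {
        double expectedIncrease = lastExpectedProcRate - lastProcRate;
        double actualIncrease = currentProcRate - lastProcRate;
        return (actualIncrease / expectedIncrease) >= threshold;
    }

    public static void main(String[] args) {
        // Sample values from the inline comments: the expected increase was
        // ~14770 records/s; an observed rate of 23000 achieves only ~3% of it,
        // so the scale-up is judged ineffective against a 0.1 threshold.
        System.out.println(
                withinEffectiveThreshold(22569.315633422066, 37340.0, 23000.0, 0.1)); // false
    }
}
```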



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
