trystanj commented on code in PR #686:
URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589232955


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   Great idea, thanks for the tip! I saw that config setting but did not 
understand what it was actually used for. I will experiment with increasing 
it. Our jobs vary widely in how quickly they recover (some take a few 
seconds, some several minutes), but I would bias towards accuracy to avoid 
flapping in our case.
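
For reference, the comparison on the line under review can be sketched in isolation. This is only an illustration of the condition visible in the diff: the class name, the helper name, and the sample values (including the 0.15 threshold) are hypothetical, and it deliberately does not show which metric the operator selects when the condition holds, since that part of the hunk is not quoted here.

```java
public class TprThresholdSketch {

    // Mirrors the condition quoted in the diff:
    //   busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)
    // Returns true when the busy-time based estimate exceeds the observed
    // estimate by more than the relative threshold.
    static boolean exceedsSwitchThreshold(
            double busyTimeTprAvg, double observedTprAvg, double switchThreshold) {
        return busyTimeTprAvg > observedTprAvg * (1 + switchThreshold);
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from a real job.
        double threshold = 0.15;

        // 1300 is more than 15% above 1000, so the condition holds.
        System.out.println(exceedsSwitchThreshold(1300.0, 1000.0, threshold));

        // 1100 is within 15% of 1000, so the condition does not hold.
        System.out.println(exceedsSwitchThreshold(1100.0, 1000.0, threshold));
    }
}
```

With a larger threshold, the two averages have to diverge further before the condition holds.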



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
