trystanj commented on code in PR #686: URL: https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1589232955
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
Great idea, thanks for the tip! I saw that config setting but did not understand what it was actually used for. I will experiment with increasing that setting. Our jobs are all over the place in how quickly they recover (some take a few seconds, some several minutes), but I would bias towards accuracy in order to avoid the flapping in our case.
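For reference, here is a minimal, self-contained sketch of how the selection in `selectTprMetric` might behave for different threshold values. The quoted hunk stops at the threshold comparison, so the two return statements after it are an assumption, not the PR's code. The class name `TprSelectionSketch`, the nested `TprMetric` enum, and the example numbers are hypothetical stand-ins; the real method takes a `JobVertexID` and a `Configuration` rather than a raw threshold.

```java
public class TprSelectionSketch {

    // Hypothetical stand-in for the two ScalingMetric constants used in the diff.
    public enum TprMetric { TRUE_PROCESSING_RATE, OBSERVED_TPR }

    // Mirrors the checks visible in the quoted hunk; the threshold is passed
    // directly instead of being read from a Configuration object.
    public static TprMetric selectTprMetric(
            double busyTimeTprAvg, double observedTprAvg, double switchThreshold) {

        // No usable busy-time based average: fall back to the observed rate.
        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) {
            return TprMetric.OBSERVED_TPR;
        }

        // No observed average available: keep the busy-time based rate.
        if (Double.isNaN(observedTprAvg)) {
            return TprMetric.TRUE_PROCESSING_RATE;
        }

        // ASSUMPTION: the hunk is cut off after this condition. This sketch
        // assumes the observed rate is chosen when the busy-time based rate
        // exceeds it by more than the threshold, and the busy-time rate otherwise.
        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {
            return TprMetric.OBSERVED_TPR;
        }
        return TprMetric.TRUE_PROCESSING_RATE;
    }

    public static void main(String[] args) {
        // Busy-time rate is 30% above the observed rate.
        System.out.println(selectTprMetric(1300.0, 1000.0, 0.15)); // OBSERVED_TPR
        System.out.println(selectTprMetric(1300.0, 1000.0, 0.5));  // TRUE_PROCESSING_RATE
        System.out.println(selectTprMetric(Double.NaN, 1000.0, 0.15)); // OBSERVED_TPR
    }
}
```

Under that assumption, a larger threshold makes the selection stay with the busy-time based rate for longer before switching to the observed one.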