trystanj commented on code in PR #686:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/686#discussion_r1588463242


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -154,6 +153,56 @@ private Map<ScalingMetric, EvaluatedScalingMetric> 
evaluateMetrics(
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, 
metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        
conf.get(AutoScalerOptions.OBSERVED_TPR_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, 
observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) 
{
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold = 
conf.get(AutoScalerOptions.OBSERVED_TPR_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to 
using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {

Review Comment:
   old PR, but fwiw I am now seeing a large _under_ estimation of capacity in 
the case of a "normal" source (no decompression, etc. just simple 
deserialization):
   
   `Using observed tpr 916327.7340000003 for 1385b0b55fdb909b073b014d9c682612 
as busy time based seems too large (2018541.9166666663)`
   
   In fact the actual capacity is indeed around 2M, if not higher. We are on 
1.7, but perhaps 1.8 will have improved its observed TPR due to [this 
enhancement](https://flink.apache.org/2024/03/21/apache-flink-kubernetes-operator-1.8.0-release-announcement/#improved-accuracy-of-autoscaling-metrics)?
   
   So yeah, I can set an arbitrary high config value for the switch, but that 
feels less elegant than just disabling it imo.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to