This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new cc680e14 [FLINK-33306] Use observed source throughput as true 
processing rate
cc680e14 is described below

commit cc680e142bb8d52c4db215658ee7f4c4159a0fe4
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Tue Oct 24 17:28:02 2023 +0200

    [FLINK-33306] Use observed source throughput as true processing rate
---
 .../generated/auto_scaler_configuration.html       |  18 ++
 .../apache/flink/autoscaler/ScalingExecutor.java   |   1 -
 .../flink/autoscaler/ScalingMetricCollector.java   | 130 +++++++++-----
 .../flink/autoscaler/ScalingMetricEvaluator.java   |  99 +++++++++--
 .../flink/autoscaler/config/AutoScalerOptions.java |  30 ++++
 .../autoscaler/event/AutoScalerEventHandler.java   |   2 +-
 .../flink/autoscaler/metrics/FlinkMetric.java      |   3 +-
 .../flink/autoscaler/metrics/ScalingMetric.java    |   3 +
 .../flink/autoscaler/metrics/ScalingMetrics.java   |  71 +++++++-
 .../flink/autoscaler/realizer/ScalingRealizer.java |   3 +-
 .../autoscaler/state/AutoScalerStateStore.java     |   7 +-
 .../state/InMemoryAutoScalerStateStore.java        |  22 +--
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   6 +-
 .../MetricsCollectionAndEvaluationTest.java        | 194 ++++++++++++++++++++-
 .../autoscaler/RecommendedParallelismTest.java     |   4 +-
 .../autoscaler/RestApiMetricsCollectorTest.java    |  18 +-
 .../autoscaler/ScalingMetricCollectorTest.java     |  59 +++++--
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 119 +++++++++++++
 .../autoscaler/metrics/ScalingMetricsTest.java     | 109 ++++++++++--
 .../operator/autoscaler/AutoscalerFactory.java     |   6 +-
 .../operator/autoscaler/ConfigMapStore.java        |  11 +-
 .../autoscaler/KubernetesAutoScalerStateStore.java |  12 +-
 .../KubernetesAutoScalerStateStoreTest.java        |  10 +-
 23 files changed, 773 insertions(+), 164 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0b2d5b35..0221b567 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -56,6 +56,24 @@
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
+        <tr>
+            
<td><h5>job.autoscaler.observed-true-processing-rate.lag-threshold</h5></td>
+            <td style="word-wrap: break-word;">30 s</td>
+            <td>Duration</td>
+            <td>Lag threshold for enabling observed true processing rate 
measurements.</td>
+        </tr>
+        <tr>
+            
<td><h5>job.autoscaler.observed-true-processing-rate.min-observations</h5></td>
+            <td style="word-wrap: break-word;">2</td>
+            <td>Integer</td>
+            <td>Minimum nr of observations used when estimating / switching to 
observed true processing rate.</td>
+        </tr>
+        <tr>
+            
<td><h5>job.autoscaler.observed-true-processing-rate.switch-threshold</h5></td>
+            <td style="word-wrap: break-word;">0.15</td>
+            <td>Double</td>
+            <td>Percentage threshold for switching to observed from busy time 
based true processing rate if the measurement is off by at least the configured 
fraction. For example 0.15 means we switch to observed if the busy time based 
computation is at least 15% higher during catchup.</td>
+        </tr>
         <tr>
             <td><h5>job.autoscaler.restart.time</h5></td>
             <td style="word-wrap: break-word;">3 min</td>
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index cedb4874..406b17d7 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -163,7 +163,6 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
         for (Map.Entry<JobVertexID, ScalingSummary> entry : 
scalingSummaries.entrySet()) {
             var vertex = entry.getKey();
-            var scalingSummary = entry.getValue();
             var metrics = evaluatedMetrics.get(vertex);
 
             double processingRate = 
metrics.get(TRUE_PROCESSING_RATE).getAverage();
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 013fe14d..df5f8ad6 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -59,8 +59,10 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.updateVertexList;
 import static 
org.apache.flink.autoscaler.utils.AutoScalerUtils.excludeVerticesFromScaling;
@@ -90,39 +92,31 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                         jobKey,
                         (k) -> {
                             try {
-                                return 
stateStore.getEvaluatedMetrics(ctx).orElse(new TreeMap<>());
+                                return 
stateStore.getCollectedMetrics(ctx).orElse(new TreeMap<>());
                             } catch (Exception exception) {
                                 throw new RuntimeException(
                                         "Get evaluated metrics failed.", 
exception);
                             }
                         });
 
-        // The timestamp of the first metric observation marks the start
-        // If we haven't collected any metrics, we are starting now
-        var metricCollectionStartTs = metricHistory.isEmpty() ? now : 
metricHistory.firstKey();
-
         var jobDetailsInfo =
                 getJobDetailsInfo(ctx, 
conf.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
         var jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
-        if (jobUpdateTs.isAfter(metricCollectionStartTs)) {
+        // We detect job change compared to our collected metrics by checking 
against the earliest
+        // metric timestamp
+        if (!metricHistory.isEmpty() && 
jobUpdateTs.isAfter(metricHistory.firstKey())) {
             LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
-            stateStore.removeEvaluatedMetrics(ctx);
+            stateStore.removeCollectedMetrics(ctx);
             cleanup(ctx.getJobKey());
             metricHistory.clear();
-            metricCollectionStartTs = now;
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
+        var stableTime = 
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
 
-        // Trim metrics outside the metric window from metrics history
+        // Calculate timestamp when the metric window is full
         var metricWindowSize = getMetricWindowSize(conf);
-        metricHistory.headMap(now.minus(metricWindowSize)).clear();
-
-        var stableTime = 
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
-        if (now.isBefore(stableTime)) {
-            // As long as we are stabilizing, collect no metrics at all
-            LOG.info("Skipping metric collection during stabilization period 
until {}", stableTime);
-            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
-        }
+        var windowFullTime =
+                getWindowFullTime(metricHistory.tailMap(stableTime), now, 
metricWindowSize);
 
         // The filtered list of metrics we want to query for each vertex
         var filteredVertexMetricNames = queryFilteredMetricNames(ctx, 
topology);
@@ -136,17 +130,22 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
 
         // Add scaling metrics to history if they were computed successfully
         metricHistory.put(now, scalingMetrics);
-        stateStore.storeEvaluatedMetrics(ctx, metricHistory);
 
-        var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory);
-
-        var windowFullTime = metricCollectionStartTs.plus(metricWindowSize);
-        collectedMetrics.setFullyCollected(!now.isBefore(windowFullTime));
+        if (now.isBefore(stableTime)) {
+            LOG.info("Stabilizing until {}", stableTime);
+            stateStore.storeCollectedMetrics(ctx, metricHistory);
+            return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
+        }
 
-        if (!collectedMetrics.isFullyCollected()) {
+        var collectedMetrics = new CollectedMetricHistory(topology, 
metricHistory);
+        if (now.isBefore(windowFullTime)) {
             LOG.info("Metric window not full until {}", windowFullTime);
+        } else {
+            collectedMetrics.setFullyCollected(true);
+            // Trim metrics outside the metric window from metrics history
+            metricHistory.headMap(now.minus(metricWindowSize)).clear();
         }
-
+        stateStore.storeCollectedMetrics(ctx, metricHistory);
         return collectedMetrics;
     }
 
@@ -154,6 +153,15 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         return conf.get(AutoScalerOptions.METRICS_WINDOW);
     }
 
+    private static Instant getWindowFullTime(
+            SortedMap<Instant, CollectedMetrics> metricsAfterStable,
+            Instant now,
+            Duration metricWindowSize) {
+        return metricsAfterStable.isEmpty()
+                ? now.plus(metricWindowSize)
+                : metricsAfterStable.firstKey().plus(metricWindowSize);
+    }
+
     @VisibleForTesting
     protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
         return Instant.ofEpochMilli(
@@ -265,9 +273,11 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                     ScalingMetrics.computeLoadMetrics(
                             jobVertexID, vertexFlinkMetrics, 
vertexScalingMetrics, conf);
 
+                    var metricHistory =
+                            histories.getOrDefault(jobKey, 
Collections.emptySortedMap());
                     double lagGrowthRate =
                             computeLagGrowthRate(
-                                    jobKey,
+                                    metricHistory,
                                     jobVertexID,
                                     
vertexScalingMetrics.get(ScalingMetric.LAG));
 
@@ -277,8 +287,13 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                             vertexScalingMetrics,
                             jobTopology,
                             lagGrowthRate,
-                            conf);
-
+                            conf,
+                            observedTprAvg(
+                                    jobVertexID,
+                                    metricHistory,
+                                    conf.get(
+                                            AutoScalerOptions
+                                                    
.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)));
                     vertexScalingMetrics
                             .entrySet()
                             .forEach(e -> 
e.setValue(ScalingMetrics.roundMetric(e.getValue())));
@@ -292,10 +307,21 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         return new CollectedMetrics(out, outputRatios);
     }
 
-    private double computeLagGrowthRate(KEY jobKey, JobVertexID jobVertexID, 
Double currentLag) {
-        var metricHistory = histories.get(jobKey);
+    private static Supplier<Double> observedTprAvg(
+            JobVertexID jobVertexID,
+            SortedMap<Instant, CollectedMetrics> metricHistory,
+            int minObservations) {
+        return () ->
+                ScalingMetricEvaluator.getAverage(
+                        ScalingMetric.OBSERVED_TPR, jobVertexID, 
metricHistory, minObservations);
+    }
+
+    private double computeLagGrowthRate(
+            SortedMap<Instant, CollectedMetrics> metricHistory,
+            JobVertexID jobVertexID,
+            Double currentLag) {
 
-        if (metricHistory == null || metricHistory.isEmpty()) {
+        if (metricHistory.isEmpty()) {
             return Double.NaN;
         }
 
@@ -332,30 +358,39 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                                     && previousMetricNames
                                             .keySet()
                                             
.equals(topology.getParallelisms().keySet())) {
-                                // We have already gathered the metric names 
for this topology
-                                return previousMetricNames;
+                                var newMetricNames = new 
HashMap<>(previousMetricNames);
+                                var sourceMetricNames =
+                                        queryFilteredMetricNames(
+                                                ctx,
+                                                topology,
+                                                
vertices.stream().filter(topology::isSource));
+                                newMetricNames.putAll(sourceMetricNames);
+                                return newMetricNames;
                             }
 
-                            try (var restClient = ctx.getRestClusterClient()) {
-                                return vertices.stream()
-                                        .filter(v -> 
!topology.getFinishedVertices().contains(v))
-                                        .collect(
-                                                Collectors.toMap(
-                                                        v -> v,
-                                                        v ->
-                                                                
getFilteredVertexMetricNames(
-                                                                        
restClient,
-                                                                        
ctx.getJobID(),
-                                                                        v,
-                                                                        
topology)));
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
+                            // Query all metric names
+                            return queryFilteredMetricNames(ctx, topology, 
vertices.stream());
                         });
         names.keySet().removeAll(topology.getFinishedVertices());
         return names;
     }
 
+    private Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+            Context ctx, JobTopology topology, Stream<JobVertexID> 
vertexStream) {
+        try (var restClient = ctx.getRestClusterClient()) {
+            return vertexStream
+                    .filter(v -> !topology.getFinishedVertices().contains(v))
+                    .collect(
+                            Collectors.toMap(
+                                    v -> v,
+                                    v ->
+                                            getFilteredVertexMetricNames(
+                                                    restClient, 
ctx.getJobID(), v, topology)));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     /**
      * Query and filter metric names for a given job vertex.
      *
@@ -378,6 +413,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);
 
         if (topology.isSource(jobVertexID)) {
+            requiredMetrics.add(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
             
requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
             // Pending records metric won't be available for some sources.
             // The Kafka source, for instance, lazily initializes this metric 
on receiving
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index 1aec2cf2..a97ac6f1 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -48,6 +48,7 @@ import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSI
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LAG;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.LOAD;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.OBSERVED_TPR;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
@@ -135,9 +136,7 @@ public class ScalingMetricEvaluator {
 
         evaluatedMetrics.put(
                 TRUE_PROCESSING_RATE,
-                new EvaluatedScalingMetric(
-                        latestVertexMetrics.get(TRUE_PROCESSING_RATE),
-                        getAverage(TRUE_PROCESSING_RATE, vertex, 
metricsHistory)));
+                evaluateTpr(metricsHistory, vertex, latestVertexMetrics, 
conf));
 
         evaluatedMetrics.put(
                 LOAD,
@@ -154,6 +153,57 @@ public class ScalingMetricEvaluator {
         return evaluatedMetrics;
     }
 
+    private static EvaluatedScalingMetric evaluateTpr(
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            JobVertexID vertex,
+            Map<ScalingMetric, Double> latestVertexMetrics,
+            Configuration conf) {
+
+        var busyTimeTprAvg = getAverage(TRUE_PROCESSING_RATE, vertex, 
metricsHistory);
+        var observedTprAvg =
+                getAverage(
+                        OBSERVED_TPR,
+                        vertex,
+                        metricsHistory,
+                        
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS));
+
+        var tprMetric = selectTprMetric(vertex, conf, busyTimeTprAvg, 
observedTprAvg);
+        return new EvaluatedScalingMetric(
+                latestVertexMetrics.getOrDefault(tprMetric, Double.NaN),
+                tprMetric == OBSERVED_TPR ? observedTprAvg : busyTimeTprAvg);
+    }
+
+    private static ScalingMetric selectTprMetric(
+            JobVertexID jobVertexID,
+            Configuration conf,
+            double busyTimeTprAvg,
+            double observedTprAvg) {
+
+        if (Double.isInfinite(busyTimeTprAvg) || Double.isNaN(busyTimeTprAvg)) 
{
+            return OBSERVED_TPR;
+        }
+
+        if (Double.isNaN(observedTprAvg)) {
+            return TRUE_PROCESSING_RATE;
+        }
+
+        double switchThreshold =
+                
conf.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD);
+        // If we could measure the observed tpr we decide whether to switch to 
using it
+        // instead of busy time based on the error / difference between the two
+        if (busyTimeTprAvg > observedTprAvg * (1 + switchThreshold)) {
+            LOG.debug(
+                    "Using observed tpr {} for {} as busy time based seems too 
large ({})",
+                    observedTprAvg,
+                    jobVertexID,
+                    busyTimeTprAvg);
+            return OBSERVED_TPR;
+        } else {
+            LOG.debug("Using busy time based tpr {} for {}.", busyTimeTprAvg, 
jobVertexID);
+            return TRUE_PROCESSING_RATE;
+        }
+    }
+
     @VisibleForTesting
     protected static void computeProcessingRateThresholds(
             Map<ScalingMetric, EvaluatedScalingMetric> metrics,
@@ -241,25 +291,40 @@ public class ScalingMetricEvaluator {
         }
     }
 
-    private static double getAverage(
+    public static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,
             SortedMap<Instant, CollectedMetrics> metricsHistory) {
-        double[] metricValues =
-                metricsHistory.values().stream()
-                        .map(m -> m.getVertexMetrics().get(jobVertexId))
-                        .filter(m -> m.containsKey(metric))
-                        .mapToDouble(m -> m.get(metric))
-                        .filter(d -> !Double.isNaN(d))
-                        .toArray();
-        for (double metricValue : metricValues) {
-            if (Double.isInfinite(metricValue)) {
-                // As long as infinite values are present, we can't properly 
average. We need to
-                // wait until they are evicted.
-                return metricValue;
+        return getAverage(metric, jobVertexId, metricsHistory, 1);
+    }
+
+    public static double getAverage(
+            ScalingMetric metric,
+            JobVertexID jobVertexId,
+            SortedMap<Instant, CollectedMetrics> metricsHistory,
+            int minElements) {
+
+        double sum = 0;
+        int n = 0;
+        boolean anyInfinite = false;
+        for (var collectedMetrics : metricsHistory.values()) {
+            var metrics = collectedMetrics.getVertexMetrics().get(jobVertexId);
+            double num = metrics.getOrDefault(metric, Double.NaN);
+            if (Double.isNaN(num)) {
+                continue;
             }
+            if (Double.isInfinite(num)) {
+                anyInfinite = true;
+                continue;
+            }
+
+            sum += num;
+            n++;
         }
-        return StatUtils.mean(metricValues);
+        if (n == 0) {
+            return anyInfinite ? Double.POSITIVE_INFINITY : Double.NaN;
+        }
+        return n < minElements ? Double.NaN : sum / n;
     }
 
     private static double getAverageOutputRatio(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index 6c74b5d2..626921bc 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -151,6 +151,36 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Lag threshold which will prevent unnecessary 
scalings while removing the pending messages responsible for the lag.");
 
+    public static final ConfigOption<Duration> 
OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD =
+            autoScalerConfig("observed-true-processing-rate.lag-threshold")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDeprecatedKeys(
+                            deprecatedOperatorConfigKey(
+                                    
"observed-true-processing-rate.lag-threshold"))
+                    .withDescription(
+                            "Lag threshold for enabling observed true 
processing rate measurements.");
+
+    public static final ConfigOption<Double> 
OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD =
+            autoScalerConfig("observed-true-processing-rate.switch-threshold")
+                    .doubleType()
+                    .defaultValue(0.15)
+                    .withDeprecatedKeys(
+                            deprecatedOperatorConfigKey(
+                                    
"observed-true-processing-rate.switch-threshold"))
+                    .withDescription(
+                            "Percentage threshold for switching to observed 
from busy time based true processing rate if the measurement is off by at least 
the configured fraction. For example 0.15 means we switch to observed if the 
busy time based computation is at least 15% higher during catchup.");
+
+    public static final ConfigOption<Integer> 
OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS =
+            autoScalerConfig("observed-true-processing-rate.min-observations")
+                    .intType()
+                    .defaultValue(2)
+                    .withDeprecatedKeys(
+                            deprecatedOperatorConfigKey(
+                                    
"observed-true-processing-rate.min-observations"))
+                    .withDescription(
+                            "Minimum nr of observations used when estimating / 
switching to observed true processing rate.");
+
     public static final ConfigOption<Boolean> 
SCALING_EFFECTIVENESS_DETECTION_ENABLED =
             autoScalerConfig("scaling.effectiveness.detection.enabled")
                     .booleanType()
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index 9fafc686..a5a0edfe 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -25,7 +25,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 
 /**
- * Handler all loggable events during scaling.
+ * Handler for autoscaler events.
  *
  * @param <KEY> The job key.
  * @param <Context> Instance of JobAutoScalerContext.
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 7f28d947..398f2ee3 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -38,7 +38,8 @@ public enum FlinkMetric {
             s -> s.startsWith("Source__") && 
s.endsWith(".numRecordsOutPerSecond")),
     SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
             s -> s.startsWith("Source__") && 
s.endsWith(".numRecordsInPerSecond")),
-    PENDING_RECORDS(s -> s.endsWith(".pendingRecords"));
+    PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
+    BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond"));
 
     public static final Map<FlinkMetric, AggregatedMetric> FINISHED_METRICS =
             Map.of(
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index b6a3e17d..2e7e52ee 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -29,6 +29,9 @@ public enum ScalingMetric {
     /** Processing rate at full capacity (records/sec). */
     TRUE_PROCESSING_RATE(true),
 
+    /** Observed true processing rate for sources. */
+    OBSERVED_TPR(true),
+
     /** Current processing rate. */
     CURRENT_PROCESSING_RATE(true),
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index afaa21ae..ed82277b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -30,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
 
 /** Utilities for computing scaling metrics based on Flink metrics. */
 public class ScalingMetrics {
@@ -52,11 +54,11 @@ public class ScalingMetrics {
             Map<ScalingMetric, Double> scalingMetrics,
             JobTopology topology,
             double lagGrowthRate,
-            Configuration conf) {
+            Configuration conf,
+            Supplier<Double> observedTprAvg) {
 
         var isSource = topology.isSource(jobVertexID);
 
-        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, 
conf, jobVertexID);
         double numRecordsInPerSecond =
                 getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
 
@@ -68,7 +70,15 @@ public class ScalingMetrics {
         }
 
         if (!Double.isNaN(numRecordsInPerSecond)) {
-            double trueProcessingRate = computeTrueRate(numRecordsInPerSecond, 
busyTimeMsPerSecond);
+            double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, 
conf, jobVertexID);
+            double trueProcessingRate =
+                    computeTprFromBusyTime(conf, numRecordsInPerSecond, 
busyTimeMsPerSecond);
+            if (isSource) {
+                var observedTprOpt =
+                        getObservedTpr(flinkMetrics, scalingMetrics, 
numRecordsInPerSecond, conf)
+                                .orElseGet(observedTprAvg);
+                scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
+            }
             scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, 
trueProcessingRate);
             scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
numRecordsInPerSecond);
         } else {
@@ -78,6 +88,45 @@ public class ScalingMetrics {
         }
     }
 
+    private static Optional<Double> getObservedTpr(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Map<ScalingMetric, Double> scalingMetrics,
+            double numRecordsInPerSecond,
+            Configuration conf) {
+
+        // If there are no incoming records we return infinity to allow scale 
down
+        if (numRecordsInPerSecond == 0) {
+            return Optional.of(Double.POSITIVE_INFINITY);
+        }
+
+        // We only measure observed tpr when we are catching up, that is when 
the lag is beyond the
+        // configured observe threshold
+        boolean catchingUp =
+                scalingMetrics.getOrDefault(ScalingMetric.LAG, 0.)
+                        >= 
conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD)
+                                        .toSeconds()
+                                * numRecordsInPerSecond;
+        if (!catchingUp) {
+            return Optional.empty();
+        }
+
+        double observedTpr =
+                computeObservedTprWithBackpressure(
+                        numRecordsInPerSecond,
+                        
flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC).getAvg());
+
+        return Double.isNaN(observedTpr) ? Optional.empty() : 
Optional.of(observedTpr);
+    }
+
+    public static double computeObservedTprWithBackpressure(
+            double numRecordsInPerSecond, double backpressureMsPerSeconds) {
+        if (backpressureMsPerSeconds >= 1000) {
+            return Double.NaN;
+        }
+        double nonBackpressuredRate = (1 - (backpressureMsPerSeconds / 1000));
+        return numRecordsInPerSecond / nonBackpressuredRate;
+    }
+
     public static Map<Edge, Double> computeOutputRatios(
             Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics,
             JobTopology topology) {
@@ -136,10 +185,9 @@ public class ScalingMetrics {
                         "No busyTimeMsPerSecond metric available for {}. No 
scaling will be performed for this vertex.",
                         jobVertexId);
             }
-            // Pretend that the load is balanced because we don't know any 
better
-            busyTimeMsPerSecond = 
conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+            return Double.NaN;
         }
-        return busyTimeMsPerSecond;
+        return Math.max(0, busyTimeMsPerSecond);
     }
 
     private static double getNumRecordsInPerSecond(
@@ -159,7 +207,7 @@ public class ScalingMetrics {
             LOG.warn("Received null input rate for {}. Returning NaN.", 
jobVertexID);
             return Double.NaN;
         }
-        return numRecordsInPerSecond.getSum();
+        return Math.max(0, numRecordsInPerSecond.getSum());
     }
 
     private static double getNumRecordsOutPerSecond(
@@ -225,12 +273,17 @@ public class ScalingMetrics {
         return getNumRecordsOutPerSecond(fromMetrics, from);
     }
 
-    private static double computeTrueRate(double rate, double 
busyTimeMsPerSecond) {
-        if (rate <= 0 || busyTimeMsPerSecond <= 0) {
+    private static double computeTprFromBusyTime(
+            Configuration conf, double rate, double busyTimeMsPerSecond) {
+        if (rate == 0) {
             // Nothing is coming in, we assume infinite processing power
             // until we can sample the true processing rate (i.e. data flows).
             return Double.POSITIVE_INFINITY;
         }
+        // Pretend that the load is balanced because we don't know any better
+        if (Double.isNaN(busyTimeMsPerSecond)) {
+            busyTimeMsPerSecond = 
conf.get(AutoScalerOptions.TARGET_UTILIZATION) * 1000;
+        }
         return rate / (busyTimeMsPerSecond / 1000);
     }
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
index 36184dad..b4895648 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/realizer/ScalingRealizer.java
@@ -23,7 +23,8 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import java.util.Map;
 
 /**
- * The Scaling Realizer is responsible for managing scaling actions.
+ * The Scaling Realizer is responsible for applying scaling actions, i.e. 
actually rescaling the
+ * jobs.
  *
  * @param <KEY> The job key.
  * @param <Context> Instance of JobAutoScalerContext.
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
index 4551ef54..b0255506 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/AutoScalerStateStore.java
@@ -46,14 +46,13 @@ public interface AutoScalerStateStore<KEY, Context extends 
JobAutoScalerContext<
 
     void removeScalingHistory(Context jobContext) throws Exception;
 
-    void storeEvaluatedMetrics(
-            Context jobContext, SortedMap<Instant, CollectedMetrics> 
evaluatedMetrics)
+    void storeCollectedMetrics(Context jobContext, SortedMap<Instant, 
CollectedMetrics> metrics)
             throws Exception;
 
-    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context 
jobContext)
+    Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(Context 
jobContext)
             throws Exception;
 
-    void removeEvaluatedMetrics(Context jobContext) throws Exception;
+    void removeCollectedMetrics(Context jobContext) throws Exception;
 
     void storeParallelismOverrides(Context jobContext, Map<String, String> 
parallelismOverrides)
             throws Exception;
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
index 2685db91..1cb74d44 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/state/InMemoryAutoScalerStateStore.java
@@ -29,7 +29,7 @@ import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * The state store based on the Java Heap, the state will be discarded after 
process restarts.
+ * State store based on the Java Heap, the state will be discarded after 
process restarts.
  *
  * @param <KEY> The job key.
  * @param <Context> The job autoscaler context.
@@ -40,13 +40,13 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     private final Map<KEY, Map<JobVertexID, SortedMap<Instant, 
ScalingSummary>>>
             scalingHistoryStore;
 
-    private final Map<KEY, SortedMap<Instant, CollectedMetrics>> 
evaluatedMetricsStore;
+    private final Map<KEY, SortedMap<Instant, CollectedMetrics>> 
collectedMetricsStore;
 
     private final Map<KEY, Map<String, String>> parallelismOverridesStore;
 
     public InMemoryAutoScalerStateStore() {
         scalingHistoryStore = new ConcurrentHashMap<>();
-        evaluatedMetricsStore = new ConcurrentHashMap<>();
+        collectedMetricsStore = new ConcurrentHashMap<>();
         parallelismOverridesStore = new ConcurrentHashMap<>();
     }
 
@@ -69,19 +69,19 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     }
 
     @Override
-    public void storeEvaluatedMetrics(
-            Context jobContext, SortedMap<Instant, CollectedMetrics> 
evaluatedMetrics) {
-        evaluatedMetricsStore.put(jobContext.getJobKey(), evaluatedMetrics);
+    public void storeCollectedMetrics(
+            Context jobContext, SortedMap<Instant, CollectedMetrics> metrics) {
+        collectedMetricsStore.put(jobContext.getJobKey(), metrics);
     }
 
     @Override
-    public Optional<SortedMap<Instant, CollectedMetrics>> 
getEvaluatedMetrics(Context jobContext) {
-        return 
Optional.ofNullable(evaluatedMetricsStore.get(jobContext.getJobKey()));
+    public Optional<SortedMap<Instant, CollectedMetrics>> 
getCollectedMetrics(Context jobContext) {
+        return 
Optional.ofNullable(collectedMetricsStore.get(jobContext.getJobKey()));
     }
 
     @Override
-    public void removeEvaluatedMetrics(Context jobContext) {
-        evaluatedMetricsStore.remove(jobContext.getJobKey());
+    public void removeCollectedMetrics(Context jobContext) {
+        collectedMetricsStore.remove(jobContext.getJobKey());
     }
 
     @Override
@@ -108,7 +108,7 @@ public class InMemoryAutoScalerStateStore<KEY, Context 
extends JobAutoScalerCont
     @Override
     public void removeInfoFromCache(KEY jobKey) {
         scalingHistoryStore.remove(jobKey);
-        evaluatedMetricsStore.remove(jobKey);
+        collectedMetricsStore.remove(jobKey);
         parallelismOverridesStore.remove(jobKey);
     }
 }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 0128f3ca..3e5c6fa1 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -368,7 +368,7 @@ public class BacklogBasedScalingTest {
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
 
         autoscaler.scale(context);
-        assertFalse(stateStore.getEvaluatedMetrics(context).get().isEmpty());
+        assertFalse(stateStore.getCollectedMetrics(context).get().isEmpty());
     }
 
     @Test
@@ -392,13 +392,13 @@ public class BacklogBasedScalingTest {
         setClocksTo(now);
         metricsCollector.setJobUpdateTs(now);
         autoscaler.scale(context);
-        assertTrue(stateStore.getEvaluatedMetrics(context).isEmpty());
+        assertTrue(autoscaler.lastEvaluatedMetrics.isEmpty());
         assertTrue(eventCollector.events.isEmpty());
     }
 
     private void assertEvaluatedMetricsSize(int expectedSize) {
         Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
-                stateStore.getEvaluatedMetrics(context);
+                stateStore.getCollectedMetrics(context);
         assertThat(evaluatedMetricsOpt).isPresent();
         assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index c36d6e10..24cc9e2b 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
+import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -41,6 +42,7 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -121,7 +123,7 @@ public class MetricsCollectionAndEvaluationTest {
         clock = Clock.offset(clock, 
conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
         metricsCollector.setClock(clock);
         collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
-        assertEquals(1, collectedMetrics.getMetricHistory().size());
+        assertEquals(2, collectedMetrics.getMetricHistory().size());
         assertFalse(collectedMetrics.isFullyCollected());
 
         // We haven't collected a full window yet
@@ -129,7 +131,7 @@ public class MetricsCollectionAndEvaluationTest {
         clock = Clock.offset(clock, Duration.ofSeconds(1));
         metricsCollector.setClock(clock);
         collectedMetrics = metricsCollector.updateMetrics(context, stateStore);
-        assertEquals(2, collectedMetrics.getMetricHistory().size());
+        assertEquals(3, collectedMetrics.getMetricHistory().size());
         assertFalse(collectedMetrics.isFullyCollected());
 
         // Advance time to stabilization period + full window => metrics 
should be present
@@ -294,13 +296,13 @@ public class MetricsCollectionAndEvaluationTest {
         // This call will lead to metric collection but we haven't reached the 
window size yet
         // which will hold back metrics
         metricsHistory = metricsCollector.updateMetrics(context, stateStore);
-        assertEquals(1, metricsHistory.getMetricHistory().size());
+        assertEquals(3, metricsHistory.getMetricHistory().size());
         assertFalse(metricsHistory.isFullyCollected());
 
         // Collect more values in window
         metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
         metricsHistory = metricsCollector.updateMetrics(context, stateStore);
-        assertEquals(2, metricsHistory.getMetricHistory().size());
+        assertEquals(4, metricsHistory.getMetricHistory().size());
         assertFalse(metricsHistory.isFullyCollected());
 
         // Window size reached
@@ -418,14 +420,177 @@ public class MetricsCollectionAndEvaluationTest {
                         0.,
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         Double.POSITIVE_INFINITY,
+                        ScalingMetric.OBSERVED_TPR,
+                        Double.POSITIVE_INFINITY,
                         ScalingMetric.LOAD,
                         0.),
                 finishedMetrics);
     }
 
+    @Test
+    public void testObservedTprCollection() throws Exception {
+        var source = new JobVertexID();
+        var topology = new JobTopology(new VertexInfo(source, Set.of(), 10, 
720));
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> metrics =
+                Map.of(
+                        source,
+                        new HashMap<>(
+                                Map.of(
+                                        FlinkMetric.PENDING_RECORDS,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 1000000.),
+                                        FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
600., Double.NaN),
+                                        FlinkMetric.BUSY_TIME_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, 200., 
Double.NaN, Double.NaN),
+                                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                                        
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
+
+        metricsCollector = new TestingMetricsCollector(topology);
+        metricsCollector.setJobUpdateTs(startTime);
+        metricsCollector.setCurrentMetrics(metrics);
+
+        
context.getConfiguration().set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ZERO);
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(100), 
ZoneId.systemDefault()));
+        var collectedMetrics =
+                metricsCollector
+                        .updateMetrics(context, stateStore)
+                        .getMetricHistory()
+                        .get(Instant.ofEpochMilli(100))
+                        .getVertexMetrics()
+                        .get(source);
+
+        // Make sure both the busy time based and the observed TPR are collected
+        assertEquals(2500., 
collectedMetrics.get(ScalingMetric.TRUE_PROCESSING_RATE));
+        assertEquals(500. / 0.4, 
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+
+        // The average observed TPR is picked up only if there are 2 valid observations.
+        // We set no lag so the observed TPR cannot be computed and we expect NaN
+        metrics.get(source)
+                .put(
+                        FlinkMetric.PENDING_RECORDS,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 0.));
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(200), 
ZoneId.systemDefault()));
+        collectedMetrics =
+                metricsCollector
+                        .updateMetrics(context, stateStore)
+                        .getMetricHistory()
+                        .get(Instant.ofEpochMilli(200))
+                        .getVertexMetrics()
+                        .get(source);
+
+        // Make sure the observed TPR is not available (NaN) when there is no lag
+        assertEquals(Double.NaN, 
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+
+        // Add another valid observed computation
+        metrics.get(source)
+                .put(
+                        FlinkMetric.PENDING_RECORDS,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 100000.));
+        metrics.get(source)
+                .put(
+                        FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 500., 
Double.NaN));
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(300), 
ZoneId.systemDefault()));
+        collectedMetrics =
+                metricsCollector
+                        .updateMetrics(context, stateStore)
+                        .getMetricHistory()
+                        .get(Instant.ofEpochMilli(300))
+                        .getVertexMetrics()
+                        .get(source);
+
+        assertEquals(500. / 0.5, 
collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+        // With no lag again, make sure the average of the valid observations is picked up
+        metrics.get(source)
+                .put(
+                        FlinkMetric.PENDING_RECORDS,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 0.));
+
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(400), 
ZoneId.systemDefault()));
+        collectedMetrics =
+                metricsCollector
+                        .updateMetrics(context, stateStore)
+                        .getMetricHistory()
+                        .get(Instant.ofEpochMilli(400))
+                        .getVertexMetrics()
+                        .get(source);
+
+        assertEquals(
+                ((500. / 0.5) + (500. / 0.4)) / 2,
+                collectedMetrics.get(ScalingMetric.OBSERVED_TPR));
+    }
+
+    @Test
+    public void testMetricCollectionDuringStabilization() throws Exception {
+        var source = new JobVertexID();
+        var topology = new JobTopology(new VertexInfo(source, Set.of(), 10, 
720));
+        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> metrics =
+                Map.of(
+                        source,
+                        new HashMap<>(
+                                Map.of(
+                                        FlinkMetric.PENDING_RECORDS,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 1000000.),
+                                        FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
600., Double.NaN),
+                                        FlinkMetric.BUSY_TIME_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, 200., 
Double.NaN, Double.NaN),
+                                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                                        
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+                                        new AggregatedMetric(
+                                                "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
+
+        metricsCollector = new TestingMetricsCollector(topology);
+        metricsCollector.setJobUpdateTs(startTime);
+        metricsCollector.setCurrentMetrics(metrics);
+
+        context.getConfiguration()
+                .set(AutoScalerOptions.STABILIZATION_INTERVAL, 
Duration.ofMillis(100));
+        context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, 
Duration.ofMillis(100));
+
+        // Within stabilization period we simply collect metrics but do not 
return them
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50), 
ZoneId.systemDefault()));
+        assertTrue(
+                metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().isEmpty());
+        assertEquals(1, stateStore.getCollectedMetrics(context).get().size());
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60), 
ZoneId.systemDefault()));
+        assertTrue(
+                metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().isEmpty());
+        assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+
+        // Until window is full (time=200) we keep returning stabilizing 
metrics
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), 
ZoneId.systemDefault()));
+        assertEquals(
+                3, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
+        assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
+
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(180), 
ZoneId.systemDefault()));
+        assertEquals(
+                4, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
+        assertEquals(4, stateStore.getCollectedMetrics(context).get().size());
+
+        // Once we reach full time we trim the stabilization metrics
+        metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(260), 
ZoneId.systemDefault()));
+        assertEquals(
+                2, metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().size());
+        assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+    }
+
     @Test
     public void testScaleDownWithZeroProcessingRate() throws Exception {
-        var topology = new JobTopology(new VertexInfo(source1, Set.of(), 10, 
720));
+        var topology = new JobTopology(new VertexInfo(source1, Set.of(), 2, 
720));
 
         metricsCollector = new TestingMetricsCollector<>(topology);
         metricsCollector.setJobUpdateTs(startTime);
@@ -448,7 +613,7 @@ public class MetricsCollectionAndEvaluationTest {
         assertEquals(0, 
evaluation.get(source1).get(ScalingMetric.TARGET_DATA_RATE).getCurrent());
         assertEquals(
                 Double.POSITIVE_INFINITY,
-                
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
+                
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage());
         assertEquals(
                 0.,
                 
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
@@ -459,6 +624,23 @@ public class MetricsCollectionAndEvaluationTest {
         scalingExecutor.scaleResource(context, evaluation);
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
+
+        // Make sure if there are measurements with non-infinite TPR, we don't 
evaluate infinite
+        var lastCollected = 
collectedMetrics.getMetricHistory().values().iterator().next();
+        var newMetrics = new HashMap<>(lastCollected.getVertexMetrics());
+        newMetrics.get(source1).put(ScalingMetric.TRUE_PROCESSING_RATE, 3.);
+        newMetrics.get(source1).put(ScalingMetric.OBSERVED_TPR, 3.);
+        newMetrics.get(source1).put(ScalingMetric.SOURCE_DATA_RATE, 2.);
+
+        collectedMetrics
+                .getMetricHistory()
+                .put(
+                        Instant.ofEpochSecond(1234),
+                        new CollectedMetrics(newMetrics, 
lastCollected.getOutputRatios()));
+
+        evaluation = evaluator.evaluate(context.getConfiguration(), 
collectedMetrics);
+        assertEquals(
+                3., 
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage());
     }
 
     private CollectedMetricHistory collectMetrics() throws Exception {
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index da41c015..c4b714ef 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -205,7 +205,7 @@ public class RecommendedParallelismTest {
 
         // after restart while the job is not running the evaluated metrics 
are gone
         autoscaler.scale(context);
-        assertEquals(3, stateStore.getEvaluatedMetrics(context).get().size());
+        assertEquals(3, stateStore.getCollectedMetrics(context).get().size());
         assertNull(autoscaler.lastEvaluatedMetrics.get(context.getJobKey()));
         scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.get(source));
@@ -230,7 +230,7 @@ public class RecommendedParallelismTest {
 
     private void assertEvaluatedMetricsSize(int expectedSize) {
         Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
-                stateStore.getEvaluatedMetrics(context);
+                stateStore.getCollectedMetrics(context);
         assertThat(evaluatedMetricsOpt).isPresent();
         assertThat(evaluatedMetricsOpt.get()).hasSize(expectedSize);
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
index 128c4094..a74ac295 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java
@@ -46,17 +46,16 @@ public class RestApiMetricsCollectorTest {
 
     @Test
     public void testAggregateMultiplePendingRecordsMetricsPerSource() throws 
Exception {
-        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
-                new RestApiMetricsCollector<>();
+        var collector = new RestApiMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>();
 
         JobVertexID jobVertexID = new JobVertexID();
-        Map<String, FlinkMetric> flinkMetrics =
+        var flinkMetrics =
                 Map.of(
                         "a.pendingRecords", FlinkMetric.PENDING_RECORDS,
                         "b.pendingRecords", FlinkMetric.PENDING_RECORDS);
-        Map<JobVertexID, Map<String, FlinkMetric>> metrics = 
Map.of(jobVertexID, flinkMetrics);
+        var metrics = Map.of(jobVertexID, flinkMetrics);
 
-        List<AggregatedMetric> aggregatedMetricsResponse =
+        var aggregatedMetricsResponse =
                 List.of(
                         new AggregatedMetric(
                                 "a.pendingRecords", Double.NaN, Double.NaN, 
Double.NaN, 100.),
@@ -65,8 +64,8 @@ public class RestApiMetricsCollectorTest {
                         new AggregatedMetric(
                                 "c.unrelated", Double.NaN, Double.NaN, 
Double.NaN, 100.));
 
-        Configuration conf = new Configuration();
-        RestClusterClient<String> restClusterClient =
+        var conf = new Configuration();
+        var restClusterClient =
                 new RestClusterClient<>(
                         conf,
                         "test-cluster",
@@ -91,7 +90,7 @@ public class RestApiMetricsCollectorTest {
                 };
 
         JobID jobID = new JobID();
-        JobAutoScalerContext<JobID> context =
+        var context =
                 new JobAutoScalerContext<>(
                         jobID,
                         jobID,
@@ -100,8 +99,7 @@ public class RestApiMetricsCollectorTest {
                         new UnregisteredMetricsGroup(),
                         () -> restClusterClient);
 
-        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> jobVertexIDMapMap 
=
-                collector.queryAllAggregatedMetrics(context, metrics);
+        var jobVertexIDMapMap = collector.queryAllAggregatedMetrics(context, 
metrics);
 
         Assertions.assertEquals(1, jobVertexIDMapMap.size());
         Map<FlinkMetric, AggregatedMetric> vertexMetrics = 
jobVertexIDMapMap.get(jobVertexID);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index 57fe3de9..724aa218 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -36,11 +36,10 @@ import org.junit.jupiter.api.Test;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -90,30 +89,66 @@ public class ScalingMetricCollectorTest {
 
     @Test
     public void testQueryNamesOnTopologyChange() {
-        var metricNameQueryCounter = new AtomicInteger(0);
-        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector =
-                new RestApiMetricsCollector<>() {
+        var metricNameQueryCounter = new HashMap<JobVertexID, Integer>();
+        var collector =
+                new RestApiMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>() {
                     @Override
                     protected Map<String, FlinkMetric> 
getFilteredVertexMetricNames(
                             RestClusterClient<?> rc, JobID id, JobVertexID 
jvi, JobTopology t) {
-                        metricNameQueryCounter.incrementAndGet();
+                        metricNameQueryCounter.compute(jvi, (j, c) -> c + 1);
                         return Map.of();
                     }
                 };
 
-        var t1 = new JobTopology(new VertexInfo(new JobVertexID(), 
Collections.emptySet(), 1, 1));
-        var t2 = new JobTopology(new VertexInfo(new JobVertexID(), 
Collections.emptySet(), 1, 1));
+        var source = new JobVertexID();
+        var source2 = new JobVertexID();
+        var sink = new JobVertexID();
+        metricNameQueryCounter.put(source, 0);
+        metricNameQueryCounter.put(source2, 0);
+        metricNameQueryCounter.put(sink, 0);
+
+        var t1 =
+                new JobTopology(
+                        new VertexInfo(source, Set.of(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var t2 =
+                new JobTopology(
+                        new VertexInfo(source2, Set.of(), 1, 1),
+                        new VertexInfo(sink, Set.of(source2), 1, 1));
 
         collector.queryFilteredMetricNames(context, t1);
-        assertEquals(1, metricNameQueryCounter.get());
+        assertEquals(1, metricNameQueryCounter.get(source));
+        assertEquals(0, metricNameQueryCounter.get(source2));
+        assertEquals(1, metricNameQueryCounter.get(sink));
+
         collector.queryFilteredMetricNames(context, t1);
         collector.queryFilteredMetricNames(context, t1);
-        assertEquals(1, metricNameQueryCounter.get());
+        // Make sure source metrics are refreshed
+        assertEquals(3, metricNameQueryCounter.get(source));
+        assertEquals(0, metricNameQueryCounter.get(source2));
+        assertEquals(1, metricNameQueryCounter.get(sink));
+
+        // Topology change
         collector.queryFilteredMetricNames(context, t2);
-        assertEquals(2, metricNameQueryCounter.get());
+        assertEquals(3, metricNameQueryCounter.get(source));
+        assertEquals(1, metricNameQueryCounter.get(source2));
+        assertEquals(2, metricNameQueryCounter.get(sink));
+
         collector.queryFilteredMetricNames(context, t2);
+        assertEquals(3, metricNameQueryCounter.get(source));
+        assertEquals(2, metricNameQueryCounter.get(source2));
+        assertEquals(2, metricNameQueryCounter.get(sink));
+
+        // Mark source finished, should not be queried again
+        t2 =
+                new JobTopology(
+                        new VertexInfo(source2, Set.of(), 1, 1, true),
+                        new VertexInfo(sink, Set.of(source2), 1, 1));
         collector.queryFilteredMetricNames(context, t2);
-        assertEquals(2, metricNameQueryCounter.get());
+        assertEquals(3, metricNameQueryCounter.get(source));
+        assertEquals(2, metricNameQueryCounter.get(source2));
+        assertEquals(2, metricNameQueryCounter.get(sink));
     }
 
     @Test
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 13b19445..ce7c5cb5 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -316,6 +316,125 @@ public class ScalingMetricEvaluatorTest {
         assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
     }
 
+    @Test
+    public void testObservedTprEvaluation() {
+        var source = new JobVertexID();
+        var conf = new Configuration();
+
+        var topology = new JobTopology(new VertexInfo(source, 
Collections.emptySet(), 1, 1));
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+        metricHistory.put(
+                Instant.ofEpochMilli(100),
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(
+                                        ScalingMetric.LAG,
+                                        0.,
+                                        ScalingMetric.TRUE_PROCESSING_RATE,
+                                        300.,
+                                        ScalingMetric.OBSERVED_TPR,
+                                        200.,
+                                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        100.,
+                                        ScalingMetric.SOURCE_DATA_RATE,
+                                        50.,
+                                        ScalingMetric.LOAD,
+                                        10.)),
+                        Map.of()));
+        metricHistory.put(
+                Instant.ofEpochMilli(200),
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(
+                                        ScalingMetric.LAG,
+                                        0.,
+                                        ScalingMetric.TRUE_PROCESSING_RATE,
+                                        400.,
+                                        ScalingMetric.OBSERVED_TPR,
+                                        400.,
+                                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        100.,
+                                        ScalingMetric.SOURCE_DATA_RATE,
+                                        50.,
+                                        ScalingMetric.LOAD,
+                                        10.)),
+                        Map.of()));
+
+        // Observed TPR average : 300
+        // Busy Time TPR average: 350
+
+        // Set diff threshold to 20% -> within threshold
+        
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, 0.2);
+
+        // Test that we used busy time based TPR
+        assertEquals(
+                new EvaluatedScalingMetric(400., 350.),
+                evaluator
+                        .evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory))
+                        .get(source)
+                        .get(ScalingMetric.TRUE_PROCESSING_RATE));
+
+        // Set diff threshold to 10% -> outside threshold
+        
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, 0.1);
+
+        // Test that we used the observed TPR
+        assertEquals(
+                new EvaluatedScalingMetric(400, 300.),
+                evaluator
+                        .evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory))
+                        .get(source)
+                        .get(ScalingMetric.TRUE_PROCESSING_RATE));
+
+        // Test that observed tpr min observations are respected. If less, use 
busy time
+        
conf.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS, 3);
+        assertEquals(
+                new EvaluatedScalingMetric(400., 350.),
+                evaluator
+                        .evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory))
+                        .get(source)
+                        .get(ScalingMetric.TRUE_PROCESSING_RATE));
+    }
+
+    @Test
+    public void testMissingObservedTpr() {
+        var source = new JobVertexID();
+        var conf = new Configuration();
+
+        var topology = new JobTopology(new VertexInfo(source, 
Collections.emptySet(), 1, 1));
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, CollectedMetrics>();
+        metricHistory.put(
+                Instant.ofEpochMilli(100),
+                new CollectedMetrics(
+                        Map.of(
+                                source,
+                                Map.of(
+                                        ScalingMetric.LAG,
+                                        0.,
+                                        ScalingMetric.TRUE_PROCESSING_RATE,
+                                        300.,
+                                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                                        100.,
+                                        ScalingMetric.SOURCE_DATA_RATE,
+                                        50.,
+                                        ScalingMetric.LOAD,
+                                        10.)),
+                        Map.of()));
+
+        // Test that we used busy time based TPR
+        assertEquals(
+                new EvaluatedScalingMetric(300., 300.),
+                evaluator
+                        .evaluate(conf, new CollectedMetricHistory(topology, 
metricHistory))
+                        .get(source)
+                        .get(ScalingMetric.TRUE_PROCESSING_RATE));
+    }
+
     private Tuple2<Double, Double> getThresholds(
             double inputTargetRate, double catchUpRate, Configuration conf) {
         return getThresholds(inputTargetRate, catchUpRate, conf, false);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index 85a72e5a..697fc926 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -38,6 +38,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Tests for scaling metrics computation logic. */
 public class ScalingMetricsTest {
 
+    private static final double PREV_TPR = 123;
+    private static final JobVertexID SOURCE = new JobVertexID();
+
     @Test
     public void testProcessingAndOutputMetrics() {
         var source = new JobVertexID();
@@ -55,7 +58,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, 900., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                         aggSum(1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -63,12 +66,15 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 15.,
-                new Configuration());
+                new Configuration(),
+                () -> PREV_TPR);
 
         assertEquals(
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
-                        10000.,
+                        1000. / 0.9,
+                        ScalingMetric.OBSERVED_TPR,
+                        PREV_TPR,
                         ScalingMetric.SOURCE_DATA_RATE,
                         1015.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -89,12 +95,15 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 -50.,
-                new Configuration());
+                new Configuration(),
+                () -> PREV_TPR);
 
         assertEquals(
                 Map.of(
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         10000.,
+                        ScalingMetric.OBSERVED_TPR,
+                        PREV_TPR,
                         ScalingMetric.SOURCE_DATA_RATE,
                         950.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
@@ -114,7 +123,8 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 0.,
-                new Configuration());
+                new Configuration(),
+                () -> 0.);
 
         assertEquals(
                 Map.of(
@@ -140,7 +150,8 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 0.,
-                conf);
+                conf,
+                () -> 0.);
 
         assertEquals(
                 Map.of(
@@ -179,7 +190,8 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 0.,
-                conf);
+                conf,
+                () -> PREV_TPR);
 
         // Make sure vertex won't be scaled
         
assertTrue(conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS).contains(source.toHexString()));
@@ -239,8 +251,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When not busy at all, we have infinite processing 
power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.SOURCE_DATA_RATE,
-                        dataRate,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         10.),
                 scalingMetrics);
@@ -256,8 +266,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When no records are coming in, we assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.SOURCE_DATA_RATE,
-                        0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         0.),
                 scalingMetrics);
@@ -272,8 +280,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // When no records are coming in, we assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.SOURCE_DATA_RATE,
-                        0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         0.),
                 scalingMetrics);
@@ -287,8 +293,6 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_PROCESSING_RATE,
                         // Nothing is coming in, we must assume infinite 
processing power
                         Double.POSITIVE_INFINITY,
-                        ScalingMetric.SOURCE_DATA_RATE,
-                        0.,
                         ScalingMetric.CURRENT_PROCESSING_RATE,
                         0.),
                 scalingMetrics);
@@ -308,11 +312,11 @@ public class ScalingMetricsTest {
 
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
         ScalingMetrics.computeDataRateMetrics(
-                source,
+                op,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
                         new AggregatedMetric("", Double.NaN, busyness, 
Double.NaN, Double.NaN),
-                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                         new AggregatedMetric(
                                 "", Double.NaN, Double.NaN, Double.NaN, 
processingRate),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -320,7 +324,8 @@ public class ScalingMetricsTest {
                 scalingMetrics,
                 topology,
                 0.,
-                new Configuration());
+                new Configuration(),
+                () -> 0.);
 
         return scalingMetrics;
     }
@@ -407,6 +412,74 @@ public class ScalingMetricsTest {
                 ScalingMetrics.computeOutputRatios(allMetrics, topology));
     }
 
+    @Test
+    public void testComputeTprWithBackpressure() {
+        assertEquals(Double.NaN, 
ScalingMetrics.computeObservedTprWithBackpressure(100, 1000));
+        assertEquals(500, 
ScalingMetrics.computeObservedTprWithBackpressure(500., 0));
+        assertEquals(1000, 
ScalingMetrics.computeObservedTprWithBackpressure(250, 750));
+    }
+
+    @Test
+    public void computeObservedTpr() {
+        // Without enough lag we cannot compute the observed TPR, so the previous value is kept
+        assertEquals(PREV_TPR, computeObservedTpr(500, 1000, 500, 500));
+
+        assertEquals(PREV_TPR, computeObservedTpr(0, 1000, 500, 500));
+
+        // When there is enough lag, observed rate is computed. Switch to 
busyness because diff is
+        // within limit
+        assertEquals(900 / 0.9, computeObservedTpr(10000000, 900, 850, 100));
+
+        // Should stay with busyness after switching as diff is still small
+        assertEquals(900 / 0.91, computeObservedTpr(10000000, 900, 900, 90));
+
+        // Use observed when diff is large and switch to observed
+        assertEquals(1000 / 0.8, computeObservedTpr(10000000, 1000, 500, 200));
+        assertEquals(1000 / 0.81, computeObservedTpr(10000000, 1000, 500, 
190));
+
+        // When no incoming records observed TPR should be infinity
+        assertEquals(Double.POSITIVE_INFINITY, computeObservedTpr(500, 0, 100, 
100));
+    }
+
+    public static double computeObservedTpr(
+            double lag, double processingRate, double busyness, double 
backpressure) {
+        return computeObservedTpr(lag, processingRate, busyness, backpressure, 
new Configuration());
+    }
+
+    public static double computeObservedTpr(
+            double lag,
+            double processingRate,
+            double busyness,
+            double backpressure,
+            Configuration conf) {
+        var sink = new JobVertexID();
+        var topology =
+                new JobTopology(
+                        new VertexInfo(SOURCE, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(SOURCE), 1, 1));
+
+        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        scalingMetrics.put(ScalingMetric.LAG, lag);
+        ScalingMetrics.computeDataRateMetrics(
+                SOURCE,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, busyness, 
Double.NaN, Double.NaN),
+                        FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
backpressure, Double.NaN),
+                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric(
+                                "", Double.NaN, Double.NaN, Double.NaN, 
processingRate),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        aggSum(0)),
+                scalingMetrics,
+                topology,
+                0.,
+                conf,
+                () -> PREV_TPR);
+        return scalingMetrics.get(ScalingMetric.OBSERVED_TPR);
+    }
+
     private static AggregatedMetric aggSum(double sum) {
         return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 
sum);
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
index f9816533..97e70267 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFactory.java
@@ -33,10 +33,8 @@ public class AutoscalerFactory {
     public static JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> 
create(
             KubernetesClient client, EventRecorder eventRecorder) {
 
-        KubernetesAutoScalerStateStore stateStore =
-                new KubernetesAutoScalerStateStore(new ConfigMapStore(client));
-        KubernetesAutoScalerEventHandler eventHandler =
-                new KubernetesAutoScalerEventHandler(eventRecorder);
+        var stateStore = new KubernetesAutoScalerStateStore(new 
ConfigMapStore(client));
+        var eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);
 
         return new JobAutoScalerImpl<>(
                 new RestApiMetricsCollector<>(),
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
index 2a60d4af..5ded9719 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ConfigMapStore.java
@@ -44,12 +44,11 @@ public class ConfigMapStore {
 
     private final KubernetesClient kubernetesClient;
 
-    // The cache for each resourceId may be in three situations:
-    // 1. The resourceId isn't exist : ConfigMap isn't loaded from kubernetes, 
or it's removed.
-    // 2. The resourceId is exist, and value is the Optional.empty() : We have 
loaded the ConfigMap
-    // from kubernetes, but the ConfigMap isn't created at kubernetes side.
-    // 3. The resourceId is exist, and the Optional isn't empty : We have 
loaded the ConfigMap from
-    // kubernetes, it may be not same with kubernetes side due to it's not 
flushed after updating.
+    // The cache for each resourceId may be in three states:
+    // 1. The resourceId doesn't exist : ConfigMap isn't loaded from 
kubernetes, or it's deleted
+    // 2. Exists, Optional.empty() : The ConfigMap doesn't exist in Kubernetes
+    // 3. Exists, Not Empty : We have loaded the ConfigMap from kubernetes, it 
may not be the same
+    // if not flushed already
     private final ConcurrentHashMap<ResourceID, Optional<ConfigMap>> cache =
             new ConcurrentHashMap<>();
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
index 2b446cbe..5f45f368 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStore.java
@@ -108,15 +108,15 @@ public class KubernetesAutoScalerStateStore
     }
 
     @Override
-    public void storeEvaluatedMetrics(
+    public void storeCollectedMetrics(
             KubernetesJobAutoScalerContext jobContext,
-            SortedMap<Instant, CollectedMetrics> evaluatedMetrics) {
+            SortedMap<Instant, CollectedMetrics> metrics) {
         configMapStore.putSerializedState(
-                jobContext, COLLECTED_METRICS_KEY, 
serializeEvaluatedMetrics(evaluatedMetrics));
+                jobContext, COLLECTED_METRICS_KEY, 
serializeEvaluatedMetrics(metrics));
     }
 
     @Override
-    public Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(
+    public Optional<SortedMap<Instant, CollectedMetrics>> getCollectedMetrics(
             KubernetesJobAutoScalerContext jobContext) {
         Optional<String> serializedEvaluatedMetricsOpt =
                 configMapStore.getSerializedState(jobContext, 
COLLECTED_METRICS_KEY);
@@ -135,7 +135,7 @@ public class KubernetesAutoScalerStateStore
     }
 
     @Override
-    public void removeEvaluatedMetrics(KubernetesJobAutoScalerContext 
jobContext) {
+    public void removeCollectedMetrics(KubernetesJobAutoScalerContext 
jobContext) {
         configMapStore.removeSerializedState(jobContext, 
COLLECTED_METRICS_KEY);
     }
 
@@ -217,7 +217,7 @@ public class KubernetesAutoScalerStateStore
                         .orElse(0);
 
         Optional<SortedMap<Instant, CollectedMetrics>> evaluatedMetricsOpt =
-                getEvaluatedMetrics(context);
+                getCollectedMetrics(context);
         if (evaluatedMetricsOpt.isEmpty()) {
             return;
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
index 4618a498..21f0c7c3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesAutoScalerStateStoreTest.java
@@ -218,17 +218,17 @@ public class KubernetesAutoScalerStateStoreTest {
         var now = Instant.now();
 
         Assertions.assertEquals(scalingHistory, 
getTrimmedScalingHistory(stateStore, ctx, now));
-        
assertThat(stateStore.getEvaluatedMetrics(ctx)).hasValue(metricHistory);
+        
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
 
         // Override with compressed data
         var newTs = Instant.now();
 
         addToScalingHistoryAndStore(stateStore, ctx, newTs, Map.of());
-        stateStore.storeEvaluatedMetrics(ctx, metricHistory);
+        stateStore.storeCollectedMetrics(ctx, metricHistory);
 
         // Make sure we can still access everything
         Assertions.assertEquals(scalingHistory, 
getTrimmedScalingHistory(stateStore, ctx, newTs));
-        
assertThat(stateStore.getEvaluatedMetrics(ctx)).hasValue(metricHistory);
+        
assertThat(stateStore.getCollectedMetrics(ctx)).hasValue(metricHistory);
     }
 
     @Test
@@ -265,7 +265,7 @@ public class KubernetesAutoScalerStateStoreTest {
                         new ScalingSummary(
                                 1, 2, Map.of(ScalingMetric.LAG, 
EvaluatedScalingMetric.of(2.)))));
 
-        stateStore.storeEvaluatedMetrics(ctx, metricHistory);
+        stateStore.storeCollectedMetrics(ctx, metricHistory);
 
         assertFalse(
                 configMapStore
@@ -314,7 +314,7 @@ public class KubernetesAutoScalerStateStoreTest {
                         configMapStore.getSerializedState(
                                 ctx, 
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))
                 .isPresent();
-        assertThat(stateStore.getEvaluatedMetrics(ctx)).isEmpty();
+        assertThat(stateStore.getCollectedMetrics(ctx)).isEmpty();
         assertThat(
                         configMapStore.getSerializedState(
                                 ctx, 
KubernetesAutoScalerStateStore.COLLECTED_METRICS_KEY))


Reply via email to