mxm commented on code in PR #774:
URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1488088408


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -264,16 +299,14 @@ private void computeTargetDataRate(
         if (topology.isSource(vertex)) {
             double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
-            if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
+            double lagRate = getRate(LAG, vertex, metricsHistory);
+            double sourceDataRate = Math.max(0, inputRate + lagRate);

Review Comment:
   ```suggestion
               double ingestionDataRate = Math.max(0, inputRate + lagRate);
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -356,13 +421,13 @@ public static double getAverage(
                             ? collectedMetrics.getVertexMetrics().get(jobVertexId)
                             : collectedMetrics.getGlobalMetrics();
             double num = metrics.getOrDefault(metric, Double.NaN);
-            if (Double.isNaN(num)) {
-                continue;
-            }
             if (Double.isInfinite(num)) {
                 anyInfinite = true;
                 continue;
             }
+            if (Double.isNaN(num)) {
+                continue;
+            }

Review Comment:
   I'm curious, why this change?
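
   A note on the reordering, offered as a sketch rather than a statement of the PR's intent: `Double.isNaN` and `Double.isInfinite` are mutually exclusive, so checking them in either order should classify every sample the same way. The class and method names below are made up for illustration and are not part of the PR.

   ```java
   final class NanInfiniteOrder {
       // Old order from the diff: NaN check first, then infinity.
       static String nanFirst(double num) {
           if (Double.isNaN(num)) {
               return "skip";
           }
           if (Double.isInfinite(num)) {
               return "infinite";
           }
           return "use";
       }

       // New order from the diff: infinity check first, then NaN.
       static String infiniteFirst(double num) {
           if (Double.isInfinite(num)) {
               return "infinite";
           }
           if (Double.isNaN(num)) {
               return "skip";
           }
           return "use";
       }

       public static void main(String[] args) {
           double[] samples = {
               1.5, 0.0, Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY
           };
           // Print both classifications side by side for each sample.
           for (double s : samples) {
               System.out.println(s + ": " + nanFirst(s) + " / " + infiniteFirst(s));
           }
       }
   }
   ```

   If the two orders really are equivalent here, the swap would be purely cosmetic.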



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##########
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
             var inputList = new HashSet<JobVertexID>();
+            var ioMetrics = metrics.get(vertexId);
+            var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
                             inputList,
+                            null,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
-                            finished.contains(vertexId)));
+                            maxParallelismMap.get(vertexId),

Review Comment:
   This passes the same value twice, once as maxParallelism and once as 
originalMaxParallelism. Not sure that is intended.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##########
@@ -39,12 +39,6 @@ public enum ScalingMetric {
     /** Current processing rate. */
     CURRENT_PROCESSING_RATE(true),
 
-    /**
-     * Incoming data rate to the source, e.g. rate of records written to the Kafka topic
-     * (records/sec).
-     */
-    SOURCE_DATA_RATE(true),

Review Comment:
   For anyone wondering, this metric wasn't actually used other than for 
populating the source's `TARGET_DATA_RATE` which we can do directly instead of 
going through this intermediary.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -341,6 +375,37 @@ public static double getAverage(
         return getAverage(metric, jobVertexId, metricsHistory, 1);
     }
 
+    public static double getRate(
+            ScalingMetric metric,
+            @Nullable JobVertexID jobVertexId,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+        Instant firstTs = null;
+        double first = Double.NaN;
+
+        Instant lastTs = null;
+        double last = Double.NaN;
+
+        for (var entry : metricsHistory.entrySet()) {
+            double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric);
+            if (!Double.isNaN(value)) {
+                if (Double.isNaN(first)) {
+                    first = value;
+                    firstTs = entry.getKey();
+                } else {
+                    last = value;
+                    lastTs = entry.getKey();
+                }
+            }
+        }
+        if (Double.isNaN(last)) {
+            return Double.NaN;
+        }
+
+        double diff = last - first;
+        return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds();

Review Comment:
   Not sure why we return zero when the diff is zero. The return value would 
then be zero anyway. 
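
   One case where the guard might matter, as a hedged sketch: `Duration.toSeconds()` truncates, so two samples less than a second apart give a divisor of `0`, and `0.0 / 0` is `NaN` in Java floating-point arithmetic. The standalone class below (hypothetical names, not the PR's code) compares the expression with and without the `diff == 0` check.

   ```java
   import java.time.Duration;
   import java.time.Instant;

   final class RateSketch {
       // Mirrors the return expression in the diff, including the zero-diff guard.
       static double rateWithGuard(double first, Instant firstTs, double last, Instant lastTs) {
           double diff = last - first;
           return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds();
       }

       // Same computation without the guard.
       static double rateWithoutGuard(double first, Instant firstTs, double last, Instant lastTs) {
           double diff = last - first;
           return diff / Duration.between(firstTs, lastTs).toSeconds();
       }

       public static void main(String[] args) {
           Instant t0 = Instant.parse("2024-02-13T00:00:00Z");
           // Samples one minute apart: both variants agree.
           System.out.println(rateWithGuard(10, t0, 10, t0.plusSeconds(60)));
           System.out.println(rateWithoutGuard(10, t0, 10, t0.plusSeconds(60)));
           // Samples 500 ms apart: the divisor truncates to zero seconds.
           System.out.println(rateWithGuard(10, t0, 10, t0.plusMillis(500)));
           System.out.println(rateWithoutGuard(10, t0, 10, t0.plusMillis(500)));
       }
   }
   ```

   Whether sub-second gaps can occur in `metricsHistory` is an assumption here. If the guard exists for that reason, a short code comment would make the intent clearer.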



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##########
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
             var inputList = new HashSet<JobVertexID>();
+            var ioMetrics = metrics.get(vertexId);
+            var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
                             inputList,
+                            null,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
-                            finished.contains(vertexId)));
+                            maxParallelismMap.get(vertexId),

Review Comment:
   This might stem from the Lombok setters/getters, but I think it makes it 
less clear that only `maxParallelism` is mutated and `originalMaxParallelism` 
is immutable.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##########
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
             var inputList = new HashSet<JobVertexID>();
+            var ioMetrics = metrics.get(vertexId);
+            var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
                             inputList,
+                            null,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
-                            finished.contains(vertexId)));
+                            maxParallelismMap.get(vertexId),

Review Comment:
   I think we want to remove the `maxParallelism` parameter and do the max 
parallelism housekeeping internally.
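
   A rough sketch of what that could look like, under the assumption that a single constructor argument seeds both values. `VertexInfoSketch` and `updateMaxParallelism` are illustrative names, and capping at the original value is also an assumption rather than something the PR specifies.

   ```java
   final class VertexInfoSketch {
       private final int parallelism;
       // Fixed at construction; never changes afterwards.
       private final int originalMaxParallelism;
       // Starts at the original value and may be adjusted later.
       private int maxParallelism;

       VertexInfoSketch(int parallelism, int maxParallelism) {
           this.parallelism = parallelism;
           this.originalMaxParallelism = maxParallelism;
           this.maxParallelism = maxParallelism;
       }

       int getParallelism() {
           return parallelism;
       }

       int getMaxParallelism() {
           return maxParallelism;
       }

       int getOriginalMaxParallelism() {
           return originalMaxParallelism;
       }

       // Assumed policy: the effective max never exceeds the original max.
       void updateMaxParallelism(int newMaxParallelism) {
           this.maxParallelism = Math.min(originalMaxParallelism, newMaxParallelism);
       }
   }
   ```

   With this shape the call site passes the max parallelism once, and the final field makes it visible which of the two values can change.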



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##########
@@ -43,49 +44,61 @@ public static void computeLoadMetrics(
             JobVertexID jobVertexID,
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics,
+            IOMetrics ioMetrics,
             Configuration conf) {
 
-        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID);
-        scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000);
+        scalingMetrics.put(
+                ScalingMetric.LOAD,
+                getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.);
+        scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime());
+    }
+
+    private static double getBusyTimeMsPerSecond(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Configuration conf,
+            JobVertexID jobVertexId) {
+        var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+        var busyTimeMsPerSecond =
+                busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
+        if (!Double.isFinite(busyTimeMsPerSecond)) {
+            if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
+                // We only want to log this once
+                LOG.warn(
+                        "No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.",
+                        jobVertexId);
+            }
+            return Double.NaN;
+        }
+        return Math.max(0, busyTimeMsPerSecond);
     }
 
     public static void computeDataRateMetrics(
             JobVertexID jobVertexID,
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics,
             JobTopology topology,
-            double lagGrowthRate,
             Configuration conf,
             Supplier<Double> observedTprAvg) {
 
         var isSource = topology.isSource(jobVertexID);
+        var ioMetrics = topology.get(jobVertexID).getIoMetrics();
 
         double numRecordsInPerSecond =
-                getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
+                getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, isSource, true);

Review Comment:
   I would prefer keeping `getNumRecordsInPerSecond` to avoid this non-obvious 
boolean flag as the last parameter. 
   
   ```java
   double getNumRecordsInPerSecond(
               Map<FlinkMetric, AggregatedMetric> flinkMetrics,
               IOMetrics ioMetrics,
               JobVertexID jobVertexID,
               boolean isSource) {
      return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, true);
   }
   
   double getNumRecordsAccumulated(
               Map<FlinkMetric, AggregatedMetric> flinkMetrics,
               IOMetrics ioMetrics,
               JobVertexID jobVertexID,
               boolean isSource) {
      return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, false);
   }
   ```


