mxm commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1488088408
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ########## @@ -264,16 +299,14 @@ private void computeTargetDataRate( if (topology.isSource(vertex)) { double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); - if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) { + double lagRate = getRate(LAG, vertex, metricsHistory); + double sourceDataRate = Math.max(0, inputRate + lagRate); Review Comment: ```suggestion double ingestionDataRate = Math.max(0, inputRate + lagRate); ``` ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ########## @@ -356,13 +421,13 @@ public static double getAverage( ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics(); double num = metrics.getOrDefault(metric, Double.NaN); - if (Double.isNaN(num)) { - continue; - } if (Double.isInfinite(num)) { anyInfinite = true; continue; } + if (Double.isNaN(num)) { + continue; + } Review Comment: I'm curious, why this change? ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ########## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet<JobVertexID>(); + var ioMetrics = metrics.get(vertexId); + var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, + null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), - finished.contains(vertexId))); + maxParallelismMap.get(vertexId), Review Comment: This passes in the same value twice, once for maxParallelism and once for originalMaxParallelism. Not sure that is intended. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ########## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. 
*/ CURRENT_PROCESSING_RATE(true), - /** - * Incoming data rate to the source, e.g. rate of records written to the Kafka topic - * (records/sec). - */ - SOURCE_DATA_RATE(true), Review Comment: For anyone wondering, this metric wasn't actually used other than for populating the source's `TARGET_DATA_RATE` which we can do directly instead of going through this intermediary. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ########## @@ -341,6 +375,37 @@ public static double getAverage( return getAverage(metric, jobVertexId, metricsHistory, 1); } + public static double getRate( + ScalingMetric metric, + @Nullable JobVertexID jobVertexId, + SortedMap<Instant, CollectedMetrics> metricsHistory) { + + Instant firstTs = null; + double first = Double.NaN; + + Instant lastTs = null; + double last = Double.NaN; + + for (var entry : metricsHistory.entrySet()) { + double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric); + if (!Double.isNaN(value)) { + if (Double.isNaN(first)) { + first = value; + firstTs = entry.getKey(); + } else { + last = value; + lastTs = entry.getKey(); + } + } + } + if (Double.isNaN(last)) { + return Double.NaN; + } + + double diff = last - first; + return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds(); Review Comment: Not sure why we return zero when the diff is zero. The return value would then be zero anyway. 
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ########## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet<JobVertexID>(); + var ioMetrics = metrics.get(vertexId); + var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, + null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), - finished.contains(vertexId))); + maxParallelismMap.get(vertexId), Review Comment: This might stem from Lombok Setters/Getters, but I think it makes it less clear that only maxParallelism is mutated and originalMaxParallelism is immutable. ########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ########## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet<JobVertexID>(); + var ioMetrics = metrics.get(vertexId); + var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, + null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), - finished.contains(vertexId))); + maxParallelismMap.get(vertexId), Review Comment: I think we want to remove the `maxParallelism` parameter and do the max parallelism housekeeping internally. 
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ########## @@ -43,49 +44,61 @@ public static void computeLoadMetrics( JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, + IOMetrics ioMetrics, Configuration conf) { - double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID); - scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000); + scalingMetrics.put( + ScalingMetric.LOAD, + getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.); + scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime()); + } + + private static double getBusyTimeMsPerSecond( + Map<FlinkMetric, AggregatedMetric> flinkMetrics, + Configuration conf, + JobVertexID jobVertexId) { + var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR); + var busyTimeMsPerSecond = + busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC)); + if (!Double.isFinite(busyTimeMsPerSecond)) { + if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) { + // We only want to log this once + LOG.warn( + "No busyTimeMsPerSecond metric available for {}. 
No scaling will be performed for this vertex.", + jobVertexId); + } + return Double.NaN; + } + return Math.max(0, busyTimeMsPerSecond); } public static void computeDataRateMetrics( JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, JobTopology topology, - double lagGrowthRate, Configuration conf, Supplier<Double> observedTprAvg) { var isSource = topology.isSource(jobVertexID); + var ioMetrics = topology.get(jobVertexID).getIoMetrics(); double numRecordsInPerSecond = - getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource); + getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, isSource, true); Review Comment: I would prefer keeping `getNumRecordsInPerSecond` to avoid this non-obvious boolean flag as the last parameter. ```java double getNumRecordsInPerSecond( Map<FlinkMetric, AggregatedMetric> flinkMetrics, IOMetrics ioMetrics, JobVertexID jobVertexID, boolean isSource) { return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, true); } double getNumRecordsAccumulated( Map<FlinkMetric, AggregatedMetric> flinkMetrics, IOMetrics ioMetrics, JobVertexID jobVertexID, boolean isSource) { return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, false); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org