Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]
gyfora merged PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
1996fanrui commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1495183092 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), Review Comment: Thanks for the update.
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494884380 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), Review Comment: On second thought, you are right; I did not realise that CURRENT_PROC_RATE was averaged. Removing it.
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494880354 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), Review Comment: I would not remove that, as it has slightly different semantics and is mainly used to determine whether the job is processing backlog or not.
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952573994 Makes sense, I will remove that commit.
mxm commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952564474 @gyfora Every breaking change in `ScalingMetrics` will cause the metric history to get dropped, e.g. renaming a metric or changing the enum order. That is ok if the window resets correctly; the beginning of the window should always be the "oldest" timestamp collected. That means we automatically restart metric collection if data gets dropped. If that is not the case, then this is a bug. So in short: dropping collected metrics is ok, reducing the window size is not.
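[Editor's sketch] The invariant mxm describes can be illustrated with a minimal eviction routine. The class and method names (`MetricWindowSketch`, `trim`) are hypothetical and not the operator's actual code; the point is that entries are evicted relative to the newest collected timestamp, so a dropped history simply restarts collection rather than shrinking the window:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

public class MetricWindowSketch {

    // Evicts entries that fall outside the configured window, measured from
    // the newest timestamp in the history. If the history was dropped (e.g.
    // after a breaking ScalingMetrics change), the map is empty and collection
    // restarts from scratch; the window itself is never artificially shortened.
    static void trim(SortedMap<Instant, Double> history, Duration windowSize) {
        if (history.isEmpty()) {
            return;
        }
        Instant cutoff = history.lastKey().minus(windowSize);
        // headMap(cutoff) is a live view of all entries strictly before cutoff
        history.headMap(cutoff).clear();
    }

    public static void main(String[] args) {
        SortedMap<Instant, Double> history = new TreeMap<>();
        Instant base = Instant.parse("2024-02-15T10:00:00Z");
        // One observation per minute for 10 minutes
        for (int i = 0; i <= 10; i++) {
            history.put(base.plusSeconds(60L * i), (double) i);
        }
        trim(history, Duration.ofMinutes(5));
        // Entries at minutes 5..10 remain
        System.out.println(history.size());
    }
}
```

Under this sketch, the "oldest" timestamp after a trim is always `newest - windowSize` at most, which matches the requirement that the window resets from the oldest collected data.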
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952541506 @mxm @1996fanrui while polishing / testing the PR I noticed that the removed metrics cause a backward-incompatible change. I added a fix to allow such changes to go through without losing some important collected metrics; however, it also means that during the initial period after starting with the new metrics/logic, some decisions will be made with a smaller-than-configured metric window. Do you see any problem with this, or would you prefer simply discarding the collected metrics as a whole?
1996fanrui commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494239310 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), Review Comment: Could all callers use `getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory)` instead of `getAverage(CURRENT_PROCESSING_RATE, vertex, metricsHistory)`? If yes, we can remove this metric. Currently, only one caller uses this metric.
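[Editor's sketch] The distinction behind this suggestion: averaging takes the mean of per-interval rate samples, while a rate derived from a cumulative counter uses only the window's endpoints. A rough illustration of the two approaches (class and helper names here are hypothetical, not the PR's actual signatures):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

public class AverageVsRate {

    // Average of instantaneous rate samples (roughly how a metric like
    // CURRENT_PROCESSING_RATE would be evaluated): sensitive to how evenly
    // the samples cover the window.
    static double averageOfSamples(SortedMap<Instant, Double> rateSamples) {
        return rateSamples.values().stream()
                .mapToDouble(Double::doubleValue)
                .average()
                .orElse(Double.NaN);
    }

    // Rate from a cumulative counter (roughly how getRate would treat
    // NUM_RECORDS_IN): (last - first) / elapsed, weighting the whole window
    // uniformly regardless of sampling gaps.
    static double rateFromCounter(SortedMap<Instant, Double> counter) {
        double diff = counter.get(counter.lastKey()) - counter.get(counter.firstKey());
        long millis = Duration.between(counter.firstKey(), counter.lastKey()).toMillis();
        return millis == 0 ? Double.NaN : diff * 1000.0 / millis;
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2024-02-15T10:00:00Z");
        SortedMap<Instant, Double> counter = new TreeMap<>();
        counter.put(base, 0.0);
        counter.put(base.plusSeconds(10), 1000.0);
        counter.put(base.plusSeconds(20), 1000.0); // idle second half
        // 1000 records over 20s -> 50 records/sec
        System.out.println(rateFromCounter(counter));
    }
}
```

The counter-based form needs at least two observations, which is consistent with the `size() < 2` guard discussed elsewhere in this thread.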
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1493771207 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -379,22 +484,75 @@ public static double getAverage( return n < minElements ? Double.NaN : sum / n; } -private static double getAverageOutputRatio( -Edge edge, SortedMap metricsHistory) { -double[] metricValues = -metricsHistory.values().stream() -.map(CollectedMetrics::getOutputRatios) -.filter(m -> m.containsKey(edge)) -.mapToDouble(m -> m.get(edge)) -.filter(d -> !Double.isNaN(d)) -.toArray(); -for (double metricValue : metricValues) { -if (Double.isInfinite(metricValue)) { -// As long as infinite values are present, we can't properly average. We need to -// wait until they are evicted. -return metricValue; +@VisibleForTesting +protected static double computeOutputRatio( +JobVertexID from, +JobVertexID to, +JobTopology topology, +SortedMap metricsHistory) { + +double numRecordsIn = getRate(ScalingMetric.NUM_RECORDS_IN, from, metricsHistory); + +double outputRatio = 0; +// If the input rate is zero, we also need to flatten the output rate. +// Otherwise, the OUTPUT_RATIO would be outrageously large, leading to +// a rapid scale up. +if (numRecordsIn > 0) { +double numRecordsOut = computeEdgeRecordsOut(topology, metricsHistory, from, to); Review Comment: I improved the variable/method names for this part of the logic
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1493771147 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ## @@ -223,128 +203,65 @@ public static void computeLagMetrics( } } -// TODO: FLINK-34213: Consider using accumulated busy time instead of busyMsPerSecond -private static double getBusyTimeMsPerSecond( +private static double getNumRecordsInPerSecond( Map flinkMetrics, -Configuration conf, -JobVertexID jobVertexId) { -var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR); -var busyTimeMsPerSecond = - busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC)); -if (!Double.isFinite(busyTimeMsPerSecond)) { -if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) { -// We only want to log this once -LOG.warn( -"No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.", -jobVertexId); -} -return Double.NaN; -} -return Math.max(0, busyTimeMsPerSecond); +IOMetrics ioMetrics, +JobVertexID jobVertexID, +boolean isSource) { +return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, true); } -private static double getNumRecordsInPerSecond( +private static double getNumRecordsAccumulated( Map flinkMetrics, +IOMetrics ioMetrics, JobVertexID jobVertexID, boolean isSource) { +return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, false); +} + +private static double getNumRecordsInternal( Review Comment: Added ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java: ## @@ -17,30 +17,68 @@ package org.apache.flink.autoscaler.topology; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.jobgraph.JobVertexID; -import lombok.RequiredArgsConstructor; -import lombok.Value; +import lombok.Data; + +import javax.annotation.Nullable; import java.util.Set; /** Job vertex information.
*/ -@Value -@RequiredArgsConstructor +@Data public class VertexInfo { -JobVertexID id; +private final JobVertexID id; + +private final Set inputs; + +private @Nullable Set outputs; Review Comment: makes sense
1996fanrui commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java: ## @@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri stateStore.storeScalingTracking(ctx, scalingTracking); } -if (collectedMetrics.getMetricHistory().isEmpty()) { +if (collectedMetrics.getMetricHistory().size() < 2) { Review Comment: Would you mind adding a simple comment to explain why we only run the scaling logic when the metric history size is >= 2? It may help new developers understand this logic.
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489408092 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java: ## @@ -43,49 +44,61 @@ public static void computeLoadMetrics( JobVertexID jobVertexID, Map flinkMetrics, Map scalingMetrics, +IOMetrics ioMetrics, Configuration conf) { -double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID); -scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000); +scalingMetrics.put( +ScalingMetric.LOAD, +getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.); +scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime()); +} + +private static double getBusyTimeMsPerSecond( +Map flinkMetrics, +Configuration conf, +JobVertexID jobVertexId) { +var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR); +var busyTimeMsPerSecond = + busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC)); +if (!Double.isFinite(busyTimeMsPerSecond)) { +if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) { +// We only want to log this once +LOG.warn( +"No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.", +jobVertexId); +} +return Double.NaN; +} +return Math.max(0, busyTimeMsPerSecond); } public static void computeDataRateMetrics( JobVertexID jobVertexID, Map flinkMetrics, Map scalingMetrics, JobTopology topology, -double lagGrowthRate, Configuration conf, Supplier observedTprAvg) { var isSource = topology.isSource(jobVertexID); +var ioMetrics = topology.get(jobVertexID).getIoMetrics(); double numRecordsInPerSecond = -getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource); +getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, isSource, true); Review Comment: will do
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489403136 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, +null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), -finished.contains(vertexId))); +maxParallelismMap.get(vertexId), Review Comment: I will add an explicit constructor to clean this up instead of relying on Lombok allArgsConstructor
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489401020 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -356,13 +421,13 @@ public static double getAverage( ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics(); double num = metrics.getOrDefault(metric, Double.NaN); -if (Double.isNaN(num)) { -continue; -} if (Double.isInfinite(num)) { anyInfinite = true; continue; } +if (Double.isNaN(num)) { +continue; +} Review Comment: I will revert this; it was left over from changes I later removed, so it is not necessary.
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399611 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -264,16 +299,14 @@ private void computeTargetDataRate( if (topology.isSource(vertex)) { double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); -if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) { +double lagRate = getRate(LAG, vertex, metricsHistory); +double sourceDataRate = Math.max(0, inputRate + lagRate); Review Comment: Makes sense!
gyfora commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399318 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -341,6 +375,37 @@ public static double getAverage( return getAverage(metric, jobVertexId, metricsHistory, 1); } +public static double getRate( +ScalingMetric metric, +@Nullable JobVertexID jobVertexId, +SortedMap metricsHistory) { + +Instant firstTs = null; +double first = Double.NaN; + +Instant lastTs = null; +double last = Double.NaN; + +for (var entry : metricsHistory.entrySet()) { +double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric); +if (!Double.isNaN(value)) { +if (Double.isNaN(first)) { +first = value; +firstTs = entry.getKey(); +} else { +last = value; +lastTs = entry.getKey(); +} +} +} +if (Double.isNaN(last)) { +return Double.NaN; +} + +double diff = last - first; +return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds(); Review Comment: I will fix this; I still had to change the logic so that we compute the per-second rate from the millisecond diff, otherwise we can accidentally divide by zero.
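[Editor's sketch] The divide-by-zero hazard mentioned here comes from `Duration.toSeconds()` truncating sub-second windows to 0. A minimal illustration of the millisecond-based computation (the class name `RateSketch` and method `ratePerSecond` are hypothetical, not the PR's actual code):

```java
import java.time.Duration;
import java.time.Instant;

public class RateSketch {

    // Computes records/sec from two cumulative counter observations. Using
    // toMillis() avoids a zero divisor when both observations fall within the
    // same second, where Duration.toSeconds() would truncate to 0.
    static double ratePerSecond(Instant firstTs, double first, Instant lastTs, double last) {
        double diff = last - first;
        if (diff == 0) {
            return 0;
        }
        long millis = Duration.between(firstTs, lastTs).toMillis();
        return diff * 1000.0 / millis;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-02-15T10:00:00Z");
        Instant t1 = t0.plusMillis(500); // sub-second window
        // 50 records in 0.5s -> 100 records/sec; a seconds-based duration
        // would have produced a division by zero here
        System.out.println(ratePerSecond(t0, 100, t1, 150));
    }
}
```

The early return for `diff == 0` also sidesteps the degenerate case where first and last observations are equal, regardless of the elapsed time.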
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-194188 There are still some issues while actually running this, so I need to track down some problems and add some new tests.
mxm commented on code in PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1488088408 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -264,16 +299,14 @@ private void computeTargetDataRate( if (topology.isSource(vertex)) { double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds(); -if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) { +double lagRate = getRate(LAG, vertex, metricsHistory); +double sourceDataRate = Math.max(0, inputRate + lagRate); Review Comment: ```suggestion double ingestionDataRate = Math.max(0, inputRate + lagRate); ``` ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -356,13 +421,13 @@ public static double getAverage( ? collectedMetrics.getVertexMetrics().get(jobVertexId) : collectedMetrics.getGlobalMetrics(); double num = metrics.getOrDefault(metric, Double.NaN); -if (Double.isNaN(num)) { -continue; -} if (Double.isInfinite(num)) { anyInfinite = true; continue; } +if (Double.isNaN(num)) { +continue; +} Review Comment: I'm curious, why this change? ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId, inputList, +null, node.get("parallelism").asInt(), maxParallelismMap.get(vertexId), -finished.contains(vertexId))); +maxParallelismMap.get(vertexId), Review Comment: This is passing in twice the same value for maxParallelism/originalMaxParallelism. Not sure that is intended. 
## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -39,12 +39,6 @@ public enum ScalingMetric { /** Current processing rate. */ CURRENT_PROCESSING_RATE(true), -/** - * Incoming data rate to the source, e.g. rate of records written to the Kafka topic - * (records/sec). - */ -SOURCE_DATA_RATE(true), Review Comment: For anyone wondering, this metric wasn't actually used other than for populating the source's `TARGET_DATA_RATE` which we can do directly instead of going through this intermediary. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java: ## @@ -341,6 +375,37 @@ public static double getAverage( return getAverage(metric, jobVertexId, metricsHistory, 1); } +public static double getRate( +ScalingMetric metric, +@Nullable JobVertexID jobVertexId, +SortedMap metricsHistory) { + +Instant firstTs = null; +double first = Double.NaN; + +Instant lastTs = null; +double last = Double.NaN; + +for (var entry : metricsHistory.entrySet()) { +double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric); +if (!Double.isNaN(value)) { +if (Double.isNaN(first)) { +first = value; +firstTs = entry.getKey(); +} else { +last = value; +lastTs = entry.getKey(); +} +} +} +if (Double.isNaN(last)) { +return Double.NaN; +} + +double diff = last - first; +return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds(); Review Comment: Not sure why we return zero when the diff is zero. The return value would then be zero anyway. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java: ## @@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan( for (JsonNode node : nodes) { var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputList = new HashSet(); +var ioMetrics = metrics.get(vertexId); +var finished = finishedVertices.contains(vertexId); vertexInfo.add( new VertexInfo( vertexId,
gyfora commented on PR #774: URL: https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1941448169 cc @mateczagany