Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-20 Thread via GitHub


gyfora merged PR #774:
URL: https://github.com/apache/flink-kubernetes-operator/pull/774


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


1996fanrui commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1495183092


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -39,12 +39,6 @@ public enum ScalingMetric {
 /** Current processing rate. */
 CURRENT_PROCESSING_RATE(true),

Review Comment:
   Thanks for the update.






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494884380


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -39,12 +39,6 @@ public enum ScalingMetric {
 /** Current processing rate. */
 CURRENT_PROCESSING_RATE(true),

Review Comment:
   On second thought, you are right; I did not realise that CURRENT_PROC_RATE was averaged. Removing it.






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494880354


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -39,12 +39,6 @@ public enum ScalingMetric {
 /** Current processing rate. */
 CURRENT_PROCESSING_RATE(true),

Review Comment:
   I would not remove that, as it has slightly different semantics and is mainly used to determine whether the job is processing backlog.






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


gyfora commented on PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952573994

   Makes sense, I will remove that commit





Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


mxm commented on PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952564474

   @gyfora Every breaking change in `ScalingMetrics` will cause the metric history to get dropped, e.g. renaming a metric or changing the enum order. That is ok as long as the window resets correctly; the beginning of the window should always be the "oldest" collected timestamp, which means metric collection automatically restarts whenever data gets dropped. If that is not the case, then this is a bug.
   
   So in short: dropping collected metrics is ok, reducing the window size is not.
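
The invariant described above can be sketched as follows. This is an illustrative sketch only, not the operator's actual API: `WindowSketch`, `isWindowFull`, and the `String` metric placeholder are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the invariant: after collected metrics are dropped
// (e.g. by a breaking ScalingMetrics change), the metric window must restart
// from the oldest *remaining* timestamp rather than pretend the configured
// window is already full.
public class WindowSketch {
    static boolean isWindowFull(SortedMap<Instant, String> history, Duration window, Instant now) {
        if (history.isEmpty()) {
            return false;
        }
        // The beginning of the window is always the oldest collected timestamp.
        Instant oldest = history.firstKey();
        return Duration.between(oldest, now).compareTo(window) >= 0;
    }

    public static void main(String[] args) {
        var history = new TreeMap<Instant, String>();
        Instant now = Instant.parse("2024-02-19T12:00:00Z");
        history.put(now.minusSeconds(60), "m1"); // only 1 minute of data collected
        System.out.println(isWindowFull(history, Duration.ofMinutes(5), now)); // false
        history.put(now.minusSeconds(600), "m0"); // oldest point 10 minutes back
        System.out.println(isWindowFull(history, Duration.ofMinutes(5), now)); // true
    }
}
```

Under this reading, dropping entries only shrinks the history, and the window start moves forward with it, so decisions made on a partial window are delayed rather than skewed.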





Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


gyfora commented on PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1952541506

   @mxm @1996fanrui While polishing and testing the PR I noticed that the removed metrics cause a backward-incompatible change. I added a fix to allow such changes to go through without losing some important collected metrics; however, it also means that during the initial period after starting with the new metrics/logic, some decisions will be made with a smaller-than-configured metric window.
   
   Do you see any problem with this, or would you prefer simply discarding the collected metrics as a whole?





Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-19 Thread via GitHub


1996fanrui commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1494239310


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -39,12 +39,6 @@ public enum ScalingMetric {
 /** Current processing rate. */
 CURRENT_PROCESSING_RATE(true),

Review Comment:
   Could all callers use `getRate(ScalingMetric.NUM_RECORDS_IN, vertex, metricsHistory)` instead of `getAverage(CURRENT_PROCESSING_RATE, vertex, metricsHistory)`?
   
   If yes, we can remove this metric.
   
   Currently, only one caller uses this metric.
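
A minimal sketch of the difference between the two approaches being discussed: averaging sampled per-second gauges versus differencing an accumulated record counter over the window. The class and method names here are hypothetical; only `getRate`/`getAverage`, `CURRENT_PROCESSING_RATE`, and `ScalingMetric.NUM_RECORDS_IN` come from the PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.TreeMap;

// Illustrative sketch (not the operator's actual API) of the two ways a
// processing rate can be derived over the metric window.
public class RateVsAverage {
    // Average of instantaneous gauge samples (CURRENT_PROCESSING_RATE style).
    static double averageOfGauges(double... samples) {
        double sum = 0;
        for (double s : samples) sum += s;
        return sum / samples.length;
    }

    // Rate from an accumulated counter: (last - first) / elapsed time,
    // the shape of getRate(ScalingMetric.NUM_RECORDS_IN, ...).
    static double rateFromCounter(TreeMap<Instant, Long> accumulated) {
        long diff = accumulated.lastEntry().getValue() - accumulated.firstEntry().getValue();
        long millis = Duration.between(accumulated.firstKey(), accumulated.lastKey()).toMillis();
        return diff * 1000.0 / millis;
    }

    public static void main(String[] args) {
        var counts = new TreeMap<Instant, Long>();
        Instant t0 = Instant.parse("2024-02-19T12:00:00Z");
        counts.put(t0, 0L);
        counts.put(t0.plusSeconds(10), 1_000L); // 1000 records over 10 seconds
        System.out.println(rateFromCounter(counts)); // 100.0
        System.out.println(averageOfGauges(90, 110, 100)); // 100.0
    }
}
```

The counter-based form is robust to missing intermediate samples, since only the first and last observations in the window matter.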






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-18 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1493771207


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -379,22 +484,75 @@ public static double getAverage(
 return n < minElements ? Double.NaN : sum / n;
 }
 
-    private static double getAverageOutputRatio(
-            Edge edge, SortedMap<Instant, CollectedMetrics> metricsHistory) {
-        double[] metricValues =
-                metricsHistory.values().stream()
-                        .map(CollectedMetrics::getOutputRatios)
-                        .filter(m -> m.containsKey(edge))
-                        .mapToDouble(m -> m.get(edge))
-                        .filter(d -> !Double.isNaN(d))
-                        .toArray();
-        for (double metricValue : metricValues) {
-            if (Double.isInfinite(metricValue)) {
-                // As long as infinite values are present, we can't properly average. We need to
-                // wait until they are evicted.
-                return metricValue;
+    @VisibleForTesting
+    protected static double computeOutputRatio(
+            JobVertexID from,
+            JobVertexID to,
+            JobTopology topology,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+        double numRecordsIn = getRate(ScalingMetric.NUM_RECORDS_IN, from, metricsHistory);
+
+        double outputRatio = 0;
+        // If the input rate is zero, we also need to flatten the output rate.
+        // Otherwise, the OUTPUT_RATIO would be outrageously large, leading to
+        // a rapid scale up.
+        if (numRecordsIn > 0) {
+            double numRecordsOut = computeEdgeRecordsOut(topology, metricsHistory, from, to);

Review Comment:
   I improved the variable/method names for this part of the logic






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-18 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1493771147


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -223,128 +203,65 @@ public static void computeLagMetrics(
 }
 }
 
-    // TODO: FLINK-34213: Consider using accumulated busy time instead of busyMsPerSecond
-    private static double getBusyTimeMsPerSecond(
+    private static double getNumRecordsInPerSecond(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            Configuration conf,
-            JobVertexID jobVertexId) {
-        var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
-        var busyTimeMsPerSecond =
-                busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
-        if (!Double.isFinite(busyTimeMsPerSecond)) {
-            if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
-                // We only want to log this once
-                LOG.warn(
-                        "No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.",
-                        jobVertexId);
-            }
-            return Double.NaN;
-        }
-        return Math.max(0, busyTimeMsPerSecond);
+            IOMetrics ioMetrics,
+            JobVertexID jobVertexID,
+            boolean isSource) {
+        return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, true);
     }
 
-    private static double getNumRecordsInPerSecond(
+    private static double getNumRecordsAccumulated(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            IOMetrics ioMetrics,
             JobVertexID jobVertexID,
             boolean isSource) {
+        return getNumRecordsInternal(flinkMetrics, ioMetrics, jobVertexID, isSource, false);
+    }
+
+    private static double getNumRecordsInternal(

Review Comment:
   Added



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java:
##
@@ -17,30 +17,68 @@
 
 package org.apache.flink.autoscaler.topology;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
-import lombok.RequiredArgsConstructor;
-import lombok.Value;
+import lombok.Data;
+
+import javax.annotation.Nullable;
 
 import java.util.Set;
 
 /** Job vertex information. */
-@Value
-@RequiredArgsConstructor
+@Data
 public class VertexInfo {
 
-    JobVertexID id;
+    private final JobVertexID id;
+
+    private final Set<JobVertexID> inputs;
+
+    private @Nullable Set<JobVertexID> outputs;

Review Comment:
   makes sense






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


1996fanrui commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1490305805


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java:
##
@@ -171,7 +171,7 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri
 stateStore.storeScalingTracking(ctx, scalingTracking);
 }
 
-if (collectedMetrics.getMetricHistory().isEmpty()) {
+if (collectedMetrics.getMetricHistory().size() < 2) {

Review Comment:
   Would you mind adding a short comment explaining why we only run the scaling logic when the metric history size is >= 2?
   
   It may help new developers understand this logic.
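
A hedged illustration of why at least two observations are needed (the helper below is hypothetical, not the PR's code): rate-style metrics are differences between two accumulated samples, so a single observation yields no usable rate.

```java
// Hypothetical sketch: with fewer than two samples of an accumulated counter
// there is no delta to divide, so no rate can be computed and the scaling
// logic has nothing meaningful to evaluate.
public class MinSamplesSketch {
    static double rate(long[] counts, long[] timestampsMillis) {
        if (counts.length < 2) {
            return Double.NaN; // one point gives no delta to divide
        }
        long diff = counts[counts.length - 1] - counts[0];
        long elapsed = timestampsMillis[timestampsMillis.length - 1] - timestampsMillis[0];
        return diff * 1000.0 / elapsed;
    }

    public static void main(String[] args) {
        System.out.println(rate(new long[] {42}, new long[] {0})); // NaN
        System.out.println(rate(new long[] {0, 500}, new long[] {0, 5_000})); // 100.0
    }
}
```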
   









Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489408092


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java:
##
@@ -43,49 +44,61 @@ public static void computeLoadMetrics(
             JobVertexID jobVertexID,
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics,
+            IOMetrics ioMetrics,
             Configuration conf) {
 
-        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID);
-        scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000);
+        scalingMetrics.put(
+                ScalingMetric.LOAD,
+                getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID) / 1000.);
+        scalingMetrics.put(ScalingMetric.ACCUMULATED_BUSY_TIME, ioMetrics.getAccumulatedBusyTime());
+    }
+
+    private static double getBusyTimeMsPerSecond(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            Configuration conf,
+            JobVertexID jobVertexId) {
+        var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+        var busyTimeMsPerSecond =
+                busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
+        if (!Double.isFinite(busyTimeMsPerSecond)) {
+            if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
+                // We only want to log this once
+                LOG.warn(
+                        "No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.",
+                        jobVertexId);
+            }
+            return Double.NaN;
+        }
+        return Math.max(0, busyTimeMsPerSecond);
     }
 
     public static void computeDataRateMetrics(
             JobVertexID jobVertexID,
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics,
             JobTopology topology,
-            double lagGrowthRate,
             Configuration conf,
             Supplier<Double> observedTprAvg) {
 
         var isSource = topology.isSource(jobVertexID);
+        var ioMetrics = topology.get(jobVertexID).getIoMetrics();
 
         double numRecordsInPerSecond =
-                getNumRecordsInPerSecond(flinkMetrics, jobVertexID, isSource);
+                getNumRecordsIn(flinkMetrics, ioMetrics, jobVertexID, isSource, true);

Review Comment:
   will do






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489403136


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
             var inputList = new HashSet<JobVertexID>();
+            var ioMetrics = metrics.get(vertexId);
+            var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
                             inputList,
+                            null,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
-                            finished.contains(vertexId)));
+                            maxParallelismMap.get(vertexId),

Review Comment:
   I will add an explicit constructor to clean this up instead of relying on Lombok's `@AllArgsConstructor`.






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489401020


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -356,13 +421,13 @@ public static double getAverage(
                         ? collectedMetrics.getVertexMetrics().get(jobVertexId)
                         : collectedMetrics.getGlobalMetrics();
             double num = metrics.getOrDefault(metric, Double.NaN);
-            if (Double.isNaN(num)) {
-                continue;
-            }
             if (Double.isInfinite(num)) {
                 anyInfinite = true;
                 continue;
             }
+            if (Double.isNaN(num)) {
+                continue;
+            }

Review Comment:
   I will revert this; I made some changes that I later removed, so this is no longer necessary.






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399611


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -264,16 +299,14 @@ private void computeTargetDataRate(
         if (topology.isSource(vertex)) {
             double catchUpTargetSec = conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
-            if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
+            double lagRate = getRate(LAG, vertex, metricsHistory);
+            double sourceDataRate = Math.max(0, inputRate + lagRate);

Review Comment:
   Makes sense!






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-14 Thread via GitHub


gyfora commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1489399318


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -341,6 +375,37 @@ public static double getAverage(
 return getAverage(metric, jobVertexId, metricsHistory, 1);
 }
 
+    public static double getRate(
+            ScalingMetric metric,
+            @Nullable JobVertexID jobVertexId,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+        Instant firstTs = null;
+        double first = Double.NaN;
+
+        Instant lastTs = null;
+        double last = Double.NaN;
+
+        for (var entry : metricsHistory.entrySet()) {
+            double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric);
+            if (!Double.isNaN(value)) {
+                if (Double.isNaN(first)) {
+                    first = value;
+                    firstTs = entry.getKey();
+                } else {
+                    last = value;
+                    lastTs = entry.getKey();
+                }
+            }
+        }
+        if (Double.isNaN(last)) {
+            return Double.NaN;
+        }
+
+        double diff = last - first;
+        return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds();

Review Comment:
   I will do this. I still had to change the logic so that we compute the per-second rate from the millisecond diff, otherwise we can accidentally divide by zero.
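
A sketch of the divide-by-zero hazard being described. The `ratePerSecond` helper is a hypothetical stand-in for the PR's `getRate` logic: `Duration.toSeconds()` truncates, so two samples taken less than a second apart would produce a zero divisor, while dividing by milliseconds does not.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the fix: compute the per-second rate from
// the millisecond difference so that samples taken less than a second apart
// do not truncate to a zero-duration divisor.
public class RateSketch {
    static double ratePerSecond(double first, Instant firstTs, double last, Instant lastTs) {
        double diff = last - first;
        long millis = Duration.between(firstTs, lastTs).toMillis();
        if (millis == 0) {
            return Double.NaN; // identical timestamps: rate is undefined
        }
        return diff * 1000.0 / millis; // avoids toSeconds() truncating to 0
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-02-19T12:00:00Z");
        // Samples 500 ms apart: toSeconds() would yield 0 and divide by zero.
        System.out.println(ratePerSecond(100, t0, 150, t0.plusMillis(500))); // 100.0
    }
}
```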
   






Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-13 Thread via GitHub


gyfora commented on PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-194188

   There are still some issues while actually running this so I need to track 
down some problems and add some new tests.





Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-13 Thread via GitHub


mxm commented on code in PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#discussion_r1488088408


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -264,16 +299,14 @@ private void computeTargetDataRate(
 if (topology.isSource(vertex)) {
 double catchUpTargetSec = 
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
-if (!latestVertexMetrics.containsKey(SOURCE_DATA_RATE)) {
+double lagRate = getRate(LAG, vertex, metricsHistory);
+double sourceDataRate = Math.max(0, inputRate + lagRate);

Review Comment:
   ```suggestion
   double ingestionDataRate = Math.max(0, inputRate + lagRate);
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -356,13 +421,13 @@ public static double getAverage(
                         ? collectedMetrics.getVertexMetrics().get(jobVertexId)
                         : collectedMetrics.getGlobalMetrics();
             double num = metrics.getOrDefault(metric, Double.NaN);
-            if (Double.isNaN(num)) {
-                continue;
-            }
             if (Double.isInfinite(num)) {
                 anyInfinite = true;
                 continue;
             }
+            if (Double.isNaN(num)) {
+                continue;
+            }

Review Comment:
   I'm curious, why this change?



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
             var inputList = new HashSet<JobVertexID>();
+            var ioMetrics = metrics.get(vertexId);
+            var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
                     new VertexInfo(
                             vertexId,
                             inputList,
+                            null,
                             node.get("parallelism").asInt(),
                             maxParallelismMap.get(vertexId),
-                            finished.contains(vertexId)));
+                            maxParallelismMap.get(vertexId),

Review Comment:
   This is passing the same value twice for maxParallelism/originalMaxParallelism. Not sure that is intended.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -39,12 +39,6 @@ public enum ScalingMetric {
 /** Current processing rate. */
 CURRENT_PROCESSING_RATE(true),
 
-    /**
-     * Incoming data rate to the source, e.g. rate of records written to the Kafka topic
-     * (records/sec).
-     */
-    SOURCE_DATA_RATE(true),

Review Comment:
   For anyone wondering, this metric wasn't actually used other than for populating the source's `TARGET_DATA_RATE`, which we can do directly instead of going through this intermediary.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -341,6 +375,37 @@ public static double getAverage(
 return getAverage(metric, jobVertexId, metricsHistory, 1);
 }
 
+    public static double getRate(
+            ScalingMetric metric,
+            @Nullable JobVertexID jobVertexId,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+        Instant firstTs = null;
+        double first = Double.NaN;
+
+        Instant lastTs = null;
+        double last = Double.NaN;
+
+        for (var entry : metricsHistory.entrySet()) {
+            double value = entry.getValue().getVertexMetrics().get(jobVertexId).get(metric);
+            if (!Double.isNaN(value)) {
+                if (Double.isNaN(first)) {
+                    first = value;
+                    firstTs = entry.getKey();
+                } else {
+                    last = value;
+                    lastTs = entry.getKey();
+                }
+            }
+        }
+        if (Double.isNaN(last)) {
+            return Double.NaN;
+        }
+
+        double diff = last - first;
+        return diff == 0 ? 0 : diff / Duration.between(firstTs, lastTs).toSeconds();

Review Comment:
   Not sure why we return zero when the diff is zero. The return value would 
then be zero anyway. 



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java:
##
@@ -146,13 +145,18 @@ public static JobTopology fromJsonPlan(
 for (JsonNode node : nodes) {
 var vertexId = JobVertexID.fromHexString(node.get("id").asText());
 var inputList = new HashSet();
+var ioMetrics = metrics.get(vertexId);
+var finished = finishedVertices.contains(vertexId);
 vertexInfo.add(
 new VertexInfo(
 vertexId,
 

Re: [PR] [FLINK-34266] Compute Autoscaler metrics correctly over metric window [flink-kubernetes-operator]

2024-02-13 Thread via GitHub


gyfora commented on PR #774:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/774#issuecomment-1941448169

   cc @mateczagany 

