gyfora commented on code in PR #1078:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1078#discussion_r3356698745
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##########
@@ -494,6 +503,49 @@ public static double getAverage(
return n < minElements ? Double.NaN : sum / n;
}
+ public static double getAverageRate(
+ ScalingMetric metric,
+ @Nullable JobVertexID jobVertexId,
+ SortedMap<Instant, CollectedMetrics> metricsHistory) {
+
+ double sumRates = 0;
+ int n = 0;
+ Instant prevTs = null;
+ double prevValue = Double.NaN;
+
+ for (var entry : metricsHistory.entrySet()) {
+ double value =
+ entry.getValue()
+ .getVertexMetrics()
+ .get(jobVertexId)
+ .getOrDefault(metric, Double.NaN);
+ if (Double.isNaN(value)) {
+ continue;
+ }
+ if (!Double.isNaN(prevValue)) {
+ long tsDiff = Duration.between(prevTs, entry.getKey()).toMillis();
+ if (tsDiff > 0) {
+ sumRates += 1000.0 * (value - prevValue) / tsDiff;
+ n++;
+ }
+ }
Review Comment:
Could you please explain the logic here at a high level? The original
intention behind using only the cumulative values to compute the average rate
was that you actually get the correct average input rate across the metric
window: you have the number of records at the beginning and the number of
records at the end, and dividing the difference by the window length gives the
average.
Can you please explain how the averaging logic here would lead to a better
result using the point-in-time per-second values? It feels like those can
fluctuate quite a lot and would not necessarily lead to the correct value.
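
To make the concern concrete, here is a minimal sketch comparing the two averaging strategies on a cumulative counter. It is not the PR's code: the class `RateAveragingSketch`, its methods, and the sample data are hypothetical. It also assumes that the truncated method above ends by returning `sumRates / n`, which the visible hunk does not show. With evenly spaced samples the two results coincide. With uneven spacing, the unweighted mean of per-interval rates over-weights short intervals.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical illustration only; names and data are not from the PR. */
public class RateAveragingSketch {

    /** Window-level rate: (last cumulative value - first) / elapsed seconds. */
    public static double endpointRate(SortedMap<Instant, Double> cumulative) {
        if (cumulative.size() < 2) {
            return Double.NaN;
        }
        Instant first = cumulative.firstKey();
        Instant last = cumulative.lastKey();
        long millis = Duration.between(first, last).toMillis();
        if (millis <= 0) {
            return Double.NaN;
        }
        return 1000.0 * (cumulative.get(last) - cumulative.get(first)) / millis;
    }

    /** Unweighted mean of the rates between consecutive samples. */
    public static double meanOfIntervalRates(SortedMap<Instant, Double> cumulative) {
        double sumRates = 0;
        int n = 0;
        Instant prevTs = null;
        double prevValue = Double.NaN;
        for (var entry : cumulative.entrySet()) {
            if (prevTs != null) {
                long tsDiff = Duration.between(prevTs, entry.getKey()).toMillis();
                if (tsDiff > 0) {
                    sumRates += 1000.0 * (entry.getValue() - prevValue) / tsDiff;
                    n++;
                }
            }
            prevTs = entry.getKey();
            prevValue = entry.getValue();
        }
        return n == 0 ? Double.NaN : sumRates / n;
    }

    /** Made-up samples: a 10 s burst at 100 rec/s, then 60 s at 10 rec/s. */
    public static SortedMap<Instant, Double> sample() {
        SortedMap<Instant, Double> m = new TreeMap<>();
        m.put(Instant.ofEpochSecond(0), 0.0);
        m.put(Instant.ofEpochSecond(10), 1000.0);
        m.put(Instant.ofEpochSecond(70), 1600.0);
        return m;
    }

    public static void main(String[] args) {
        SortedMap<Instant, Double> m = sample();
        System.out.println("endpoint rate:          " + endpointRate(m));
        System.out.println("mean of interval rates: " + meanOfIntervalRates(m));
    }
}
```

In this made-up window, 1600 records arrive over 70 seconds, so the endpoint calculation gives roughly 22.9 records/s, while the unweighted mean of the two interval rates (100 and 10) is 55 records/s. Weighting each interval rate by its duration would recover the endpoint value.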