Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-17 Thread via GitHub


gyfora merged PR #726:
URL: https://github.com/apache/flink-kubernetes-operator/pull/726





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-15 Thread via GitHub


mxm commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857974906

   Thanks for the ping. Looks good!





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-15 Thread via GitHub


1996fanrui commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857868649

   Thanks @gyfora for the ping. 
   
   I don't have any comments, and I have approved.





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-15 Thread via GitHub


gyfora commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857864814

   Do you have any further comments / concerns @1996fanrui @mxm ?
   
   The thresholds have been adjusted so that by default we never block scaling, and we plan to iterate on and refine the behaviour based on these metrics in upcoming improvements to make it a bit more graceful :)





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1427474826


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   Thanks for the clarification!
   
   Let's respect this rule in the future~






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426749088


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   I agree, we should support the old-style config namespace until the next major release. The overhead of this is very low, and our users do not have to worry about using the right prefix for their configuration. As things stand, even if we removed the old namespace, we would have to add a mapping in the operator (either publicly or in our fork), because it is not feasible to go to every user and ask them to change their configs.






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426736930


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   After a brief discussion with @mxm offline, we feel that it's easiest and 
harmless to keep the fallback keys consistent for all options. We may be able 
to remove these at a future point (a new major version of the operator) but 
it's simply better to have a consistent fallback syntax for operator users.
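   For illustration, a minimal sketch of what the consistent fallback syntax gives operator users, written against Flink's `ConfigOptions` API. The concrete prefixes used here (`job.autoscaler.` for the new options, `kubernetes.operator.job.autoscaler.` for the legacy operator namespace) are assumptions for the example, not taken from this thread:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class FallbackKeyExample {

    // Same shape as the option added in this PR; key prefixes are assumed for illustration.
    static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
            ConfigOptions.key("job.autoscaler.memory.gc-pressure.threshold")
                    .doubleType()
                    .defaultValue(0.3)
                    .withFallbackKeys(
                            "kubernetes.operator.job.autoscaler.memory.gc-pressure.threshold");

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A user still on the old operator-style key...
        conf.setString(
                "kubernetes.operator.job.autoscaler.memory.gc-pressure.threshold", "0.5");
        // ...is still picked up when the new option is read.
        System.out.println(conf.get(GC_PRESSURE_THRESHOLD)); // prints 0.5
    }
}
```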
   
   






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426689038


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java:
##
@@ -188,6 +187,13 @@ public void testEndToEnd() throws Exception {
 
         assertNotNull(metricsCollector.getHistories().get(context.getJobKey()));
 
+        // Make sure all reported vertex metrics are evaluated, we expect complete metrics when a
+        // vertex is actually scaled
+        // Also for sources we have LAG metrics that is not available for other vertices
+        assertEquals(
+                ScalingMetric.REPORTED_VERTEX_METRICS,
+                evaluation.getVertexMetrics().get(source1).keySet());

Review Comment:
   Thank you 






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426670350


##
flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java:
##
@@ -188,6 +187,13 @@ public void testEndToEnd() throws Exception {
 
         assertNotNull(metricsCollector.getHistories().get(context.getJobKey()));
 
+        // Make sure all reported vertex metrics are evaluated, we expect complete metrics when a
+        // vertex is actually scaled
+        // Also for sources we have LAG metrics that is not available for other vertices
+        assertEquals(
+                ScalingMetric.REPORTED_VERTEX_METRICS,
+                evaluation.getVertexMetrics().get(source1).keySet());

Review Comment:
   @1996fanrui this is the added test for metric reporting / evaluation 
completeness






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426669381


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   true, removing






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426576655


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   `oldOperatorConfigKey` is not needed, right?
   
   `GC_PRESSURE_THRESHOLD` is the same as well.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,15 +69,35 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
 
-    private final boolean calculateAverage;
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);
+
+    @Getter private final boolean calculateAverage;
+
+    /** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */
+    public static final List<ScalingMetric> REPORTED_VERTEX_METRICS =
+            List.of(
+                    LOAD,
+                    TRUE_PROCESSING_RATE,
+                    TARGET_DATA_RATE,
+                    CATCH_UP_DATA_RATE,
+                    LAG,
+                    PARALLELISM,
+                    RECOMMENDED_PARALLELISM,
+                    MAX_PARALLELISM,
+                    SCALE_UP_RATE_THRESHOLD,
+                    SCALE_DOWN_RATE_THRESHOLD,
+                    EXPECTED_PROCESSING_RATE);

Review Comment:
   > I have added a test to compare the list of evaluated metrics to this to 
help developers when adding new metrics.
   
   Sorry, could you please clarify which test will fail when a new metric should be added to the `REPORTED_VERTEX_METRICS` set but developers forget to add it? I didn't find it.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(restClient, TM_METRIC_NAMES_WITH_GC)
+

Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1855635579

   Fair point @mxm, I will adjust the limits to effectively disable the blocking by default. We can then build more robust mechanisms on top of these collected TM-level metrics soon. I think @1996fanrui already started brainstorming around this.





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


mxm commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1855592264

   >However I am personally very much in favour of enabling it at least for the 
GC time for the following reason:
   
   >When you spend a considerable time in GC (30% is actually a lot) the 
processing is probably extremely slow compared to a state where you have normal 
gc time. In my experience when you are above 30% GC you are usually way above 
it and the degradation is massive, in most of these cases processing comes to 
almost a complete halt relatively speaking.
   
   >This means that true processing rate and other time based measurements are 
completely off (very low compared to the actual value without memory pressure). 
So the scale up in this case would be very much overshooting, risking a large 
resource / cost spike that is basically 100% wrong. I think this is a big 
production risk that can affect trust in the autoscaler.
   
   I fully understand the rationale for blocking scaling decisions, but I think I draw different conclusions from these scenarios.
   
   Consider the following scenario: We scaled down to parallelism 1 at night 
because there was no traffic. In the morning, either immediately, or as traffic 
ramps up and we scale up gradually, we get trapped by the GC feature. I would 
prefer overscaling as opposed to getting stuck. The scaling would be capped in 
resources either by the existing max parallelism setting or by an upcoming 
fairness feature which will allow allocating only a fraction of the max cluster 
resources. 
   
   I believe this PR is part of a solution to handle GC pressure / heap memory issues, but I'm not convinced blocking scaling is the behavior I would like to see. Ultimately, memoization of the processing capacity, processing rate, and GC pressure at each parallelism would help to build a simple model of what kinds of scaling decisions to prevent. For example, if we end up in a high-GC scenario under a certain input rate, we would block scaling to that parallelism (or a close parallelism). I feel quite strongly about never blocking scaling, because that is the worst kind of outcome for the user.
   
   I'm ok with merging this feature, but I would make sure it is disabled for us until we address the above concerns. Generally, I wonder whether it makes sense to go back to the drawing board for these kinds of impactful features. It is good to have discussions upfront because the implementation can be a lot of work. Again, nothing wrong with this change, thank you for making it possible, but I feel it needs additional work to deliver its full potential value to users.
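   A rough, purely hypothetical sketch of the memoization idea above (none of these names exist in the PR): remember the worst GC pressure observed at each parallelism, and veto target parallelisms at or below a parallelism that already showed high pressure.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: block scaling onto parallelisms that previously showed high GC pressure. */
public class GcPressureMemo {

    private final Map<Integer, Double> worstGcPressureByParallelism = new HashMap<>();
    private final double gcPressureLimit;

    public GcPressureMemo(double gcPressureLimit) {
        this.gcPressureLimit = gcPressureLimit;
    }

    /** Record the GC pressure observed while running at the given parallelism. */
    public void record(int parallelism, double gcPressure) {
        worstGcPressureByParallelism.merge(parallelism, gcPressure, Math::max);
    }

    /** A target is vetoed if some parallelism >= target already ran under too much GC pressure. */
    public boolean isAllowed(int targetParallelism) {
        return worstGcPressureByParallelism.entrySet().stream()
                .filter(e -> e.getKey() >= targetParallelism)
                .allMatch(e -> e.getValue() <= gcPressureLimit);
    }
}
```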
   





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425246326


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   That's not how this works, unfortunately: if any of the requested metrics is not found, an empty response comes back from Flink.









Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425233692


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   When `Heap.Max` and `Heap.Used` are found, `gcMetricsFound` is true.
   
   Is that expected?






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425233692


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(restClient, TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   When `Heap.Max` and `Heap.Used` are found, we consider `gcMetricsFound` to be true.
   
   Is that expected?






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1853576202

   > I worry a bit about the implications of enabling this feature by default 
(also see [#726 
(comment)](https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422277362)).
 Can we put this behind a flag?
   > 
   > I feel like the solution to this problem is more complex, but the current 
code is definitely part of the solution. I'm not sure stopping to scale on GC 
pressure / heap usage is always desirable. In a lot of scenarios, scaling up 
might resolve GC pressure / heap usage issues. But after merging this PR, we 
might get stuck in a bad state. Users do not typically monitor their pipelines 
events that closely.
   > 
   > If we discover heap / GC pressure, it looks like we want to let the user 
know, scale up to solve the issue, and block scaling to a lower parallelism. 
Not allowing scaling might actually make the situation worse.
   
   @1996fanrui @mxm 
   
   I don't really want to add more on/off switches but we can consider setting 
the default limit larger or to 1 (effectively disabling this feature) initially 
if we think this needs more iteration.
   
   However I am personally very much in favour of enabling it at least for the 
GC time for the following reason:
   
   When you spend a considerable amount of time in GC (30% is actually a lot), processing is probably extremely slow compared to a state with normal GC time. In my experience, once you are above 30% GC you are usually way above it and the degradation is massive; in most of these cases processing comes almost to a complete halt, relatively speaking.
   
   This means that true processing time and other time based measurements are 
completely off (very low compared to the actual value without memory pressure). 
So the scale up in this case would be very much overshooting, risking a large 
resource / cost spike that is basically 100% wrong. I think this is a big 
production risk that can affect trust in the autoscaler.
   
   So I see 3 options here:
1. Keep the proposed 30% threshold
2. Increase the threshold to let's say 50% to be on the slightly safer side
3. Disable the check by default

   I am personally +1 on 1 or 2 with a slight preference towards 1.
   
   I agree with @1996fanrui's concern that the more simplistic heap check should be turned off by default (threshold set to 1).
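   For reference, the gate under discussion boils down to a simple threshold check along the lines of `ScalingExecutor#isJobUnderMemoryPressure`; the sketch below is illustrative, and the package names and accessors are assumptions rather than code copied from the PR:

```java
import java.util.Map;

import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;

class MemoryPressureGateSketch {

    // With HEAP_USAGE_THRESHOLD defaulting to 1.0 the heap check can never fire,
    // so GC_PRESSURE_THRESHOLD alone decides whether scaling is paused.
    static boolean isJobUnderMemoryPressure(
            Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, Configuration conf) {
        double gcPressure = evaluatedMetrics.get(ScalingMetric.GC_PRESSURE).getCurrent();
        double avgHeapUsage = evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage();
        return gcPressure > conf.get(AutoScalerOptions.GC_PRESSURE_THRESHOLD)
                || avgHeapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD);
    }
}
```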
   
   





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425082293


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,15 +69,35 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
 
-    private final boolean calculateAverage;
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);
+
+    @Getter private final boolean calculateAverage;
+
+    /** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */
+    public static final List<ScalingMetric> REPORTED_VERTEX_METRICS =
+            List.of(
+                    LOAD,
+                    TRUE_PROCESSING_RATE,
+                    TARGET_DATA_RATE,
+                    CATCH_UP_DATA_RATE,
+                    LAG,
+                    PARALLELISM,
+                    RECOMMENDED_PARALLELISM,
+                    MAX_PARALLELISM,
+                    SCALE_UP_RATE_THRESHOLD,
+                    SCALE_DOWN_RATE_THRESHOLD,
+                    EXPECTED_PROCESSING_RATE);

Review Comment:
   I have added a test that compares the list of evaluated metrics against this one, to help developers when adding new metrics. Initially I tried the enum-based approach, but it just makes the ScalingMetric enum more complicated without any practical benefit. We still have to make sure independently that these metrics are evaluated correctly for reporting.
   
   Will push this in a second.






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1424853109


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,15 +69,35 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
 
-    private final boolean calculateAverage;
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);
+
+    @Getter private final boolean calculateAverage;
+
+    /** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */
+    public static final List<ScalingMetric> REPORTED_VERTEX_METRICS =
+            List.of(
+                    LOAD,
+                    TRUE_PROCESSING_RATE,
+                    TARGET_DATA_RATE,
+                    CATCH_UP_DATA_RATE,
+                    LAG,
+                    PARALLELISM,
+                    RECOMMENDED_PARALLELISM,
+                    MAX_PARALLELISM,
+                    SCALE_UP_RATE_THRESHOLD,
+                    SCALE_DOWN_RATE_THRESHOLD,
+                    EXPECTED_PROCESSING_RATE);

Review Comment:
   nit: Could we add a field to this enum indicating whether each value should be reported as a vertex metric? (Something like the sketch below.)
   
   Adding a list can work, but other developers (or we ourselves) may forget to update it when adding a new enum value in the future. With a field, it cannot be forgotten.
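   A hypothetical variant with such a flag could look like this (field and constructor names are made up; only the `calculateAverage` values visible in the diff above are reused):

```java
/** Hypothetical variant of ScalingMetric with a per-value vertex-reporting flag. */
public enum ScalingMetricSketch {
    SCALE_DOWN_RATE_THRESHOLD(false, true),
    EXPECTED_PROCESSING_RATE(false, true),
    GC_PRESSURE(false, false),
    HEAP_USAGE(true, false);

    private final boolean calculateAverage;
    private final boolean reportedAsVertexMetric;

    ScalingMetricSketch(boolean calculateAverage, boolean reportedAsVertexMetric) {
        this.calculateAverage = calculateAverage;
        this.reportedAsVertexMetric = reportedAsVertexMetric;
    }

    public boolean isReportedAsVertexMetric() {
        return reportedAsVertexMetric;
    }
}
```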






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1853275312

   > I feel like the solution to this problem is more complex, but the current 
code is definitely part of the solution. I'm not sure stopping to scale on GC 
pressure / heap usage is always desirable. In a lot of scenarios, scaling up 
might resolve GC pressure / heap usage issues. But after merging this PR, we 
might get stuck in a bad state. Users do not typically monitor their pipelines 
events that closely.
   > 
   > If we discover heap / GC pressure, it looks like we want to let the user 
know, scale up to solve the issue, and block scaling to a lower parallelism. 
Not allowing scaling might actually make the situation worse.
   
   That makes sense!
   
   Could we extract common logic around job (un)health? It has 2 parts:
   
   - How do we decide that a job is unhealthy? For example, severe GC or high memory usage are types of unhealthiness, and we can introduce more types in the future.
   - What does the autoscaler do when the job is unhealthy? In this PR the strategy is to stop scaling; in the future we could revert to the last entry of the scaling history or scale up instead.
   
   Both parts can keep being improved in the future. About the switch, I prefer to disable it in this PR; we can enable it once we think it's ready for massive production use. (At least, disabling the heap-usage check in this PR is fine for me; the reason can be found in https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604)
   
   We don't need to introduce a new option to disable it; `Double.NaN` can be used. When the threshold is NaN, we disable the corresponding check. WDYT? (A rough sketch of such an abstraction follows below.)
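   A very rough sketch of that abstraction, entirely hypothetical (none of these types exist in the PR):

```java
import org.apache.flink.configuration.Configuration;

/** Hypothetical: how to decide a job is unhealthy, and what the autoscaler does about it. */
interface JobHealthCheck<CTX> {

    /** E.g. severe GC pressure or high heap usage; more types can be added later. */
    boolean isUnhealthy(CTX context, Configuration conf);

    /** E.g. stop scaling (this PR), revert to the last scaling decision, or scale up. */
    void reactToUnhealthy(CTX context);

    /** A NaN threshold disables the corresponding check, as suggested above. */
    static boolean exceeds(double value, double threshold) {
        return !Double.isNaN(threshold) && value > threshold;
    }
}
```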
   
   





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


gyfora commented on PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1852176197

   I believe I have addressed all comments, @1996fanrui @mxm.
   Please note that I also added a second commit with the metrics improvements separately. Not only does it fix the issue highlighted by @1996fanrui, it also removes some previously incorrectly reported per-vertex metrics (those that we did not actually evaluate and which were therefore NaN) and adds LAG as an evaluated metric for completeness (I'm going to add a test for this before merging).





Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-12 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423752520


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
   > we could increase the default threshold to 95% to be on the safe side.
   
   Sounds good to me!
   
   > Regarding the follow up, please go ahead with the ticket :) and feel free 
to work on it!
   
   Thank you, I created FLINK-33801 to follow up on it.
   






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423504221


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
   I see your point about the heap usage with a large number of TMs and it 
makes sense. I don't think most jobs would be affected by this issue but we 
could increase the default threshold to 95% to be on the safe side.
   
   Regarding the follow up, please go ahead with the ticket :) and feel free to 
work on it!






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
   > Also keep in mind that this is the average heap usage. With 90% average 
usage you are extremely likely to be close to out of heap in most cases.
   
   Thanks @gyfora for the clarification!
   
   I suspect it is not really the average heap usage, and I wanted to check with you first. In the `ScalingExecutor#isJobUnderMemoryPressure` method, we check whether `evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage()` > `conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)`. Intuitively `getAverage` looks like an average, but its calculation is divided into two steps:
   - Step 1: `ScalingMetrics#computeGlobalMetrics` collects `HEAP_USAGE` at each observation time as `heapUsed.getMax() / heapMax.getMax()`.
   - IIUC, `heapUsed` is an `AggregatedMetric`, so when a job has 1000 TaskManagers and heap usage is very low on 999 TMs but high on only one, we still record `heapUsed` as high at that time.
   - Step 2: `ScalingMetricEvaluator#evaluateGlobalMetrics` computes `HEAP_USAGE` based on the `metricHistory`.
   - So the `metricHistory` is composed of the TMs with the highest heap usage at a large number of time points.
   
   Strictly speaking, both steps have some problems (see the small illustration after this comment):
   - Step 1: Java GC runs lazily, not immediately.
   - When TM heap usage is high, the GC may simply not have been triggered yet, which does not mean that memory pressure is high.
   - Especially if heap usage is high on only one TM or a small number of TMs.
   - Step 2: Since the data from the first step is unreliable, the average computed in the second step is unreliable as well.
   
   > GC metrics will only be available in Flink 1.19.
   
   I'm not sure: could we sum all the per-collector GC times to get the total GC time? Before 1.19, detailed GC times are available for each garbage collector.
   
   > This is a very good point and happens often. I think we could definitely build this logic on top of the newly introduced metrics + scaling history as a follow up. It would probably be a very good addition. (but definitely out of scope for this PR)
   
   That makes sense. As I understand it: it's better to revert a scaling if the job is unhealthy after scaling down. Memory pressure is one type of unhealthiness; checkpoint failures or CPU pressure may be others.
   
   Would you mind if I create a JIRA and pick it up? Thanks~
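   A tiny numeric illustration of the Step 1 concern, using made-up numbers: with max-based aggregation, a single hot TaskManager drives the recorded global heap usage.

```java
import java.util.stream.DoubleStream;

public class HeapUsageAggregationExample {
    public static void main(String[] args) {
        // Assumed numbers: 999 TMs at 20% of max heap, one TM at 95%.
        double[] perTmUsage =
                DoubleStream.concat(
                                DoubleStream.generate(() -> 0.20).limit(999),
                                DoubleStream.of(0.95))
                        .toArray();

        double maxBased = DoubleStream.of(perTmUsage).max().orElse(Double.NaN); // 0.95
        double meanBased = DoubleStream.of(perTmUsage).average().orElse(Double.NaN); // ~0.20

        // Step 1 records the max-based value per observation, so the "average" evaluated in
        // Step 2 is an average of per-TM maxima rather than an average over all TaskManagers.
        System.out.printf("max-based=%.2f mean-based=%.2f%n", maxBased, meanBased);
    }
}
```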
   






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061343


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -297,6 +301,28 @@ private void computeTargetDataRate(
         }
     }
 
+    @VisibleForTesting
+    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(
+            SortedMap<Instant, CollectedMetrics> metricHistory) {
+        var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+        var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+
+        out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
+        out.put(
+                HEAP_USAGE,
+                new EvaluatedScalingMetric(
+                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory)));
+        return out;
+    }
+
+    private static double getAverageGlobalMetric(
+            ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        return getAverage(metric, null, metricsHistory);
+    }
+
     public static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,

Review Comment:
   will do






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061010


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
   1. You are right but I still think the current logic is valuable because GC 
metrics will only be available in Flink 1.19. With the heap usage based logic 
we can also support older Flink versions. Also keep in mind that this is the 
average heap usage. With 90% average usage you are extremely likely to be close 
to out of heap in most cases.
   
   2. This is a very good point and happens often. I think we could definitely 
build this logic on top of the newly introduced metrics + scaling history as a 
follow up. It would probably be a very good addition. (but definitely out of 
scope for this PR)






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423057412


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });
+
+            TOTAL_GC_TIME_PER_SEC
+                    .findAny(metricNames)
+                    .ifPresent(
+                            m -> {
+                                LOG.debug("GC metrics found");
+                                metricNameMapping.put(m, TOTAL_GC_TIME_PER_SEC);
+                            });
+
+            var queriedMetrics =
+                    new HashMap<>(queryAggregatedTmMetrics(restClient, metricNameMapping));
+            availableTmMetricNames.put(ctx.getJobKey(), metricNames);

Review Comment:
   Actually, I just realised that the TM metric names are fixed, so we know them beforehand and there is no need to cache them; we can simply hardcode them (see the sketch below). I will work on this tomorrow.
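   A sketch of what hardcoding could look like; the metric name strings below are assumptions based on Flink's standard TaskManager JVM metric names, not taken from the PR:

```java
import java.util.List;

/** Hypothetical constants for the fixed TaskManager-level metric names. */
final class TmMetricNames {
    static final String HEAP_USED = "Status.JVM.Memory.Heap.Used";
    static final String HEAP_MAX = "Status.JVM.Memory.Heap.Max";
    // Per the discussion in this thread, an aggregated GC metric is only available from Flink 1.19.
    static final String TOTAL_GC_TIME_PER_SEC = "Status.JVM.GarbageCollector.All.TimeMsPerSecond";

    static final List<String> REQUIRED = List.of(HEAP_USED, HEAP_MAX);
    static final List<String> WITH_GC = List.of(HEAP_USED, HEAP_MAX, TOTAL_GC_TIME_PER_SEC);

    private TmMetricNames() {}
}
```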






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423056535


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,7 +65,16 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
+
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);

Review Comment:
   good point, I will improve that






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


mxm commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422597784


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });

Review Comment:
   Oh I missed that. Thanks for clarifying.






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422568938


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with " + m2.getId(),
-                                                    Math.min(m1.getMin(), m2.getMin()),
-                                                    Math.max(m1.getMax(), m2.getMax()),
-                                                    // Average can't be computed
-                                                    Double.NaN,
-                                                    m1.getSum() + m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });
+
+            TOTAL_GC_TIME_PER_SEC
+                    .findAny(metricNames)
+                    .ifPresent(
+                            m -> {
+                                LOG.debug("GC metrics found");
+                                metricNameMapping.put(m, TOTAL_GC_TIME_PER_SEC);
+                            });
+
+            var queriedMetrics =
+                    new HashMap<>(queryAggregatedTmMetrics(restClient, metricNameMapping));
+            availableTmMetricNames.put(ctx.getJobKey(), metricNames);

Review Comment:
   Could we cache `metricNameMapping` directly, as sketched below?
   
   Once the `metricNames` are cached, the `availableTmMetricNames` of each job never change again, and `TOTAL_GC_TIME_PER_SEC` and `REQUIRED_TM_METRICS` never change either.
   
   So the `metricNameMapping` never changes.
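   A minimal sketch of the suggested caching (the cache field and the `buildMetricNameMapping` helper are illustrative; `KEY` and `Context` are the type parameters of the collector class):

```java
// Assumes java.util.concurrent.ConcurrentHashMap; buildMetricNameMapping stands for the
// name-resolution logic shown in the diff above and would only run once per job key.
private final Map<KEY, Map<String, FlinkMetric>> tmMetricNameMappings = new ConcurrentHashMap<>();

protected Map<String, FlinkMetric> getTmMetricNameMapping(Context ctx) {
    return tmMetricNameMappings.computeIfAbsent(ctx.getJobKey(), k -> buildMetricNameMapping(ctx));
}
```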






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422456112


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
   I have 2 questions about this autoscaler rule:
   
   1. Does high heap usage indicate insufficient memory?
   - When GC is severe, memory is indeed insufficient.
   - But when GC time is very low and heap usage is high, the TaskManagers might still work well, right? (The memory may be just enough.)
   2. Is the insufficient memory caused by scaling down?
   - GC is fine before rescaling, but the busy ratio is very low, so the autoscaler scales the job down.
   - But GC is severe after the scale-down.
   - Could we revert such a rescaling? Or do we consider the scale-down expected, and expect users to increase the task memory after scaling down?
   






Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422419813


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
##
@@ -297,6 +301,28 @@ private void computeTargetDataRate(
         }
     }
 
+    @VisibleForTesting
+    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(
+            SortedMap<Instant, CollectedMetrics> metricHistory) {
+        var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+        var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+
+        out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
+        out.put(
+                HEAP_USAGE,
+                new EvaluatedScalingMetric(
+                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory)));
+        return out;
+    }
+
+    private static double getAverageGlobalMetric(
+            ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        return getAverage(metric, null, metricsHistory);
+    }
+
     public static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,

Review Comment:
   ```suggestion
   @Nullable JobVertexID jobVertexId,
   ```
   
   Could we add `@Nullable` and a short comment here? When it's null, it refers 
to the `global metrics`.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+.doubleType()
+.defaultValue(0.3)
+.withDescription("Max allowed GC pressure during scaling 
operations");
+
+public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+autoScalerConfig("memory.heap-usage.threshold")
+.doubleType()
+.defaultValue(0.9)

Review Comment:
   I have two questions about this autoscaler rule:
   
   1. Does high heap usage really indicate insufficient memory?
   - When the GC is severe, memory is indeed insufficient.
   - But when GC time is very low and heap usage is high, the TaskManagers 
might still work well, right? (The memory may be just enough.)
   2. Is the insufficient memory caused by scaling down?
   - GC is fine before rescaling, but the busy ratio is very low, so the 
autoscaler scales the job down.
   - But GC becomes severe after the scale down.
   - Could we revert the rescaling?
   



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##
@@ -65,7 +65,16 @@ public enum ScalingMetric {
 SCALE_DOWN_RATE_THRESHOLD(false),
 
 /** Expected true processing rate after scale up. */
-EXPECTED_PROCESSING_RATE(false);
+EXPECTED_PROCESSING_RATE(false),
+
+/**
+ * Maximum GC pressure across taskmanagers. Percentage of time spent 
garbage collecting between
+ * 0 (no time in GC) and 1 (100% time in GC).
+ */
+GC_PRESSURE(false),
+
+/** Percentage of max heap used (between 0 and 1). */
+HEAP_USAGE(true);

Review Comment:
   I'm not sure: should we add a field here to distinguish whether each metric 
is vertex level or TM level?
   
   We register metrics in `AutoscalerFlinkMetrics#registerScalingMetrics`, and 
we register all `ScalingMetric` types for all jobVertices even if the value is 
NaN.
   
   Obviously, we don't need to register memory-related metrics for jobVertices.
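
   A minimal sketch of that idea, purely illustrative (the real `ScalingMetric`
enum has more members and may model the distinction differently):

   ```java
   // Sketch: carry a "vertex level" flag on each metric so that per-vertex metric
   // registration can skip job-wide (TM/global) metrics such as the memory ones.
   public enum ScalingMetricSketch {
       TRUE_PROCESSING_RATE(true, true),
       GC_PRESSURE(false, false),
       HEAP_USAGE(false, true);
   
       private final boolean vertexLevel;      // false = job-wide / TM-level metric
       private final boolean calculateAverage;
   
       ScalingMetricSketch(boolean vertexLevel, boolean calculateAverage) {
           this.vertexLevel = vertexLevel;
           this.calculateAverage = calculateAverage;
       }
   
       public boolean isVertexLevel() {
           return vertexLevel;
       }
   
       public boolean isCalculateAverage() {
           return calculateAverage;
       }
   }
   ```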



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422338471


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 

Review Comment:
   I could modify the logic to only query the metrics if the threshold is below 
1. In any case, TM metric collection issues should not block other autoscaler 
functionality; I have to double-check that.
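
   A rough sketch of such gating, under the assumption that a threshold of 1 or
above effectively disables the check; the config access and types are
simplified and not the operator's actual API:

   ```java
   // Skip the extra TM metric queries when the feature cannot block scaling anyway,
   // and never let collection failures break the rest of the autoscaler loop.
   import java.util.Collections;
   import java.util.Map;
   
   public class GatedTmMetricCollector {
   
       interface TmMetricQuery {
           Map<String, Double> query() throws Exception;
       }
   
       public static Map<String, Double> collectTmMetrics(
               double gcPressureThreshold, double heapUsageThreshold, TmMetricQuery query) {
           // Thresholds >= 1 can never be exceeded by a ratio in [0, 1], so skip the REST calls.
           if (gcPressureThreshold >= 1.0 && heapUsageThreshold >= 1.0) {
               return Collections.emptyMap();
           }
           try {
               return query.query();
           } catch (Exception e) {
               // Metric collection issues should not block other autoscaler functionality.
               return Collections.emptyMap();
           }
       }
   }
   ```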



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


gyfora commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422337175


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map 
queryAggregatedVertexMetrics(
 EmptyRequestBody.getInstance())
 .get();
 
-return responseBody.getMetrics().stream()
-.collect(
-Collectors.toMap(
-m -> metrics.get(m.getId()),
-m -> m,
-(m1, m2) ->
-new AggregatedMetric(
-m1.getId() + " merged with 
" + m2.getId(),
-Math.min(m1.getMin(), 
m2.getMin()),
-Math.max(m1.getMax(), 
m2.getMax()),
-// Average can't be 
computed
-Double.NaN,
-m1.getSum() + 
m2.getSum(;
+return aggregateByFlinkMetric(metrics, responseBody);
 }
 }
+
+protected Map queryTmMetrics(Context ctx) 
throws Exception {
+try (var restClient = ctx.getRestClusterClient()) {
+var metricNames = getTmMetricNames(restClient, ctx);
+var metricNameMapping = new HashMap();
+
+REQUIRED_TM_METRICS.forEach(
+fm -> {
+var name =
+fm.findAny(metricNames)
+.orElseThrow(
+() ->
+new RuntimeException(
+"Could not 
find required TM metric "
++ 
fm.name()));
+metricNameMapping.put(name, fm);
+});

Review Comment:
   In the same way as we query the vertex metrics, we only query aggregated 
task manager metrics through the job manager, so it's a single query regardless 
of the number of TMs.
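
   For reference, a standalone sketch of an aggregated query against Flink's
REST API (the `/taskmanagers/metrics` endpoint with `get`/`agg` parameters);
the JobManager address and metric names below are placeholders:

   ```java
   // One REST call returns the metric aggregated across every TaskManager.
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;
   
   public class AggregatedTmMetricsQuery {
   
       public static void main(String[] args) throws Exception {
           String jobManager = "http://localhost:8081"; // placeholder address
           String metrics = "Status.JVM.Memory.Heap.Used,Status.JVM.Memory.Heap.Max";
   
           HttpRequest request = HttpRequest.newBuilder(
                           URI.create(jobManager
                                   + "/taskmanagers/metrics?get=" + metrics + "&agg=min,max,avg,sum"))
                   .GET()
                   .build();
   
           HttpResponse<String> response =
                   HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
   
           // A single response regardless of how many TaskManagers the job runs on.
           System.out.println(response.body());
       }
   }
   ```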



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-11 Thread via GitHub


mxm commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422277362


##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 

Review Comment:
   I'm wondering whether we should add an on/off switch in case there are any 
issues with the TM metric collection.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##
@@ -78,19 +91,87 @@ protected Map 
queryAggregatedVertexMetrics(
 EmptyRequestBody.getInstance())
 .get();
 
-return responseBody.getMetrics().stream()
-.collect(
-Collectors.toMap(
-m -> metrics.get(m.getId()),
-m -> m,
-(m1, m2) ->
-new AggregatedMetric(
-m1.getId() + " merged with 
" + m2.getId(),
-Math.min(m1.getMin(), 
m2.getMin()),
-Math.max(m1.getMax(), 
m2.getMax()),
-// Average can't be 
computed
-Double.NaN,
-m1.getSum() + 
m2.getSum(;
+return aggregateByFlinkMetric(metrics, responseBody);
 }
 }
+
+protected Map queryTmMetrics(Context ctx) 
throws Exception {
+try (var restClient = ctx.getRestClusterClient()) {
+var metricNames = getTmMetricNames(restClient, ctx);
+var metricNameMapping = new HashMap();
+
+REQUIRED_TM_METRICS.forEach(
+fm -> {
+var name =
+fm.findAny(metricNames)
+.orElseThrow(
+() ->
+new RuntimeException(
+"Could not 
find required TM metric "
++ 
fm.name()));
+metricNameMapping.put(name, fm);
+});

Review Comment:
   I'm a little bit concerned that these non-parallel requests might take too 
long for deployments with hundreds of task managers, especially because this 
feature cannot be turned off.



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+.doubleType()
+.defaultValue(0.3)
+.withDescription("Max allowed GC pressure during scaling 
operations");
+
+public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+autoScalerConfig("memory.heap-usage.threshold")
+.doubleType()
+.defaultValue(0.9)
+.withDescription("Max allowed Heap usage during scaling 
operations");

Review Comment:
   ```suggestion
   .withDescription("Max allowed percentage of heap usage 
during scaling operations. Autoscaling will be paused if the heap usage exceeds 
this threshold.");
   ```



##
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
 .withDescription(
 "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+autoScalerConfig("memory.gc-pressure.threshold")
+  

[PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]

2023-12-08 Thread via GitHub


gyfora opened a new pull request, #726:
URL: https://github.com/apache/flink-kubernetes-operator/pull/726

   ## What is the purpose of the change
   
   The autoscaler currently doesn't use any GC/HEAP metrics as part of the 
scaling decisions.
   While the long-term goal may be to support vertical scaling (increasing TM 
sizes), this is currently out of scope for the autoscaler.
   
   However, it is very important to detect cases where the throughput of certain 
vertices or the entire pipeline is critically affected by long GC pauses. In 
these cases the current autoscaler logic would wrongly assume a low true 
processing rate and scale the pipeline up too far, ramping up costs and causing 
further issues.
   
   Using the improved GC metrics introduced in 
https://issues.apache.org/jira/browse/FLINK-33318, we measure the GC pauses and 
simply block scaling decisions if the pipeline spends too much time garbage 
collecting, notifying the user that more memory is required.
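
   As a rough illustration of the intended behaviour (not necessarily the exact
computation in this PR), converting a "GC time in ms per second" style metric
into a pressure ratio and gating scaling on it could look like this:

   ```java
   // Sketch only: turn GC time per wall-clock second into a ratio in [0, 1]
   // and hold back scaling when it exceeds a configured threshold.
   public class GcPressureSketch {
   
       /** 250 ms of GC per wall-clock second -> 0.25 pressure. */
       public static double gcPressure(double gcTimeMsPerSecond) {
           return Math.min(1.0, Math.max(0.0, gcTimeMsPerSecond / 1000.0));
       }
   
       public static boolean blockScaling(double gcTimeMsPerSecond, double threshold) {
           return gcPressure(gcTimeMsPerSecond) > threshold;
       }
   
       public static void main(String[] args) {
           System.out.println(gcPressure(250));        // 0.25
           System.out.println(blockScaling(400, 0.3)); // true  -> notify the user to add memory
           System.out.println(blockScaling(100, 0.3)); // false -> scaling proceeds normally
       }
   }
   ```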
   
   *This feature requires Flink 1.19 or the commit backported to earlier 
versions*
   
   ## Brief change log
   
 - *Introduce TM level metrics for the autoscaler and track HEAP/GC usage*
 - *Trigger event and block scaling if gc is above threshold*
 - *Tests*
   
   ## Verifying this change
   
   Unit tests + manual validation in various envs.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs [TODO]
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org