Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora merged PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
mxm commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857974906 Thanks for the ping. Looks good!
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857868649 Thanks @gyfora for the ping. I don't have any comments, and I have approved.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1857864814 Do you have any further comments / concerns @1996fanrui @mxm? The thresholds have been adjusted so that by default we never block, and we plan to iterate and refine the behaviour based on these metrics in the upcoming improvements to make it a bit more graceful :)
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1427474826 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) + .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold")) +.withDescription( +"Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit."); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold")) Review Comment: Thanks for the clarification! Let's respect this rule in the future~
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
mxm commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426749088 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) + .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold")) +.withDescription( +"Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit."); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold")) Review Comment: I agree, we should support the old-style config namespace until the next major release. The overhead of this is very low and our users do not have to worry about using the right prefix for their configuration. As things stand, even if we removed the old namespace, we would have to add a mapping in the operator (either publicly or in our fork) because it is not feasible to go to every user and ask them to change their configs.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426736930 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) + .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold")) +.withDescription( +"Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit."); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold")) Review Comment: After a brief discussion with @mxm offline, we feel that it's easiest and harmless to keep the fallback keys consistent for all options. We may be able to remove these at a future point (a new major version of the operator) but it's simply better to have a consistent fallback syntax for operator users.
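To illustrate the fallback behaviour discussed above, a user-facing configuration could carry either key (a sketch; the `job.autoscaler.*` / `kubernetes.operator.job.autoscaler.*` prefixes are assumed from the `autoScalerConfig` / `oldOperatorConfigKey` helpers in the diff):

```yaml
# New-style key, resolved directly:
job.autoscaler.memory.gc-pressure.threshold: "0.3"

# Old-style operator key, still honoured via withFallbackKeys so existing
# deployments keep working without config changes:
kubernetes.operator.job.autoscaler.memory.gc-pressure.threshold: "0.3"
```

If both keys are present, the primary (new-style) key is expected to win, which is the usual Flink `ConfigOption` fallback semantics.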
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426689038 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java: ## @@ -188,6 +187,13 @@ public void testEndToEnd() throws Exception { assertNotNull(metricsCollector.getHistories().get(context.getJobKey())); +// Make sure all reported vertex metrics are evaluated, we expect complete metrics when a +// vertex is actually scaled +// Also for sources we have LAG metrics that is not available for other vertices +assertEquals( +ScalingMetric.REPORTED_VERTEX_METRICS, +evaluation.getVertexMetrics().get(source1).keySet()); Review Comment: Thank you
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426670350 ## flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java: ## @@ -188,6 +187,13 @@ public void testEndToEnd() throws Exception { assertNotNull(metricsCollector.getHistories().get(context.getJobKey())); +// Make sure all reported vertex metrics are evaluated, we expect complete metrics when a +// vertex is actually scaled +// Also for sources we have LAG metrics that is not available for other vertices +assertEquals( +ScalingMetric.REPORTED_VERTEX_METRICS, +evaluation.getVertexMetrics().get(source1).keySet()); Review Comment: @1996fanrui this is the added test for metric reporting / evaluation completeness
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426669381 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) + .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold")) +.withDescription( +"Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit."); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold")) Review Comment: true, removing
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426576655 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java: ## @@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) { .withDescription( "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective."); +public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD = +autoScalerConfig("memory.gc-pressure.threshold") +.doubleType() +.defaultValue(0.3) + .withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold")) +.withDescription( +"Max allowed GC pressure (percentage spent garbage collecting) during scaling operations. Autoscaling will be paused if the GC pressure exceeds this limit."); + +public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD = +autoScalerConfig("memory.heap-usage.threshold") +.doubleType() +.defaultValue(1.) + .withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold")) Review Comment: `oldOperatorConfigKey` is not needed, right? `GC_PRESSURE_THRESHOLD` is the same as well. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -65,15 +69,35 @@ public enum ScalingMetric { SCALE_DOWN_RATE_THRESHOLD(false), /** Expected true processing rate after scale up. */ -EXPECTED_PROCESSING_RATE(false); +EXPECTED_PROCESSING_RATE(false), -private final boolean calculateAverage; +/** + * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between + * 0 (no time in GC) and 1 (100% time in GC). + */ +GC_PRESSURE(false), + +/** Percentage of max heap used (between 0 and 1). */ +HEAP_USAGE(true); + +@Getter private final boolean calculateAverage; + +/** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */ +public static final List<ScalingMetric> REPORTED_VERTEX_METRICS = +List.of( +LOAD, +TRUE_PROCESSING_RATE, +TARGET_DATA_RATE, +CATCH_UP_DATA_RATE, +LAG, +PARALLELISM, +RECOMMENDED_PARALLELISM, +MAX_PARALLELISM, +SCALE_UP_RATE_THRESHOLD, +SCALE_DOWN_RATE_THRESHOLD, +EXPECTED_PROCESSING_RATE); Review Comment: > I have added a test to compare the list of evaluated metrics to this to help developers when adding new metrics. Sorry, could you please clarify which test will fail when one new metric should be added to the `REPORTED_VERTEX_METRICS` set but developers forget it? I didn't find it. ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ## @@ -78,19 +99,89 @@ protected Map queryAggregatedVertexMetrics( EmptyRequestBody.getInstance()) .get(); -return responseBody.getMetrics().stream() -.collect( -Collectors.toMap( -m -> metrics.get(m.getId()), -m -> m, -(m1, m2) -> -new AggregatedMetric( -m1.getId() + " merged with " + m2.getId(), -Math.min(m1.getMin(), m2.getMin()), -Math.max(m1.getMax(), m2.getMax()), -// Average can't be computed -Double.NaN, -m1.getSum() + m2.getSum()))); +return aggregateByFlinkMetric(metrics, responseBody); } } + +protected Map queryTmMetrics(Context ctx) throws Exception { +try (var restClient = ctx.getRestClusterClient()) { +boolean hasGcMetrics = +jobsWithGcMetrics.computeIfAbsent( +ctx.getJobKey(), +k -> { +boolean gcMetricsFound = +!queryAggregatedTmMetrics( +restClient, TM_METRIC_NAMES_WITH_GC) +
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1855635579 Fair point @mxm, I will adjust the limits to effectively disable the blocking by default. And we can build some more robust mechanisms on top of these collected TM level metrics soon. I think @1996fanrui already started brainstorming around this.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
mxm commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1855592264 > However I am personally very much in favour of enabling it at least for the GC time for the following reason: > When you spend a considerable time in GC (30% is actually a lot) the processing is probably extremely slow compared to a state where you have normal gc time. In my experience when you are above 30% GC you are usually way above it and the degradation is massive, in most of these cases processing comes to almost a complete halt relatively speaking. > This means that true processing rate and other time based measurements are completely off (very low compared to the actual value without memory pressure). So the scale up in this case would be very much overshooting, risking a large resource / cost spike that is basically 100% wrong. I think this is a big production risk that can affect trust in the autoscaler. I fully understand the rationale for blocking scaling decisions but I think I draw different conclusions from these scenarios. Consider the following scenario: We scaled down to parallelism 1 at night because there was no traffic. In the morning, either immediately, or as traffic ramps up and we scale up gradually, we get trapped by the GC feature. I would prefer overscaling as opposed to getting stuck. The scaling would be capped in resources either by the existing max parallelism setting or by an upcoming fairness feature which will allow allocating only a fraction of the max cluster resources. I believe this PR is part of a solution to handle GC pressure / heap memory issues, but I'm not convinced blocking scaling is the behavior I would like to see. Ultimately, memoization of the processing capacity, processing rate, and GC pressure at each parallelism would help to build a simple model of what kind of scaling decisions to prevent.
For example, if we end up in a high GC scenario under a certain input rate, we would block scaling to that parallelism (or a close parallelism). I feel quite strongly about never blocking scaling because this is the worst kind of outcome for the user. I'm ok with merging this feature but I would make sure it is disabled for us until we address the above concerns. Generally, I wonder whether it makes sense to go back to the drawing board for these kinds of impactful features. It is good to have discussions upfront because the implementation can be a lot of work. Again, nothing wrong with this change, thank you for making it possible, but I feel like it needs additional work to deliver the full potential value to users.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425246326 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ## @@ -78,19 +99,89 @@ protected Map queryAggregatedVertexMetrics( EmptyRequestBody.getInstance()) .get(); -return responseBody.getMetrics().stream() -.collect( -Collectors.toMap( -m -> metrics.get(m.getId()), -m -> m, -(m1, m2) -> -new AggregatedMetric( -m1.getId() + " merged with " + m2.getId(), -Math.min(m1.getMin(), m2.getMin()), -Math.max(m1.getMax(), m2.getMax()), -// Average can't be computed -Double.NaN, -m1.getSum() + m2.getSum()))); +return aggregateByFlinkMetric(metrics, responseBody); } } + +protected Map queryTmMetrics(Context ctx) throws Exception { +try (var restClient = ctx.getRestClusterClient()) { +boolean hasGcMetrics = +jobsWithGcMetrics.computeIfAbsent( +ctx.getJobKey(), +k -> { +boolean gcMetricsFound = +!queryAggregatedTmMetrics( +restClient, TM_METRIC_NAMES_WITH_GC) +.isEmpty(); Review Comment: That’s not how this works unfortunately, if any of the metrics is not found an empty response comes back from Flink
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425233692 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java: ## @@ -78,19 +99,89 @@ protected Map queryAggregatedVertexMetrics( EmptyRequestBody.getInstance()) .get(); -return responseBody.getMetrics().stream() -.collect( -Collectors.toMap( -m -> metrics.get(m.getId()), -m -> m, -(m1, m2) -> -new AggregatedMetric( -m1.getId() + " merged with " + m2.getId(), -Math.min(m1.getMin(), m2.getMin()), -Math.max(m1.getMax(), m2.getMax()), -// Average can't be computed -Double.NaN, -m1.getSum() + m2.getSum()))); +return aggregateByFlinkMetric(metrics, responseBody); } } + +protected Map queryTmMetrics(Context ctx) throws Exception { +try (var restClient = ctx.getRestClusterClient()) { +boolean hasGcMetrics = +jobsWithGcMetrics.computeIfAbsent( +ctx.getJobKey(), +k -> { +boolean gcMetricsFound = +!queryAggregatedTmMetrics( +restClient, TM_METRIC_NAMES_WITH_GC) +.isEmpty(); Review Comment: When the `Heap.Max` and `Heap.Used` are found, the `gcMetricsFound` is true. Is it expected?
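The behaviour described in this exchange — Flink returns an empty aggregate when any requested metric name is missing — suggests a probe-then-cache pattern. A minimal sketch follows; the class, the metric names, and the `query` function are illustrative stand-ins, not the actual `RestApiMetricsCollector` API:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: probe once per job whether the GC metrics are queryable and cache
// the answer, because an all-or-nothing REST aggregate makes a per-metric
// presence check impossible.
public class TmMetricProbe {
    private final Map<String, Boolean> jobsWithGcMetrics = new ConcurrentHashMap<>();

    // `query` stands in for the REST call; it returns the aggregated values,
    // or an empty map if ANY requested metric name is unknown to Flink.
    public boolean hasGcMetrics(
            String jobKey, Function<List<String>, Map<String, Double>> query) {
        return jobsWithGcMetrics.computeIfAbsent(
                jobKey,
                k ->
                        // Probe with the GC name included; non-empty means the
                        // whole set (heap + GC) is available for this job.
                        !query.apply(
                                        List.of(
                                                "Heap.Used",
                                                "Heap.Max",
                                                "GC.TimeMsPerSecond"))
                                .isEmpty());
    }
}
```

On a hit the cached flag avoids re-probing on every collection cycle; a caveat of caching is that the flag would not notice GC metrics appearing later, e.g. after a TM restart with different JVM flags.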
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1853576202 > I worry a bit about the implications of enabling this feature by default (also see [#726 (comment)](https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422277362)). Can we put this behind a flag? > > I feel like the solution to this problem is more complex, but the current code is definitely part of the solution. I'm not sure stopping to scale on GC pressure / heap usage is always desirable. In a lot of scenarios, scaling up might resolve GC pressure / heap usage issues. But after merging this PR, we might get stuck in a bad state. Users do not typically monitor their pipelines events that closely. > > If we discover heap / GC pressure, it looks like we want to let the user know, scale up to solve the issue, and block scaling to a lower parallelism. Not allowing scaling might actually make the situation worse. @1996fanrui @mxm I don't really want to add more on/off switches but we can consider setting the default limit larger or to 1 (effectively disabling this feature) initially if we think this needs more iteration. However I am personally very much in favour of enabling it at least for the GC time for the following reason: When you spend a considerable time in GC (30% is actually a lot) the processing is probably extremely slow compared to a state where you have normal gc time. In my experience when you are above 30% GC you are usually way above it and the degradation is massive, in most of these cases processing comes to almost a complete halt relatively speaking. This means that true processing time and other time based measurements are completely off (very low compared to the actual value without memory pressure). So the scale up in this case would be very much overshooting, risking a large resource / cost spike that is basically 100% wrong. I think this is a big production risk that can affect trust in the autoscaler. 
So I see 3 options here:
1. Keep the proposed 30% threshold
2. Increase the threshold to, let's say, 50% to be on the slightly safer side
3. Disable the check by default
I am personally +1 on 1 or 2 with a slight preference towards 1. I agree with @1996fanrui's concern that the more simplistic heap check should be turned off by default (threshold set to 1).
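A back-of-the-envelope sketch of the overshoot risk argued above (hypothetical numbers, not operator code): if a TaskManager spends a fraction of its time in GC, the observed true processing rate is deflated by roughly that fraction, so a naive capacity calculation scales to a higher parallelism than the job actually needs.

```java
// Illustration of why high GC pressure skews the scaling math.
public class GcOvershootSketch {

    // Observed per-subtask rate when gcFraction of wall-clock time is lost to GC.
    static double observedRate(double healthyRate, double gcFraction) {
        return healthyRate * (1.0 - gcFraction);
    }

    // Naive parallelism needed to reach targetRate at the observed per-subtask rate.
    static int requiredParallelism(double targetRate, double perSubtaskRate) {
        return (int) Math.ceil(targetRate / perSubtaskRate);
    }

    public static void main(String[] args) {
        double healthyRate = 1000.0; // records/s per subtask without GC pressure
        double targetRate = 2000.0;  // desired aggregate rate

        // Without GC pressure: 2000 / 1000 -> 2 subtasks.
        System.out.println(requiredParallelism(targetRate, observedRate(healthyRate, 0.0)));
        // At 30% GC the observed rate drops to 700, so 2000 / 700 -> 3 subtasks,
        // even though the real bottleneck is memory, not parallelism.
        System.out.println(requiredParallelism(targetRate, observedRate(healthyRate, 0.3)));
    }
}
```

And as the comment notes, at real-world GC pressure well above 30% the deflation is far worse, so the overshoot grows accordingly.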
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1425082293 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -65,15 +69,35 @@ public enum ScalingMetric { SCALE_DOWN_RATE_THRESHOLD(false), /** Expected true processing rate after scale up. */ -EXPECTED_PROCESSING_RATE(false); +EXPECTED_PROCESSING_RATE(false), -private final boolean calculateAverage; +/** + * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between + * 0 (no time in GC) and 1 (100% time in GC). + */ +GC_PRESSURE(false), + +/** Percentage of max heap used (between 0 and 1). */ +HEAP_USAGE(true); + +@Getter private final boolean calculateAverage; + +/** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */ +public static final List<ScalingMetric> REPORTED_VERTEX_METRICS = +List.of( +LOAD, +TRUE_PROCESSING_RATE, +TARGET_DATA_RATE, +CATCH_UP_DATA_RATE, +LAG, +PARALLELISM, +RECOMMENDED_PARALLELISM, +MAX_PARALLELISM, +SCALE_UP_RATE_THRESHOLD, +SCALE_DOWN_RATE_THRESHOLD, +EXPECTED_PROCESSING_RATE); Review Comment: I have added a test to compare the list of evaluated metrics to this to help developers when adding new metrics. Initially I tried the enum based approach but it just makes the ScalingMetric enum more complicated without any practical benefit. We still have to make sure independently that these metrics are evaluated correctly for reporting. Will push this in a second.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1424853109 ## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java: ## @@ -65,15 +69,35 @@ public enum ScalingMetric { SCALE_DOWN_RATE_THRESHOLD(false), /** Expected true processing rate after scale up. */ -EXPECTED_PROCESSING_RATE(false); +EXPECTED_PROCESSING_RATE(false), -private final boolean calculateAverage; +/** + * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between + * 0 (no time in GC) and 1 (100% time in GC). + */ +GC_PRESSURE(false), + +/** Percentage of max heap used (between 0 and 1). */ +HEAP_USAGE(true); + +@Getter private final boolean calculateAverage; + +/** List of {@link ScalingMetric}s to be reported as per vertex Flink metrics. */ +public static final List<ScalingMetric> REPORTED_VERTEX_METRICS = +List.of( +LOAD, +TRUE_PROCESSING_RATE, +TARGET_DATA_RATE, +CATCH_UP_DATA_RATE, +LAG, +PARALLELISM, +RECOMMENDED_PARALLELISM, +MAX_PARALLELISM, +SCALE_UP_RATE_THRESHOLD, +SCALE_DOWN_RATE_THRESHOLD, +EXPECTED_PROCESSING_RATE); Review Comment: nit: Could we add one field to this enum to indicate whether each enum value should be reported as a vertex metric? Adding a list can work, but other developers (or we ourselves) may forget to update it when adding a new enum value in the future. With a field, no one can forget it.
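The field-based alternative suggested in this comment could look roughly like the following sketch (illustrative names, not the actual `ScalingMetric` code): each enum value carries the per-vertex decision, and the reported list is derived from it, so a new metric cannot be added without making the choice explicit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: carry the "reported per vertex" decision on each enum value so a
// new metric cannot be added without explicitly deciding whether it is
// reported per vertex.
enum SketchScalingMetric {
    TRUE_PROCESSING_RATE(true),
    LAG(true),
    GC_PRESSURE(false), // job-level, not per vertex
    HEAP_USAGE(false);  // job-level, not per vertex

    private final boolean reportedPerVertex;

    SketchScalingMetric(boolean reportedPerVertex) {
        this.reportedPerVertex = reportedPerVertex;
    }

    // Derived list replaces the hand-maintained REPORTED_VERTEX_METRICS.
    static List<SketchScalingMetric> reportedVertexMetrics() {
        return Arrays.stream(values())
                .filter(m -> m.reportedPerVertex)
                .collect(Collectors.toList());
    }
}
```

The trade-off gyfora raises still applies: the field removes the "forgot to update the list" failure mode, but a completeness test is needed either way to ensure the flagged metrics are actually evaluated.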
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1853275312 > I feel like the solution to this problem is more complex, but the current code is definitely part of the solution. I'm not sure stopping to scale on GC pressure / heap usage is always desirable. In a lot of scenarios, scaling up might resolve GC pressure / heap usage issues. But after merging this PR, we might get stuck in a bad state. Users do not typically monitor their pipelines events that closely. > > If we discover heap / GC pressure, it looks like we want to let the user know, scale up to solve the issue, and block scaling to a lower parallelism. Not allowing scaling might actually make the situation worse. That makes sense! Could we extract common logic around unhealthiness? It has 2 parts: - How do we decide that a job is unhealthy? For example, severe GC or high memory usage are types of unhealthiness, and we can introduce more types in the future. - What does the autoscaler do when the job is unhealthy? In this PR the strategy is to stop scaling; in the future we could revert to the last scaling decision or scale up. These 2 parts can continue to be improved in the future. About the switch, I prefer to disable it in this PR; we can enable some switches when we think it's ready for large-scale production. (At least, disabling the memory usage check in this PR is fine for me; the reason can be found at https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604) We don't need to introduce a new option to disable it, the `Double.NaN` can be used. When it's NaN, we disable the corresponding switch, WDYT?
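The `Double.NaN` idea works because every ordered comparison with NaN evaluates to false in Java, so a threshold check silently becomes a no-op without any dedicated on/off option. A sketch, assuming the check is a plain `>` comparison (the method name is illustrative):

```java
// Sketch: NaN as the threshold disables blocking without an extra option,
// because `x > NaN` is always false. A threshold of 1.0 likewise never
// triggers for a ratio metric confined to [0, 1].
public class ThresholdSketch {
    static boolean shouldBlockScaling(double gcPressure, double threshold) {
        return gcPressure > threshold; // always false when threshold is NaN
    }

    public static void main(String[] args) {
        System.out.println(shouldBlockScaling(0.45, 0.3));        // enabled: blocks
        System.out.println(shouldBlockScaling(0.45, Double.NaN)); // disabled: never blocks
        System.out.println(shouldBlockScaling(1.0, 1.0));         // ratio can't exceed 1.0
    }
}
```

This matches the direction the thread converged on: shipping with thresholds that never trigger by default, rather than adding a separate boolean flag.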
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#issuecomment-1852176197

I believe I addressed all comments, @1996fanrui @mxm. Please note that I also added a second commit with the metrics improvements separately: not only does this fix the issue highlighted by @1996fanrui, it also removes some previously incorrectly reported per-vertex metrics (those that we did not actually evaluate, so they were also NaN) and adds LAG as an evaluated metric for metric completeness. (I'm going to add a test for this before merging.)
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423752520

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
> we could increase the default threshold to 95% to be on the safe side.

Sounds good to me!

> Regarding the follow up, please go ahead with the ticket :) and feel free to work on it!

Thank you, I created FLINK-33801 to track it.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423504221

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment: I see your point about the heap usage with a large number of TMs and it makes sense. I don't think most jobs would be affected by this issue, but we could increase the default threshold to 95% to be on the safe side. Regarding the follow-up, please go ahead with the ticket :) and feel free to work on it!
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423346604

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
> Also keep in mind that this is the average heap usage. With 90% average usage you are extremely likely to be close to out of heap in most cases.

Thanks @gyfora for the clarification! I suspect it's not actually the average heap usage, and I wanted to check with you first. In the `ScalingExecutor#isJobUnderMemoryPressure` method, we check whether `evaluatedMetrics.get(ScalingMetric.HEAP_USAGE).getAverage()` > `conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)`. Intuitively `getAverage` looks like an average, but its calculation has two steps:
- Step 1: `ScalingMetrics#computeGlobalMetrics` collects the `HEAP_USAGE` at each point in time as `heapUsed.getMax() / heapMax.getMax()`.
  - IIUC, `heapUsed` is an `AggregatedMetric`: when a job has 1000 TaskManagers, if the heap usage of 999 TMs is very low and only one TM is high, we still consider `heapUsed` high at that point in time.
- Step 2: `ScalingMetricEvaluator#evaluateGlobalMetrics` computes `HEAP_USAGE` based on the `metricHistory`.
  - The `metricHistory` is therefore composed of the TMs with the highest heap usage at a large number of time points.

Strictly speaking, both steps have problems:
- Step 1: Java GC is executed lazily, not immediately.
  - When a TM's heap usage is high, GC may simply not have been triggered yet, which does not mean memory pressure is high.
  - Especially if the heap usage is high for only one TM or a small number of TMs.
- Step 2: Since the data from the first step is unreliable, the average in the second step is unreliable as well.

> GC metrics will only be available in Flink 1.19.

I'm not sure whether we can sum all GC times as the total GC time? Before 1.19, we have detailed GC times for each collector.

> This is a very good point and happens often. I think we could definitely build this logic on top of the newly introduced metrics + scaling history as a follow up. It would probably be a very good addition. (but definitely out of scope for this PR)

That makes sense. As I understand it: it's better to revert a scaling if the job is unhealthy after a scale down. Memory pressure is one type of unhealthiness; failing checkpoints or CPU pressure may be others. Would you mind if I create a JIRA and pick it up? Thanks~
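[Editor's note] The two steps described above can be sketched compactly (illustrative names and simplified types; the real logic lives in `ScalingMetrics#computeGlobalMetrics` and `ScalingMetricEvaluator`). The point being made is that the "average" is an average of per-interval maxima, so a single hot TM can dominate it:

```java
import java.util.Arrays;
import java.util.List;

public class HeapUsageSketch {

    // Step 1 (sketch): per collection interval, the usage ratio is derived
    // from the *maximum* heap used across TaskManagers, not a per-TM average.
    static double intervalHeapUsage(double[] heapUsedPerTm, double heapMax) {
        double maxUsed = Arrays.stream(heapUsedPerTm).max().orElse(Double.NaN);
        return maxUsed / heapMax;
    }

    // Step 2 (sketch): the evaluator then averages those per-interval values
    // over the metric history window.
    static double averageOverHistory(List<Double> history) {
        return history.stream().mapToDouble(Double::doubleValue).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        // 999 TMs using 10% of a 1000-unit heap, one TM using 90%:
        double[] tms = new double[1000];
        Arrays.fill(tms, 100.0);
        tms[0] = 900.0;
        // The interval value is dominated by the single hot TM.
        System.out.println(intervalHeapUsage(tms, 1000.0)); // 0.9
    }
}
```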
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061343

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
@@ -297,6 +301,28 @@ private void computeTargetDataRate(
         }
     }
 
+    @VisibleForTesting
+    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(
+            SortedMap<Instant, CollectedMetrics> metricHistory) {
+        var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+        var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+
+        out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
+        out.put(
+                HEAP_USAGE,
+                new EvaluatedScalingMetric(
+                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory)));
+        return out;
+    }
+
+    private static double getAverageGlobalMetric(
+            ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        return getAverage(metric, null, metricsHistory);
+    }
+
     public static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,

Review Comment: will do
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423061010

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment:
1. You are right, but I still think the current logic is valuable because GC metrics will only be available in Flink 1.19. With the heap-usage-based logic we can also support older Flink versions. Also keep in mind that this is the average heap usage. With 90% average usage you are extremely likely to be close to out of heap in most cases.
2. This is a very good point and happens often. I think we could definitely build this logic on top of the newly introduced metrics + scaling history as a follow-up. It would probably be a very good addition. (But definitely out of scope for this PR.)
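[Editor's note] A simplified sketch of the blocking decision debated in this thread (the actual check is `ScalingExecutor#isJobUnderMemoryPressure`; the flat signature below is illustrative): scaling is blocked if either GC pressure or average heap usage exceeds its threshold, and unavailable (NaN) metrics never trigger the block because comparisons against NaN are false.

```java
public class MemoryPressureSketch {

    // Illustrative version of the check: block scaling when either the GC
    // pressure or the averaged heap usage exceeds its configured threshold.
    static boolean isJobUnderMemoryPressure(
            double gcPressure, double avgHeapUsage,
            double gcThreshold, double heapThreshold) {
        // NaN metrics (unavailable) never exceed a threshold.
        return gcPressure > gcThreshold || avgHeapUsage > heapThreshold;
    }

    public static void main(String[] args) {
        // Defaults from this PR: GC threshold 0.3, heap threshold 0.9.
        System.out.println(isJobUnderMemoryPressure(0.35, 0.5, 0.3, 0.9)); // true: GC too high
        System.out.println(isJobUnderMemoryPressure(Double.NaN, Double.NaN, 0.3, 0.9)); // false
    }
}
```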
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423057412

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
                                 EmptyRequestBody.getInstance())
                         .get();
 
-        return responseBody.getMetrics().stream()
-                .collect(
-                        Collectors.toMap(
-                                m -> metrics.get(m.getId()),
-                                m -> m,
-                                (m1, m2) ->
-                                        new AggregatedMetric(
-                                                m1.getId() + " merged with " + m2.getId(),
-                                                Math.min(m1.getMin(), m2.getMin()),
-                                                Math.max(m1.getMax(), m2.getMax()),
-                                                // Average can't be computed
-                                                Double.NaN,
-                                                m1.getSum() + m2.getSum())));
+        return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
 
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });
+
+            TOTAL_GC_TIME_PER_SEC
+                    .findAny(metricNames)
+                    .ifPresent(
+                            m -> {
+                                LOG.debug("GC metrics found");
+                                metricNameMapping.put(m, TOTAL_GC_TIME_PER_SEC);
+                            });
+
+            var queriedMetrics =
+                    new HashMap<>(queryAggregatedTmMetrics(restClient, metricNameMapping));
+            availableTmMetricNames.put(ctx.getJobKey(), metricNames);

Review Comment: Actually I just realised that the TM metric names are fixed, so we know them beforehand and there is no need to cache them; we can simply hardcode them. I will work on this tomorrow.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1423056535

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
@@ -65,7 +65,16 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
+
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);

Review Comment: good point, I will improve that
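[Editor's note] The improvement acknowledged here (distinguishing vertex-level from global/TM-level metrics so per-vertex registration can skip the memory metrics) could take roughly this shape. All names below are hypothetical, not the actual `ScalingMetric` definition:

```java
public class ScalingMetricLevelSketch {

    // Hypothetical level tag for each metric.
    enum MetricLevel { VERTEX, GLOBAL }

    // A trimmed-down stand-in for the ScalingMetric enum with a level field.
    enum Metric {
        EXPECTED_PROCESSING_RATE(MetricLevel.VERTEX),
        GC_PRESSURE(MetricLevel.GLOBAL),
        HEAP_USAGE(MetricLevel.GLOBAL);

        final MetricLevel level;

        Metric(MetricLevel level) {
            this.level = level;
        }
    }

    public static void main(String[] args) {
        // Per-vertex registration would iterate only the VERTEX-level metrics,
        // avoiding NaN-valued memory gauges on every job vertex.
        for (Metric m : Metric.values()) {
            if (m.level == MetricLevel.VERTEX) {
                System.out.println("register per-vertex: " + m);
            }
        }
    }
}
```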
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
mxm commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422597784

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+        return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
 
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });

Review Comment: Oh I missed that. Thanks for clarifying.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422568938

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+        return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
 
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });
+
+            TOTAL_GC_TIME_PER_SEC
+                    .findAny(metricNames)
+                    .ifPresent(
+                            m -> {
+                                LOG.debug("GC metrics found");
+                                metricNameMapping.put(m, TOTAL_GC_TIME_PER_SEC);
+                            });
+
+            var queriedMetrics =
+                    new HashMap<>(queryAggregatedTmMetrics(restClient, metricNameMapping));
+            availableTmMetricNames.put(ctx.getJobKey(), metricNames);

Review Comment: Could we cache `metricNameMapping` directly? Once `metricNames` is cached, the `availableTmMetricNames` of a job never changes again, and `TOTAL_GC_TIME_PER_SEC` and `REQUIRED_TM_METRICS` never change either. So the `metricNameMapping` can never change.
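[Editor's note] A minimal sketch of the per-job caching idea suggested above (hypothetical names and a simplified `String`-based mapping rather than the collector's real types): since the metric names for a running job never change, the derived mapping can be computed once per job key and reused.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetricNameCacheSketch {

    // Cache of jobKey -> derived metric-name mapping; built at most once per job.
    private final Map<String, Map<String, String>> mappingCache = new ConcurrentHashMap<>();

    // Counts how often the expensive build actually runs (for illustration).
    int builds = 0;

    Map<String, String> mappingFor(String jobKey) {
        return mappingCache.computeIfAbsent(jobKey, this::buildMapping);
    }

    private Map<String, String> buildMapping(String jobKey) {
        builds++; // stands in for the REST query + required-metric name lookup
        return Map.of("Status.JVM.Memory.Heap.Used", "HEAP_USED");
    }

    public static void main(String[] args) {
        var cache = new MetricNameCacheSketch();
        cache.mappingFor("job-1");
        cache.mappingFor("job-1"); // served from the cache
        System.out.println(cache.builds); // 1
    }
}
```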
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422456112

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment: I have 2 questions about this autoscaler rule:
1. Does high heap usage indicate insufficient memory?
   - When GC is severe, memory is indeed insufficient.
   - But when GC time is very low and heap usage is high, the TaskManagers might work well, right? (The memory may be just enough.)
2. Is the insufficient memory caused by a scale down?
   - GC is fine before rescaling, but the busy ratio is very low, so the autoscaler scales the job down.
   - But GC is severe after the scale down.
   - Could we revert this rescaling? Or do we consider this scale down expected, so that users should increase the task memory after scaling down?
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
1996fanrui commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422419813

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java:
@@ -297,6 +301,28 @@ private void computeTargetDataRate(
         }
     }
 
+    @VisibleForTesting
+    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(
+            SortedMap<Instant, CollectedMetrics> metricHistory) {
+        var latest = metricHistory.get(metricHistory.lastKey()).getGlobalMetrics();
+        var out = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+
+        var gcPressure = latest.getOrDefault(GC_PRESSURE, Double.NaN);
+        var lastHeapUsage = latest.getOrDefault(HEAP_USAGE, Double.NaN);
+
+        out.put(GC_PRESSURE, EvaluatedScalingMetric.of(gcPressure));
+        out.put(
+                HEAP_USAGE,
+                new EvaluatedScalingMetric(
+                        lastHeapUsage, getAverageGlobalMetric(HEAP_USAGE, metricHistory)));
+        return out;
+    }
+
+    private static double getAverageGlobalMetric(
+            ScalingMetric metric, SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        return getAverage(metric, null, metricsHistory);
+    }
+
     public static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,

Review Comment:
```suggestion
            @Nullable JobVertexID jobVertexId,
```
Could we add `@Nullable` and some comments here? When it's null, it refers to the global metrics.

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)

Review Comment: I have 2 questions about this autoscaler rule:
1. Does high heap usage indicate insufficient memory?
   - When GC is severe, memory is indeed insufficient.
   - But when GC time is very low and heap usage is high, the TaskManagers might work well, right? (The memory may be just enough.)
2. Is the insufficient memory caused by a scale down?
   - GC is fine before rescaling, but the busy ratio is very low, so the autoscaler scales the job down.
   - But GC is severe after the scale down.
   - Could we revert this rescaling?

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
@@ -65,7 +65,16 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
    /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
+
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);

Review Comment: I'm not sure whether we should add a field here to distinguish whether each metric is vertex level or TM level. We register some metrics at `AutoscalerFlinkMetrics#registerScalingMetrics`, and we register all `ScalingMetric` types for all job vertices even when they are NaN. Obviously we don't need to register the memory-related metrics for job vertices.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422338471

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");

Review Comment: I could modify the logic to only query the metrics if the threshold is below 1; in any case, TM metric collection issues should not block other autoscaler functionality. I have to double-check that.
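[Editor's note] The gating described here could look roughly like this (illustrative helper only, not the actual implementation): the TM metric query is skipped when both thresholds are set so high that the checks can never trigger.

```java
public class ConditionalQuerySketch {

    // Illustrative gate: only query TM metrics if at least one threshold
    // is below 1, i.e. at least one memory check can actually fire.
    static boolean shouldQueryTmMetrics(double gcThreshold, double heapThreshold) {
        return gcThreshold < 1 || heapThreshold < 1;
    }

    public static void main(String[] args) {
        System.out.println(shouldQueryTmMetrics(1.0, 1.0)); // false: checks disabled
        System.out.println(shouldQueryTmMetrics(0.3, 0.9)); // true: defaults enabled
    }
}
```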
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422337175

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+        return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
 
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });

Review Comment: In the same way as we query the vertex metrics, we only query aggregated task manager metrics through the job manager. So it's a single query regardless of the number of TMs.
Re: [PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
mxm commented on code in PR #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1422277362

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");

Review Comment: I'm wondering whether we should add an on/off switch in case there are any issues with the TM metric collection.

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
@@ -78,19 +91,87 @@ protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(
+        return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
 
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            var metricNames = getTmMetricNames(restClient, ctx);
+            var metricNameMapping = new HashMap<String, FlinkMetric>();
+
+            REQUIRED_TM_METRICS.forEach(
+                    fm -> {
+                        var name =
+                                fm.findAny(metricNames)
+                                        .orElseThrow(
+                                                () ->
+                                                        new RuntimeException(
+                                                                "Could not find required TM metric "
+                                                                        + fm.name()));
+                        metricNameMapping.put(name, fm);
+                    });

Review Comment: I'm a little bit concerned these non-parallel requests might take too long for deployments with hundreds of task managers. Especially because this feature cannot be turned off.

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
                 .withDescription(
                         "Processing rate increase threshold for detecting ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% of the desired capacity increase with scaling, the action is marked ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    .withDescription("Max allowed GC pressure during scaling operations");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(0.9)
+                    .withDescription("Max allowed Heap usage during scaling operations");

Review Comment:
```suggestion
                    .withDescription("Max allowed percentage of heap usage during scaling operations. Autoscaling will be paused if the heap usage exceeds this threshold.");
```

## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
@@ -222,6 +222,18 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+
[PR] [FLINK-33764] Track Heap usage and GC pressure to avoid unnecessary scaling [flink-kubernetes-operator]
gyfora opened a new pull request, #726: URL: https://github.com/apache/flink-kubernetes-operator/pull/726

## What is the purpose of the change
The autoscaler currently doesn't use any GC/HEAP metrics as part of the scaling decisions. While the long-term goal may be to support vertical scaling (increasing TM sizes), this is currently out of scope for the autoscaler. However, it is very important to detect cases where the throughput of certain vertices or the entire pipeline is critically affected by long GC pauses. In these cases the current autoscaler logic would wrongly assume a low true processing rate and scale the pipeline too high, ramping up costs and causing further issues.

Using the improved GC metrics introduced in https://issues.apache.org/jira/browse/FLINK-33318, we should measure the GC pauses, simply block scaling decisions if the pipeline spends too much time garbage collecting, and notify the user about the required action to increase memory.

*This feature requires Flink 1.19 or the commit back-ported to earlier versions*

## Brief change log
- *Introduce TM level metrics for the autoscaler and track HEAP/GC usage*
- *Trigger event and block scaling if GC is above threshold*
- *Tests*

## Verifying this change
Unit tests + manual validation in various envs.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changes to the `CustomResourceDescriptors`: no
- Core observer or reconciler logic that is regularly executed: yes

## Documentation
- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? docs [TODO]