1996fanrui commented on code in PR #726:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/726#discussion_r1426576655


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -219,6 +219,22 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .withDescription(
                             "Processing rate increase threshold for detecting 
ineffective scaling threshold. 0.1 means if we do not accomplish at least 10% 
of the desired capacity increase with scaling, the action is marked 
ineffective.");
 
+    public static final ConfigOption<Double> GC_PRESSURE_THRESHOLD =
+            autoScalerConfig("memory.gc-pressure.threshold")
+                    .doubleType()
+                    .defaultValue(0.3)
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.gc-pressure.threshold"))
+                    .withDescription(
+                            "Max allowed GC pressure (percentage spent garbage 
collecting) during scaling operations. Autoscaling will be paused if the GC 
pressure exceeds this limit.");
+
+    public static final ConfigOption<Double> HEAP_USAGE_THRESHOLD =
+            autoScalerConfig("memory.heap-usage.threshold")
+                    .doubleType()
+                    .defaultValue(1.)
+                    
.withFallbackKeys(oldOperatorConfigKey("memory.heap-usage.threshold"))

Review Comment:
   `oldOperatorConfigKey` is not needed, right?
   
   `GC_PRESSURE_THRESHOLD` is same as well.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java:
##########
@@ -65,15 +69,35 @@ public enum ScalingMetric {
     SCALE_DOWN_RATE_THRESHOLD(false),
 
     /** Expected true processing rate after scale up. */
-    EXPECTED_PROCESSING_RATE(false);
+    EXPECTED_PROCESSING_RATE(false),
 
-    private final boolean calculateAverage;
+    /**
+     * Maximum GC pressure across taskmanagers. Percentage of time spent 
garbage collecting between
+     * 0 (no time in GC) and 1 (100% time in GC).
+     */
+    GC_PRESSURE(false),
+
+    /** Percentage of max heap used (between 0 and 1). */
+    HEAP_USAGE(true);
+
+    @Getter private final boolean calculateAverage;
+
+    /** List of {@link ScalingMetric}s to be reported as per vertex Flink 
metrics. */
+    public static final List<ScalingMetric> REPORTED_VERTEX_METRICS =
+            List.of(
+                    LOAD,
+                    TRUE_PROCESSING_RATE,
+                    TARGET_DATA_RATE,
+                    CATCH_UP_DATA_RATE,
+                    LAG,
+                    PARALLELISM,
+                    RECOMMENDED_PARALLELISM,
+                    MAX_PARALLELISM,
+                    SCALE_UP_RATE_THRESHOLD,
+                    SCALE_DOWN_RATE_THRESHOLD,
+                    EXPECTED_PROCESSING_RATE);

Review Comment:
   > I have added a test to compare the list of evaluated metrics to this to 
help developers when adding new metrics.
   
   Sorry, could you please clarify which test will fail when one new metric 
should be added to `REPORTED_VERTEX_METRICS` set but developers forget it? I 
didn't find it.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/RestApiMetricsCollector.java:
##########
@@ -78,19 +99,89 @@ protected Map<FlinkMetric, AggregatedMetric> 
queryAggregatedVertexMetrics(
                                     EmptyRequestBody.getInstance())
                             .get();
 
-            return responseBody.getMetrics().stream()
-                    .collect(
-                            Collectors.toMap(
-                                    m -> metrics.get(m.getId()),
-                                    m -> m,
-                                    (m1, m2) ->
-                                            new AggregatedMetric(
-                                                    m1.getId() + " merged with 
" + m2.getId(),
-                                                    Math.min(m1.getMin(), 
m2.getMin()),
-                                                    Math.max(m1.getMax(), 
m2.getMax()),
-                                                    // Average can't be 
computed
-                                                    Double.NaN,
-                                                    m1.getSum() + 
m2.getSum())));
+            return aggregateByFlinkMetric(metrics, responseBody);
         }
     }
+
+    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context ctx) 
throws Exception {
+        try (var restClient = ctx.getRestClusterClient()) {
+            boolean hasGcMetrics =
+                    jobsWithGcMetrics.computeIfAbsent(
+                            ctx.getJobKey(),
+                            k -> {
+                                boolean gcMetricsFound =
+                                        !queryAggregatedTmMetrics(
+                                                        restClient, 
TM_METRIC_NAMES_WITH_GC)
+                                                .isEmpty();

Review Comment:
   Sorry, I don't understand this code logic.
   
   The code is that when the `Heap.Max` and `Heap.Used` are found, but 
`GarbageCollector.All.TimeMsPerSecond` isn't found. The `hasGcMetrics` and 
`gcMetricsFound` will be true, and we will query 
`GarbageCollector.All.TimeMsPerSecond` in the future, right?
   
   Please correct me if my understanding is wrong.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to