This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ecc15a36 [FLINK-33771] Extract method to estimate task slots and add 
dedicated test (#773)
ecc15a36 is described below

commit ecc15a365714b50810c48270b9a192a3ec26a323
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Wed Feb 7 11:38:57 2024 +0100

    [FLINK-33771] Extract method to estimate task slots and add dedicated test 
(#773)
---
 .../apache/flink/autoscaler/ScalingExecutor.java   | 38 ++---------
 .../flink/autoscaler/utils/ResourceCheckUtils.java | 78 ++++++++++++++++++++++
 .../autoscaler/utils/ResourceCheckUtilsTest.java   | 76 +++++++++++++++++++++
 3 files changed, 160 insertions(+), 32 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index d85b311e..094d4680 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.autoscaler.resources.NoopResourceCheck;
 import org.apache.flink.autoscaler.resources.ResourceCheck;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
 import org.apache.flink.autoscaler.utils.CalendarUtils;
+import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -278,38 +279,11 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return true;
         }
 
-        var vertexMetrics = evaluatedMetrics.getVertexMetrics();
-
-        int oldParallelismSum =
-                vertexMetrics.values().stream()
-                        .map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
-                        .reduce(0, Integer::sum);
-
-        Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
-        for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry :
-                vertexMetrics.entrySet()) {
-            JobVertexID jobVertexID = entry.getKey();
-            ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
-            if (scalingSummary != null) {
-                newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
-            } else {
-                newParallelisms.put(
-                        jobVertexID,
-                        (int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
-            }
-        }
-
-        double numTaskSlotsUsed = 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
-
-        final int numTaskSlotsAfterRescale;
-        if (oldParallelismSum >= numTaskSlotsUsed) {
-            // Slot sharing is (partially) deactivated,
-            // assuming no slot sharing in absence of additional data.
-            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::sum);
-        } else {
-            // Slot sharing is activated
-            numTaskSlotsAfterRescale = 
newParallelisms.values().stream().reduce(0, Integer::max);
-        }
+        int numTaskSlotsUsed =
+                (int) 
globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
+        final int numTaskSlotsAfterRescale =
+                ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                        evaluatedMetrics.getVertexMetrics(), scalingSummaries, 
numTaskSlotsUsed);
 
         int taskSlotsPerTm = 
ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS);
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
new file mode 100644
index 00000000..18bf58c0
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utils methods for resource checks. */
+public class ResourceCheckUtils {
+
+    public static int estimateNumTaskSlotsAfterRescale(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
vertexMetrics,
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            int numTaskSlotsUsed) {
+
+        Map<JobVertexID, Integer> newParallelisms =
+                computeNewParallelisms(scalingSummaries, vertexMetrics);
+
+        if (currentMaxParallelism(vertexMetrics) == numTaskSlotsUsed) {
+            // Slot sharing is activated
+            return newParallelisms.values().stream().reduce(0, Integer::max);
+        } else {
+            // Slot sharing is (partially) deactivated,
+            // assuming no slot sharing in absence of additional metrics.
+            return newParallelisms.values().stream().reduce(0, Integer::sum);
+        }
+    }
+
+    private static Map<JobVertexID, Integer> computeNewParallelisms(
+            Map<JobVertexID, ScalingSummary> scalingSummaries,
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
vertexMetrics) {
+
+        Map<JobVertexID, Integer> newParallelisms = new HashMap<>();
+
+        for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry :
+                vertexMetrics.entrySet()) {
+            JobVertexID jobVertexID = entry.getKey();
+            ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID);
+            if (scalingSummary != null) {
+                newParallelisms.put(jobVertexID, 
scalingSummary.getNewParallelism());
+            } else {
+                newParallelisms.put(
+                        jobVertexID,
+                        (int) 
entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
+            }
+        }
+
+        return newParallelisms;
+    }
+
+    private static int currentMaxParallelism(
+            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
vertexMetrics) {
+
+        return vertexMetrics.values().stream()
+                .map(map -> (int) 
map.get(ScalingMetric.PARALLELISM).getCurrent())
+                .reduce(0, Integer::max);
+    }
+}
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java
new file mode 100644
index 00000000..585b5724
--- /dev/null
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.utils;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Tests for {@link ResourceCheckUtils}. */
+class ResourceCheckUtilsTest {
+
+    @Test
+    void testEstimateNumTaskSlotsAfterRescale() {
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        int sourceParallelism = 2;
+        int sinkParallelism = 3;
+        var metrics =
+                Map.of(
+                        source,
+                        Map.of(
+                                ScalingMetric.PARALLELISM,
+                                EvaluatedScalingMetric.of(sourceParallelism)),
+                        sink,
+                        Map.of(
+                                ScalingMetric.PARALLELISM,
+                                EvaluatedScalingMetric.of(sinkParallelism)));
+
+        Map<JobVertexID, ScalingSummary> scalingSummaries =
+                Map.of(source, new ScalingSummary(sourceParallelism, 7, 
Map.of()));
+
+        // With slot sharing, the max parallelism determines the number of 
task slots required
+        int numTaskSlotsUsed = Math.max(sourceParallelism, sinkParallelism);
+        assertThat(
+                        ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                                metrics, scalingSummaries, numTaskSlotsUsed))
+                .isEqualTo(7);
+
+        // Slot sharing disabled, the number of task slots equals the sum of 
all parallelisms
+        numTaskSlotsUsed = sourceParallelism + sinkParallelism;
+        assertThat(
+                        ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                                metrics, scalingSummaries, numTaskSlotsUsed))
+                .isEqualTo(10);
+
+        // Slot sharing partially disabled, for lack of a better metric, 
assume slot sharing is
+        // disabled
+        numTaskSlotsUsed = numTaskSlotsUsed - 1;
+        assertThat(
+                        ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
+                                metrics, scalingSummaries, numTaskSlotsUsed))
+                .isEqualTo(10);
+    }
+}

Reply via email to