This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new ecc15a36 [FLINK-33771] Extract method to estimate task slots and add dedicated test (#773) ecc15a36 is described below commit ecc15a365714b50810c48270b9a192a3ec26a323 Author: Maximilian Michels <m...@apache.org> AuthorDate: Wed Feb 7 11:38:57 2024 +0100 [FLINK-33771] Extract method to estimate task slots and add dedicated test (#773) --- .../apache/flink/autoscaler/ScalingExecutor.java | 38 ++--------- .../flink/autoscaler/utils/ResourceCheckUtils.java | 78 ++++++++++++++++++++++ .../autoscaler/utils/ResourceCheckUtilsTest.java | 76 +++++++++++++++++++++ 3 files changed, 160 insertions(+), 32 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java index d85b311e..094d4680 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java @@ -27,6 +27,7 @@ import org.apache.flink.autoscaler.resources.NoopResourceCheck; import org.apache.flink.autoscaler.resources.ResourceCheck; import org.apache.flink.autoscaler.state.AutoScalerStateStore; import org.apache.flink.autoscaler.utils.CalendarUtils; +import org.apache.flink.autoscaler.utils.ResourceCheckUtils; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; @@ -278,38 +279,11 @@ public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> { return true; } - var vertexMetrics = evaluatedMetrics.getVertexMetrics(); - - int oldParallelismSum = - vertexMetrics.values().stream() - .map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent()) - .reduce(0, Integer::sum); - - Map<JobVertexID, Integer> newParallelisms = new HashMap<>(); - for (Map.Entry<JobVertexID, Map<ScalingMetric, 
EvaluatedScalingMetric>> entry : - vertexMetrics.entrySet()) { - JobVertexID jobVertexID = entry.getKey(); - ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID); - if (scalingSummary != null) { - newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism()); - } else { - newParallelisms.put( - jobVertexID, - (int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent()); - } - } - - double numTaskSlotsUsed = globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent(); - - final int numTaskSlotsAfterRescale; - if (oldParallelismSum >= numTaskSlotsUsed) { - // Slot sharing is (partially) deactivated, - // assuming no slot sharing in absence of additional data. - numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::sum); - } else { - // Slot sharing is activated - numTaskSlotsAfterRescale = newParallelisms.values().stream().reduce(0, Integer::max); - } + int numTaskSlotsUsed = + (int) globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent(); + final int numTaskSlotsAfterRescale = + ResourceCheckUtils.estimateNumTaskSlotsAfterRescale( + evaluatedMetrics.getVertexMetrics(), scalingSummaries, numTaskSlotsUsed); int taskSlotsPerTm = ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java new file mode 100644 index 00000000..18bf58c0 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/ResourceCheckUtils.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.utils; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import java.util.HashMap; +import java.util.Map; + +/** Utility methods for resource checks. */ +public class ResourceCheckUtils { // NOTE(review): static-only; consider a private constructor. + /** Estimates the task slots required once {@code scalingSummaries} are applied. */ + public static int estimateNumTaskSlotsAfterRescale( + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics, + Map<JobVertexID, ScalingSummary> scalingSummaries, + int numTaskSlotsUsed) { + + Map<JobVertexID, Integer> newParallelisms = + computeNewParallelisms(scalingSummaries, vertexMetrics); + + if (currentMaxParallelism(vertexMetrics) == numTaskSlotsUsed) { + // Slots in use match the current max parallelism: assume slot sharing is activated + return newParallelisms.values().stream().reduce(0, Integer::max); + } else { + // Slot sharing is (partially) deactivated, + // assuming no slot sharing in absence of additional metrics.
+ return newParallelisms.values().stream().reduce(0, Integer::sum); + } + } + /** New parallelism per vertex: the summary's value if present, else the current one. */ + private static Map<JobVertexID, Integer> computeNewParallelisms( + Map<JobVertexID, ScalingSummary> scalingSummaries, + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics) { + + Map<JobVertexID, Integer> newParallelisms = new HashMap<>(); + + for (Map.Entry<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> entry : + vertexMetrics.entrySet()) { + JobVertexID jobVertexID = entry.getKey(); + ScalingSummary scalingSummary = scalingSummaries.get(jobVertexID); + if (scalingSummary != null) { + newParallelisms.put(jobVertexID, scalingSummary.getNewParallelism()); + } else { + newParallelisms.put( + jobVertexID, + (int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent()); + } + } + + return newParallelisms; + } + /** Highest current {@code PARALLELISM} metric over all vertices, or 0 if there are none. */ + private static int currentMaxParallelism( + Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics) { + + return vertexMetrics.values().stream() + .map(map -> (int) map.get(ScalingMetric.PARALLELISM).getCurrent()) + .reduce(0, Integer::max); + } +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java new file mode 100644 index 00000000..585b5724 --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/utils/ResourceCheckUtilsTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.utils; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.autoscaler.metrics.ScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Tests for {@link ResourceCheckUtils}. */ +class ResourceCheckUtilsTest { + + @Test + void testEstimateNumTaskSlotsAfterRescale() { + var source = new JobVertexID(); + var sink = new JobVertexID(); + int sourceParallelism = 2; + int sinkParallelism = 3; + var metrics = + Map.of( + source, + Map.of( + ScalingMetric.PARALLELISM, + EvaluatedScalingMetric.of(sourceParallelism)), + sink, + Map.of( + ScalingMetric.PARALLELISM, + EvaluatedScalingMetric.of(sinkParallelism))); + // Only the source is rescaled (2 -> 7); the sink keeps its current parallelism of 3. + Map<JobVertexID, ScalingSummary> scalingSummaries = + Map.of(source, new ScalingSummary(sourceParallelism, 7, Map.of())); + + // With slot sharing, the max parallelism determines the number of task slots required + int numTaskSlotsUsed = Math.max(sourceParallelism, sinkParallelism); + assertThat( + ResourceCheckUtils.estimateNumTaskSlotsAfterRescale( + metrics, scalingSummaries, numTaskSlotsUsed)) + .isEqualTo(7); + + // Slot sharing disabled: the number of task slots equals the sum of all parallelisms + numTaskSlotsUsed = sourceParallelism + sinkParallelism; + assertThat( + ResourceCheckUtils.estimateNumTaskSlotsAfterRescale( + metrics, scalingSummaries,
numTaskSlotsUsed)) + .isEqualTo(10); + + // Slot sharing partially disabled: for lack of a better metric, assume slot sharing is + // disabled (7 + 3 = 10) + numTaskSlotsUsed = numTaskSlotsUsed - 1; + assertThat( + ResourceCheckUtils.estimateNumTaskSlotsAfterRescale( + metrics, scalingSummaries, numTaskSlotsUsed)) + .isEqualTo(10); + } +}