This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 158cbe29 [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720) 158cbe29 is described below commit 158cbe29169cbfb7fa7ad676fb0273fd7ef6d25e Author: Maximilian Michels <m...@apache.org> AuthorDate: Fri Dec 1 11:56:42 2023 +0100 [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720) --- .../autoscaler/KubernetesScalingRealizer.java | 4 +++ .../autoscaler/KubernetesScalingRealizerTest.java} | 42 +++++++++++++--------- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java index ca3f4ef2..6bb7a949 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.PipelineOptions; import io.javaoperatorsdk.operator.processing.event.ResourceID; import java.util.Map; +import java.util.TreeMap; /** The Kubernetes implementation for applying parallelism overrides. */ public class KubernetesScalingRealizer @@ -32,6 +33,9 @@ public class KubernetesScalingRealizer @Override public void realize( KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) { + // Make sure the keys are sorted via TreeMap to prevent changing the spec when none of the + // entries changed but the key order is different! + parallelismOverrides = new TreeMap<>(parallelismOverrides); context.getResource() .getSpec() .getFlinkConfiguration() diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java similarity index 50% copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java copy to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java index ca3f4ef2..dda7ba0a 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java @@ -17,26 +17,36 @@ package org.apache.flink.kubernetes.operator.autoscaler; -import org.apache.flink.autoscaler.realizer.ScalingRealizer; -import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.PipelineOptions; -import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.junit.jupiter.api.Test; +import java.util.LinkedHashMap; import java.util.Map; -/** The Kubernetes implementation for applying parallelism overrides. */ -public class KubernetesScalingRealizer - implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> { - - @Override - public void realize( - KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) { - context.getResource() - .getSpec() - .getFlinkConfiguration() - .put( - PipelineOptions.PARALLELISM_OVERRIDES.key(), - ConfigurationUtils.convertValue(parallelismOverrides, String.class)); +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for KubernetesScalingRealizer. */ +public class KubernetesScalingRealizerTest { + + @Test + public void testAutoscalerOverridesVertexIdsAreSorted() { + + KubernetesJobAutoScalerContext ctx = + TestingKubernetesAutoscalerUtils.createContext("test", null); + + // Create map which returns keys unsorted + Map<String, String> overrides = new LinkedHashMap<>(); + overrides.put("b", "2"); + overrides.put("a", "1"); + + new KubernetesScalingRealizer().realize(ctx, overrides); + + assertThat( + ctx.getResource() + .getSpec() + .getFlinkConfiguration() + .get(PipelineOptions.PARALLELISM_OVERRIDES.key())) + .isEqualTo("a:1,b:2"); } }