This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 158cbe29 [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720)
158cbe29 is described below

commit 158cbe29169cbfb7fa7ad676fb0273fd7ef6d25e
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Fri Dec 1 11:56:42 2023 +0100

    [FLINK-33710] Prevent NOOP spec updates from the autoscaler parallelism override map (#720)
---
 .../autoscaler/KubernetesScalingRealizer.java      |  4 +++
 .../autoscaler/KubernetesScalingRealizerTest.java} | 42 +++++++++++++---------
 2 files changed, 30 insertions(+), 16 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index ca3f4ef2..6bb7a949 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.PipelineOptions;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 
 import java.util.Map;
+import java.util.TreeMap;
 
 /** The Kubernetes implementation for applying parallelism overrides. */
 public class KubernetesScalingRealizer
@@ -32,6 +33,9 @@ public class KubernetesScalingRealizer
     @Override
     public void realize(
             KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
+        // Make sure the keys are sorted via TreeMap to prevent changing the spec when none of the
+        // entries changed but the key order is different!
+        parallelismOverrides = new TreeMap<>(parallelismOverrides);
         context.getResource()
                 .getSpec()
                 .getFlinkConfiguration()
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
similarity index 50%
copy from flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
copy to flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index ca3f4ef2..dda7ba0a 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -17,26 +17,36 @@
 
 package org.apache.flink.kubernetes.operator.autoscaler;
 
-import org.apache.flink.autoscaler.realizer.ScalingRealizer;
-import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.PipelineOptions;
 
-import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.Test;
 
+import java.util.LinkedHashMap;
 import java.util.Map;
 
-/** The Kubernetes implementation for applying parallelism overrides. */
-public class KubernetesScalingRealizer
-        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {
-
-    @Override
-    public void realize(
-            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
-        context.getResource()
-                .getSpec()
-                .getFlinkConfiguration()
-                .put(
-                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
-                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for KubernetesScalingRealizer. */
+public class KubernetesScalingRealizerTest {
+
+    @Test
+    public void testAutoscalerOverridesVertexIdsAreSorted() {
+
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+
+        // Create map which returns keys unsorted
+        Map<String, String> overrides = new LinkedHashMap<>();
+        overrides.put("b", "2");
+        overrides.put("a", "1");
+
+        new KubernetesScalingRealizer().realize(ctx, overrides);
+
+        assertThat(
+                        ctx.getResource()
+                                .getSpec()
+                                .getFlinkConfiguration()
+                                .get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+                .isEqualTo("a:1,b:2");
     }
 }

Reply via email to