This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ca1d8472 [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides (#721)
ca1d8472 is described below

commit ca1d8472d1a1e817268950dae079592581fa5b8f
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Wed Dec 6 16:53:01 2023 +0100

    [FLINK-33710] Prevent triggering cluster upgrades for permutations of the same overrides (#721)
    
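    The previous fix in #720 made the parallelism override string deterministic,
    but it will likely cause existing deployments to trigger a one-off, unneeded
    spec update, because the newly sorted string can differ from the string that
    is already deployed even though the overrides themselves are unchanged.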
    
    To prevent this and to update the spec only once an actual scaling occurs, we
    need to compare the existing overrides with the new ones and check whether
    they are identical. If they are, we keep the current override string as-is,
    preserving its existing key order.
---
 .../autoscaler/KubernetesJobAutoScalerContext.java | 18 ++++----
 .../autoscaler/KubernetesScalingRealizer.java      | 31 ++++++++++---
 .../operator/controller/FlinkResourceContext.java  |  5 ++-
 .../AbstractFlinkResourceReconciler.java           |  1 +
 .../autoscaler/KubernetesScalingRealizerTest.java  | 51 +++++++++++++++++++---
 .../TestingKubernetesAutoscalerUtils.java          | 15 ++++++-
 6 files changed, 95 insertions(+), 26 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
index b50f8a7a..187a7118 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesJobAutoScalerContext.java
@@ -23,20 +23,20 @@ import org.apache.flink.autoscaler.JobAutoScalerContext;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.util.function.SupplierWithException;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import lombok.Getter;
 
 import javax.annotation.Nullable;
 
 /** An implementation of JobAutoscalerContext for Kubernetes. */
 public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext<ResourceID> {
 
-    private final AbstractFlinkResource<?, ?> resource;
-
-    private final KubernetesClient kubernetesClient;
+    @Getter private final FlinkResourceContext<?> resourceContext;
 
     public KubernetesJobAutoScalerContext(
             @Nullable JobID jobID,
@@ -44,24 +44,22 @@ public class KubernetesJobAutoScalerContext extends 
JobAutoScalerContext<Resourc
             Configuration configuration,
             MetricGroup metricGroup,
             SupplierWithException<RestClusterClient<String>, Exception> 
restClientSupplier,
-            AbstractFlinkResource<?, ?> resource,
-            KubernetesClient kubernetesClient) {
+            FlinkResourceContext<?> resourceContext) {
         super(
-                ResourceID.fromResource(resource),
+                ResourceID.fromResource(resourceContext.getResource()),
                 jobID,
                 jobStatus,
                 configuration,
                 metricGroup,
                 restClientSupplier);
-        this.resource = resource;
-        this.kubernetesClient = kubernetesClient;
+        this.resourceContext = resourceContext;
     }
 
     public AbstractFlinkResource<?, ?> getResource() {
-        return resource;
+        return resourceContext.getResource();
     }
 
     public KubernetesClient getKubernetesClient() {
-        return kubernetesClient;
+        return resourceContext.getKubernetesClient();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
index 6bb7a949..d20f783d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizer.java
@@ -23,8 +23,9 @@ import org.apache.flink.configuration.PipelineOptions;
 
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
-import java.util.TreeMap;
 
 /** The Kubernetes implementation for applying parallelism overrides. */
 public class KubernetesScalingRealizer
@@ -33,14 +34,34 @@ public class KubernetesScalingRealizer
     @Override
     public void realize(
             KubernetesJobAutoScalerContext context, Map<String, String> 
parallelismOverrides) {
-        // Make sure the keys are sorted via TreeMap to prevent changing the 
spec when none of the
-        // entries changed but the key order is different!
-        parallelismOverrides = new TreeMap<>(parallelismOverrides);
+
         context.getResource()
                 .getSpec()
                 .getFlinkConfiguration()
                 .put(
                         PipelineOptions.PARALLELISM_OVERRIDES.key(),
-                        ConfigurationUtils.convertValue(parallelismOverrides, 
String.class));
+                        getOverrideString(context, parallelismOverrides));
+    }
+
+    @Nullable
+    private static String getOverrideString(
+            KubernetesJobAutoScalerContext context, Map<String, String> 
newOverrides) {
+        if 
(context.getResource().getStatus().getReconciliationStatus().isBeforeFirstDeployment())
 {
+            return ConfigurationUtils.convertValue(newOverrides, String.class);
+        }
+
+        var conf = context.getResourceContext().getObserveConfig();
+        var currentOverrides =
+                
conf.getOptional(PipelineOptions.PARALLELISM_OVERRIDES).orElse(Map.of());
+
+        // Check that the overrides actually changed and not just the String 
representation.
+        // This way we prevent reconciling a NOOP config change which would 
unnecessarily redeploy
+        // the pipeline.
+        if (currentOverrides.equals(newOverrides)) {
+            // If overrides are identical, use the previous string as-is.
+            return conf.getValue(PipelineOptions.PARALLELISM_OVERRIDES);
+        } else {
+            return ConfigurationUtils.convertValue(newOverrides, String.class);
+        }
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index e2b70f21..41b029ea 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -81,8 +81,7 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
                 conf,
                 getResourceMetricGroup(),
                 () -> getFlinkService().getClusterClient(conf),
-                resource,
-                getKubernetesClient());
+                this);
     }
 
     @Nullable
@@ -104,6 +103,7 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
      *
      * @return Config currently deployed.
      */
+    @Nullable
     public Configuration getObserveConfig() {
         if (observeConfig != null) {
             return observeConfig;
@@ -118,6 +118,7 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
      * @param spec Spec for which the config should be created.
      * @return Deployment configuration.
      */
+    @Nullable
     public abstract Configuration getDeployConfig(AbstractFlinkSpec spec);
 
     /**
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 5bef0ba7..1113739c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -186,6 +186,7 @@ public abstract class AbstractFlinkResourceReconciler<
                 ctx.getResource().getSpec().getJob() != null
                         && 
ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, 
autoscalerEnabled);
+
         autoscaler.scale(autoScalerCtx);
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index dda7ba0a..be6f38c3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 
 import org.junit.jupiter.api.Test;
 
@@ -30,23 +31,59 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class KubernetesScalingRealizerTest {
 
     @Test
-    public void testAutoscalerOverridesVertexIdsAreSorted() {
+    public void testApplyOverrides() {
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+
+        new KubernetesScalingRealizer().realize(ctx, Map.of("a", "1", "b", 
"2"));
+
+        assertThat(
+                        ctx.getResource()
+                                .getSpec()
+                                .getFlinkConfiguration()
+                                
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+                .satisfiesAnyOf(
+                        // Currently no enforced order inside the overrides 
string
+                        overrides -> 
assertThat(overrides).isEqualTo("a:1,b:2"),
+                        overrides -> 
assertThat(overrides).isEqualTo("b:2,a:1"));
+    }
+
+    @Test
+    public void 
testAutoscalerOverridesStringDoesNotChangeUnlessOverridesChange() {
+        // Create an overrides map which returns the keys in a deterministic 
order
+        LinkedHashMap<String, String> newOverrides = new LinkedHashMap<>();
+        newOverrides.put("b", "2");
+        newOverrides.put("a", "1");
+
+        assertOverridesDoNotChange("a:1,b:2", newOverrides);
+        assertOverridesDoNotChange("b:2,a:1", newOverrides);
+    }
+
+    private void assertOverridesDoNotChange(
+            String currentOverrides, LinkedHashMap<String, String> 
newOverrides) {
 
         KubernetesJobAutoScalerContext ctx =
                 TestingKubernetesAutoscalerUtils.createContext("test", null);
+        FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
 
-        // Create map which returns keys unsorted
-        Map<String, String> overrides = new LinkedHashMap<>();
-        overrides.put("b", "2");
-        overrides.put("a", "1");
+        // Create resource with existing parallelism overrides
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), 
currentOverrides);
+        resource.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(resource.getSpec(), 
resource);
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .remove(PipelineOptions.PARALLELISM_OVERRIDES.key());
 
-        new KubernetesScalingRealizer().realize(ctx, overrides);
+        new KubernetesScalingRealizer().realize(ctx, newOverrides);
 
         assertThat(
                         ctx.getResource()
                                 .getSpec()
                                 .getFlinkConfiguration()
                                 
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
-                .isEqualTo("a:1,b:2");
+                .isEqualTo(currentOverrides);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
index 9cb92ada..dbf0c3bc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/TestingKubernetesAutoscalerUtils.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentContext;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -42,7 +44,16 @@ public class TestingKubernetesAutoscalerUtils {
                 new Configuration(),
                 new UnregisteredMetricsGroup(),
                 () -> new RestClusterClient<>(new Configuration(), 
"test-cluster"),
-                cr,
-                kubernetesClient);
+                new FlinkDeploymentContext(
+                        cr,
+                        new TestUtils.TestingContext<>() {
+                            @Override
+                            public KubernetesClient getClient() {
+                                return kubernetesClient;
+                            }
+                        },
+                        null,
+                        new FlinkConfigManager(new Configuration()),
+                        null));
     }
 }

Reply via email to