This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 640b4368 [hotfix][autoscaler] Cover ScalingRealizer override application with tests (#1075)
640b4368 is described below

commit 640b4368870a60cad7346c9b7a4f14c4174348d8
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Wed Apr 29 17:13:43 2026 +0300

    [hotfix][autoscaler] Cover ScalingRealizer override application with tests (#1075)
---
 .../flink/autoscaler/JobAutoScalerImplTest.java    |  58 ++++++++++-
 .../autoscaler/KubernetesScalingRealizerTest.java  | 116 +++++++++++++++++++++
 2 files changed, 173 insertions(+), 1 deletion(-)

diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 7ba9e88c..fba97f99 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.autoscaler;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.exceptions.NotReadyException;
@@ -293,6 +294,12 @@ public class JobAutoScalerImplTest {
         autoscaler.scale(context);
         assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
 
+        // Test job not running: overrides must still be re-projected onto the spec so that an
+        // in-flight upgrade does not regress the autoscaler's last decision (self-heal).
+        var notRunningContext = 
context.toBuilder().jobStatus(JobStatus.INITIALIZING).build();
+        autoscaler.scale(notRunningContext);
+        assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
         // Make sure cleanup removes everything
         assertTrue(stateStore.hasDataFor(context));
         autoscaler.cleanup(context);
@@ -351,6 +358,55 @@ public class JobAutoScalerImplTest {
         assertParallelismOverrides(Map.of(v1.toString(), "1", v2.toString(), 
"4"));
     }
 
+    @Test
+    void testOverridesAreReappliedWhenJobNotRunning() throws Exception {
+        // Regression guard for the self-heal contract: while the job is in an in-flight upgrade
+        // (JobStatus != RUNNING), the autoscaler must still re-project its persisted decisions
+        // onto the spec on every reconcile cycle. Skipping the realizer in this state (as an
+        // earlier "if (!waiting)" guard did) lets external resets of spec.flinkConfiguration go
+        // unhealed and triggers a spurious second upgrade once the reconciler diffs the spec
+        // against lastReconciledSpec.
+        
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+
+        var v1 = new JobVertexID().toString();
+        stateStore.storeParallelismOverrides(context, Map.of(v1, "3"));
+        ConfigChanges configChanges = new ConfigChanges();
+        configChanges.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 
0.42f);
+        stateStore.storeConfigChanges(context, configChanges);
+        stateStore.flush(context);
+
+        var notRunningContext = 
context.toBuilder().jobStatus(JobStatus.INITIALIZING).build();
+        var autoscaler =
+                new JobAutoScalerImpl<>(
+                        new TestingMetricsCollector<>(new JobTopology()),
+                        null,
+                        null,
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
+
+        autoscaler.scale(notRunningContext);
+
+        boolean sawParallelism = false;
+        boolean sawConfig = false;
+        for (var event : scalingRealizer.events) {
+            if (event.getParallelismOverrides() != null) {
+                sawParallelism = true;
+                assertEquals(Map.of(v1, "3"), event.getParallelismOverrides());
+            }
+            if (event.getConfigChanges() != null) {
+                sawConfig = true;
+                assertThat(event.getConfigChanges().getOverrides())
+                        .containsExactly(
+                                
entry(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.42"));
+            }
+        }
+        assertTrue(
+                sawParallelism,
+                "Parallelism overrides must be re-applied while job is not 
running");
+        assertTrue(sawConfig, "Config overrides must be re-applied while job 
is not running");
+    }
+
     @Test
     void testApplyConfigOverrides() throws Exception {
         
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
@@ -384,7 +440,7 @@ public class JobAutoScalerImplTest {
 
     @Test
     void testAutoscalerDisabled() throws Exception {
-        context.getConfiguration().setBoolean(AUTOSCALER_ENABLED, false);
+        context.getConfiguration().set(AUTOSCALER_ENABLED, false);
         context.getConfiguration().set(VERTEX_SCALING_HISTORY_AGE, 
Duration.ofMillis(200));
 
         var scalingHistory = new TreeMap<Instant, ScalingSummary>();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index ba647034..ef8a7c86 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -68,6 +68,122 @@ public class KubernetesScalingRealizerTest {
         assertOverridesDoNotChange("b:2,a:1", newOverrides);
     }
 
+    @Test
+    public void testRealizeParallelismOverridesIsNoOpWhenSpecAlreadyMatches() {
+        // Required to keep the test config context on legacy Flink YAML converters.
+        GlobalConfiguration.setStandardYaml(false);
+
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+        FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+        // Pre-populate the spec with the exact override string the autoscaler would produce
+        // and persist it as the last reconciled spec so observeConfig is in sync.
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");
+        resource.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(resource.getSpec(), 
resource);
+
+        new KubernetesScalingRealizer()
+                .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
+
+        // Steady-state: realizer must short-circuit and leave the spec value byte-identical
+        // (no NOOP rewrite that could re-order keys or churn lastReconciledSpec).
+        assertThat(
+                        resource.getSpec()
+                                .getFlinkConfiguration()
+                                .asFlatMap()
+                                
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+                .isEqualTo("a:1,b:2");
+    }
+
+    @Test
+    public void testRealizeParallelismOverridesReappliesWhenSpecDrifted() {
+        // Required to keep the test config context on legacy Flink YAML converters.
+        GlobalConfiguration.setStandardYaml(false);
+
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+        FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+        // Spec was clobbered with a stale value mid-upgrade (e.g. last-state recovery).
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1");
+        resource.getStatus()
+                .getReconciliationStatus()
+                .serializeAndSetLastReconciledSpec(resource.getSpec(), 
resource);
+
+        new KubernetesScalingRealizer()
+                .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
+
+        // Drift was self-healed.
+        assertThat(
+                        resource.getSpec()
+                                .getFlinkConfiguration()
+                                .asFlatMap()
+                                
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+                .isIn("a:1,b:2", "b:2,a:1");
+    }
+
+    @Test
+    public void testRealizeConfigOverridesIsNoOpWhenSpecAlreadyMatches() {
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+        FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+        // Pre-populate the spec so the override is already present and the removal target
+        // is already absent.
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .put(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.42");
+        String initialTmMemory = 
resource.getSpec().getTaskManager().getResource().getMemory();
+
+        ConfigChanges overrides = new ConfigChanges();
+        overrides.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 
0.42f);
+        overrides.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+        new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+        assertThat(
+                        resource.getSpec()
+                                .getFlinkConfiguration()
+                                .asFlatMap()
+                                
.get(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key()))
+                .isEqualTo("0.42");
+        assertThat(resource.getSpec().getFlinkConfiguration().asFlatMap())
+                .doesNotContainKey(TaskManagerOptions.TASK_HEAP_MEMORY.key());
+        // TM memory must not be churned in steady-state (the value can only change if total
+        // memory tuning produced a new target; here the spec already reflects intent).
+        
assertThat(resource.getSpec().getTaskManager().getResource().getMemory())
+                .isEqualTo(initialTmMemory);
+    }
+
+    @Test
+    public void testRealizeConfigOverridesReappliesWhenSpecDrifted() {
+        KubernetesJobAutoScalerContext ctx =
+                TestingKubernetesAutoscalerUtils.createContext("test", null);
+        FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+        // Spec was clobbered with a stale value.
+        resource.getSpec()
+                .getFlinkConfiguration()
+                .put(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.10");
+
+        ConfigChanges overrides = new ConfigChanges();
+        overrides.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 
0.42f);
+        new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+        // Drift was self-healed.
+        assertThat(
+                        resource.getSpec()
+                                .getFlinkConfiguration()
+                                .asFlatMap()
+                                
.get(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key()))
+                .isEqualTo("0.42");
+    }
+
     @Test
     public void testApplyMemoryOverrides() {
         KubernetesJobAutoScalerContext ctx =

Reply via email to