This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 640b4368 [hotfix][autoscaler] Cover ScalingRealizer override application with tests (#1075)
640b4368 is described below
commit 640b4368870a60cad7346c9b7a4f14c4174348d8
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Wed Apr 29 17:13:43 2026 +0300
[hotfix][autoscaler] Cover ScalingRealizer override application with tests (#1075)
---
.../flink/autoscaler/JobAutoScalerImplTest.java | 58 ++++++++++-
.../autoscaler/KubernetesScalingRealizerTest.java | 116 +++++++++++++++++++++
2 files changed, 173 insertions(+), 1 deletion(-)
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 7ba9e88c..fba97f99 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.autoscaler;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
@@ -293,6 +294,12 @@ public class JobAutoScalerImplTest {
autoscaler.scale(context);
assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+ // Test job not running: overrides must still be re-projected onto the spec so that an
+ // in-flight upgrade does not regress the autoscaler's last decision (self-heal).
+ var notRunningContext =
context.toBuilder().jobStatus(JobStatus.INITIALIZING).build();
+ autoscaler.scale(notRunningContext);
+ assertParallelismOverrides(Map.of(v1, "1", v2, "2"));
+
// Make sure cleanup removes everything
assertTrue(stateStore.hasDataFor(context));
autoscaler.cleanup(context);
@@ -351,6 +358,55 @@ public class JobAutoScalerImplTest {
assertParallelismOverrides(Map.of(v1.toString(), "1", v2.toString(),
"4"));
}
+ @Test
+ void testOverridesAreReappliedWhenJobNotRunning() throws Exception {
+ // Regression guard for the self-heal contract: while the job is in an in-flight upgrade
+ // (JobStatus != RUNNING), the autoscaler must still re-project its persisted decisions
+ // onto the spec on every reconcile cycle. Skipping the realizer in this state (as an
+ // earlier "if (!waiting)" guard did) lets external resets of spec.flinkConfiguration go
+ // unhealed and triggers a spurious second upgrade once the reconciler diffs the spec
+ // against lastReconciledSpec.
+
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
+
+ var v1 = new JobVertexID().toString();
+ stateStore.storeParallelismOverrides(context, Map.of(v1, "3"));
+ ConfigChanges configChanges = new ConfigChanges();
+ configChanges.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION,
0.42f);
+ stateStore.storeConfigChanges(context, configChanges);
+ stateStore.flush(context);
+
+ var notRunningContext =
context.toBuilder().jobStatus(JobStatus.INITIALIZING).build();
+ var autoscaler =
+ new JobAutoScalerImpl<>(
+ new TestingMetricsCollector<>(new JobTopology()),
+ null,
+ null,
+ eventCollector,
+ scalingRealizer,
+ stateStore);
+
+ autoscaler.scale(notRunningContext);
+
+ boolean sawParallelism = false;
+ boolean sawConfig = false;
+ for (var event : scalingRealizer.events) {
+ if (event.getParallelismOverrides() != null) {
+ sawParallelism = true;
+ assertEquals(Map.of(v1, "3"), event.getParallelismOverrides());
+ }
+ if (event.getConfigChanges() != null) {
+ sawConfig = true;
+ assertThat(event.getConfigChanges().getOverrides())
+ .containsExactly(
+
entry(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.42"));
+ }
+ }
+ assertTrue(
+ sawParallelism,
+ "Parallelism overrides must be re-applied while job is not
running");
+ assertTrue(sawConfig, "Config overrides must be re-applied while job
is not running");
+ }
+
@Test
void testApplyConfigOverrides() throws Exception {
context.getConfiguration().set(AutoScalerOptions.MEMORY_TUNING_ENABLED, true);
@@ -384,7 +440,7 @@ public class JobAutoScalerImplTest {
@Test
void testAutoscalerDisabled() throws Exception {
- context.getConfiguration().setBoolean(AUTOSCALER_ENABLED, false);
+ context.getConfiguration().set(AUTOSCALER_ENABLED, false);
context.getConfiguration().set(VERTEX_SCALING_HISTORY_AGE,
Duration.ofMillis(200));
var scalingHistory = new TreeMap<Instant, ScalingSummary>();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
index ba647034..ef8a7c86 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/KubernetesScalingRealizerTest.java
@@ -68,6 +68,122 @@ public class KubernetesScalingRealizerTest {
assertOverridesDoNotChange("b:2,a:1", newOverrides);
}
+ @Test
+ public void testRealizeParallelismOverridesIsNoOpWhenSpecAlreadyMatches() {
+ // Required to keep the test config context on legacy Flink YAML converters.
+ GlobalConfiguration.setStandardYaml(false);
+
+ KubernetesJobAutoScalerContext ctx =
+ TestingKubernetesAutoscalerUtils.createContext("test", null);
+ FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+ // Pre-populate the spec with the exact override string the autoscaler would produce
+ // and persist it as the last reconciled spec so observeConfig is in sync.
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1,b:2");
+ resource.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(resource.getSpec(),
resource);
+
+ new KubernetesScalingRealizer()
+ .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
+
+ // Steady-state: realizer must short-circuit and leave the spec value byte-identical
+ // (no NOOP rewrite that could re-order keys or churn lastReconciledSpec).
+ assertThat(
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .asFlatMap()
+
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+ .isEqualTo("a:1,b:2");
+ }
+
+ @Test
+ public void testRealizeParallelismOverridesReappliesWhenSpecDrifted() {
+ // Required to keep the test config context on legacy Flink YAML converters.
+ GlobalConfiguration.setStandardYaml(false);
+
+ KubernetesJobAutoScalerContext ctx =
+ TestingKubernetesAutoscalerUtils.createContext("test", null);
+ FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+ // Spec was clobbered with a stale value mid-upgrade (e.g. last-state recovery).
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "a:1");
+ resource.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(resource.getSpec(),
resource);
+
+ new KubernetesScalingRealizer()
+ .realizeParallelismOverrides(ctx, Map.of("a", "1", "b", "2"));
+
+ // Drift was self-healed.
+ assertThat(
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .asFlatMap()
+
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()))
+ .isIn("a:1,b:2", "b:2,a:1");
+ }
+
+ @Test
+ public void testRealizeConfigOverridesIsNoOpWhenSpecAlreadyMatches() {
+ KubernetesJobAutoScalerContext ctx =
+ TestingKubernetesAutoscalerUtils.createContext("test", null);
+ FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+ // Pre-populate the spec so the override is already present and the removal target
+ // is already absent.
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .put(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.42");
+ String initialTmMemory =
resource.getSpec().getTaskManager().getResource().getMemory();
+
+ ConfigChanges overrides = new ConfigChanges();
+ overrides.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION,
0.42f);
+ overrides.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
+ new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+ assertThat(
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .asFlatMap()
+
.get(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key()))
+ .isEqualTo("0.42");
+ assertThat(resource.getSpec().getFlinkConfiguration().asFlatMap())
+ .doesNotContainKey(TaskManagerOptions.TASK_HEAP_MEMORY.key());
+ // TM memory must not be churned in steady-state (the value can only change if total
+ // memory tuning produced a new target; here the spec already reflects intent).
+
assertThat(resource.getSpec().getTaskManager().getResource().getMemory())
+ .isEqualTo(initialTmMemory);
+ }
+
+ @Test
+ public void testRealizeConfigOverridesReappliesWhenSpecDrifted() {
+ KubernetesJobAutoScalerContext ctx =
+ TestingKubernetesAutoscalerUtils.createContext("test", null);
+ FlinkDeployment resource = (FlinkDeployment) ctx.getResource();
+
+ // Spec was clobbered with a stale value.
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .put(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), "0.10");
+
+ ConfigChanges overrides = new ConfigChanges();
+ overrides.addOverride(TaskManagerOptions.MANAGED_MEMORY_FRACTION,
0.42f);
+ new KubernetesScalingRealizer().realizeConfigOverrides(ctx, overrides);
+
+ // Drift was self-healed.
+ assertThat(
+ resource.getSpec()
+ .getFlinkConfiguration()
+ .asFlatMap()
+
.get(TaskManagerOptions.MANAGED_MEMORY_FRACTION.key()))
+ .isEqualTo("0.42");
+ }
+
@Test
public void testApplyMemoryOverrides() {
KubernetesJobAutoScalerContext ctx =