This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 90edc9c6 [FLINK-38859] Do not apply minimum job vertex parallelism for
whole job in native mode
90edc9c6 is described below
commit 90edc9c6586fc451af5ef6ecb0c3c6b5d029f588
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jan 5 14:05:08 2026 +0100
[FLINK-38859] Do not apply minimum job vertex parallelism for whole job in
native mode
---
.../operator/config/FlinkConfigBuilder.java | 8 ++++--
.../operator/config/FlinkConfigBuilderTest.java | 32 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 3 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index e50e1268..b99863c8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -363,9 +363,11 @@ public class FlinkConfigBuilder {
* effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
}
- Optional<Integer> maxOverrideParallelism = getMaxParallelismFromOverrideConfig();
- if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() > 0) {
- return maxOverrideParallelism.get();
+ if (KubernetesDeploymentMode.STANDALONE.equals(spec.getMode())) {
+ Optional<Integer> maxOverrideParallelism = getMaxParallelismFromOverrideConfig();
+ if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() > 0) {
+ return maxOverrideParallelism.get();
+ }
}
return spec.getJob().getParallelism();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 674ef4d5..0d2c2c93 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -874,6 +874,38 @@ public class FlinkConfigBuilderTest {
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
}
+ @Test
+ public void testParallelismOverridesOnlyAppliedForStandaloneMode()
+ throws URISyntaxException, IOException {
+ FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+ dep.getSpec().setTaskManager(new TaskManagerSpec());
+ dep.getSpec().getJob().setParallelism(5);
+ dep.getSpec()
+ .getFlinkConfiguration()
+ .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), "vertex1:10,vertex2:20");
+
+ // Test STANDALONE mode - parallelism overrides should be used
+ dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+ Configuration configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+ assertEquals(20, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+
+ // Test NATIVE mode - parallelism overrides should NOT be used, fall back to job
+ // parallelism
+ dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
+ configuration =
+ new FlinkConfigBuilder(dep, new Configuration())
+ .applyFlinkConfiguration()
+ .applyTaskManagerSpec()
+ .applyJobOrSessionSpec()
+ .build();
+ assertEquals(5, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+ }
+
@Test
public void testBuildFrom() throws Exception {
final Configuration configuration =