This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 90edc9c6 [FLINK-38859] Do not apply minimum job vertex parallelism for 
whole job in native mode
90edc9c6 is described below

commit 90edc9c6586fc451af5ef6ecb0c3c6b5d029f588
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jan 5 14:05:08 2026 +0100

    [FLINK-38859] Do not apply minimum job vertex parallelism for whole job in 
native mode
---
 .../operator/config/FlinkConfigBuilder.java        |  8 ++++--
 .../operator/config/FlinkConfigBuilderTest.java    | 32 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
index e50e1268..b99863c8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java
@@ -363,9 +363,11 @@ public class FlinkConfigBuilder {
                     * effectiveConfig.get(TaskManagerOptions.NUM_TASK_SLOTS);
         }
 
-        Optional<Integer> maxOverrideParallelism = 
getMaxParallelismFromOverrideConfig();
-        if (maxOverrideParallelism.isPresent() && maxOverrideParallelism.get() 
> 0) {
-            return maxOverrideParallelism.get();
+        if (KubernetesDeploymentMode.STANDALONE.equals(spec.getMode())) {
+            Optional<Integer> maxOverrideParallelism = 
getMaxParallelismFromOverrideConfig();
+            if (maxOverrideParallelism.isPresent() && 
maxOverrideParallelism.get() > 0) {
+                return maxOverrideParallelism.get();
+            }
         }
 
         return spec.getJob().getParallelism();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
index 674ef4d5..0d2c2c93 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.java
@@ -874,6 +874,38 @@ public class FlinkConfigBuilderTest {
                         
StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
     }
 
+    @Test
+    public void testParallelismOverridesOnlyAppliedForStandaloneMode()
+            throws URISyntaxException, IOException {
+        FlinkDeployment dep = ReconciliationUtils.clone(flinkDeployment);
+        dep.getSpec().setTaskManager(new TaskManagerSpec());
+        dep.getSpec().getJob().setParallelism(5);
+        dep.getSpec()
+                .getFlinkConfiguration()
+                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), 
"vertex1:10,vertex2:20");
+
+        // Test STANDALONE mode - parallelism overrides should be used
+        dep.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
+        Configuration configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+        assertEquals(20, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+
+        // Test NATIVE mode - parallelism overrides should NOT be used, fall 
back to job
+        // parallelism
+        dep.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
+        configuration =
+                new FlinkConfigBuilder(dep, new Configuration())
+                        .applyFlinkConfiguration()
+                        .applyTaskManagerSpec()
+                        .applyJobOrSessionSpec()
+                        .build();
+        assertEquals(5, configuration.get(CoreOptions.DEFAULT_PARALLELISM));
+    }
+
     @Test
     public void testBuildFrom() throws Exception {
         final Configuration configuration =

Reply via email to