This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7e16a0fa61177a51cb50d0f198464939310508a0
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Fri Mar 31 09:52:14 2023 +0200

    [FLINK-31672] Respect user-specified max parallelism
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  5 ++-
 .../flink/runtime/jobmaster/JobMasterTest.java     | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b5330e3cd3d..250a325fb33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -945,7 +945,10 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         final Map<JobVertexID, Integer> maxParallelismPerVertex = new 
HashMap<>();
         for (JobVertex vertex : jobGraph.getVertices()) {
             maxParallelismPerVertex.put(
-                    vertex.getID(), 
SchedulerBase.getDefaultMaxParallelism(vertex));
+                    vertex.getID(),
+                    vertex.getMaxParallelism() != 
JobVertex.MAX_PARALLELISM_DEFAULT
+                            ? vertex.getMaxParallelism()
+                            : SchedulerBase.getDefaultMaxParallelism(vertex));
         }
         return CompletableFuture.completedFuture(maxParallelismPerVertex);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index f42a9b7e7db..81fb1088408 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -98,6 +98,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
 import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
 import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory;
@@ -2038,6 +2039,42 @@ class JobMasterTest {
         }
     }
 
+    @Test
+    public void 
testGetMaxParallelismPerVertexRespectsUserSpecifiedParallelism() throws 
Exception {
+        JobVertex vertexWithoutMaxParallelism = new JobVertex("vertex1");
+        vertexWithoutMaxParallelism.setInvokableClass(NoOpInvokable.class);
+        vertexWithoutMaxParallelism.setParallelism(1);
+        JobVertex vertexWithMaxParallelism = new JobVertex("vertex2");
+        vertexWithMaxParallelism.setInvokableClass(NoOpInvokable.class);
+        vertexWithMaxParallelism.setParallelism(1);
+        vertexWithMaxParallelism.setMaxParallelism(4000);
+        final JobGraph jobGraph =
+                JobGraphTestUtils.streamingJobGraph(
+                        vertexWithoutMaxParallelism, vertexWithMaxParallelism);
+
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withConfiguration(configuration)
+                        .createJobMaster()) {
+            jobMaster.start();
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            assertThatFuture(jobMasterGateway.getMaxParallelismPerVertex())
+                    .eventuallySucceeds()
+                    .satisfies(
+                            maxParallelism ->
+                                    assertThat(maxParallelism)
+                                            .containsEntry(
+                                                    
vertexWithMaxParallelism.getID(),
+                                                    
vertexWithMaxParallelism.getMaxParallelism())
+                                            .containsEntry(
+                                                    
vertexWithoutMaxParallelism.getID(),
+                                                    
SchedulerBase.getDefaultMaxParallelism(
+                                                            
vertexWithoutMaxParallelism)));
+        }
+    }
+
     @Test
     public void testSuccessfulResourceRequirementsUpdate() throws Exception {
         final CompletableFuture<JobResourceRequirements> schedulerUpdateFuture 
=

Reply via email to