This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7e16a0fa61177a51cb50d0f198464939310508a0 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Fri Mar 31 09:52:14 2023 +0200 [FLINK-31672] Respect user-specified max parallelism --- .../apache/flink/runtime/jobmaster/JobMaster.java | 5 ++- .../flink/runtime/jobmaster/JobMasterTest.java | 37 ++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index b5330e3cd3d..250a325fb33 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -945,7 +945,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> final Map<JobVertexID, Integer> maxParallelismPerVertex = new HashMap<>(); for (JobVertex vertex : jobGraph.getVertices()) { maxParallelismPerVertex.put( - vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex)); + vertex.getID(), + vertex.getMaxParallelism() != JobVertex.MAX_PARALLELISM_DEFAULT + ? vertex.getMaxParallelism() + : SchedulerBase.getDefaultMaxParallelism(vertex)); } return CompletableFuture.completedFuture(maxParallelismPerVertex); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f42a9b7e7db..81fb1088408 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -98,6 +98,7 @@ import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingSchedulerNG; import org.apache.flink.runtime.scheduler.TestingSchedulerNGFactory; @@ -2038,6 +2039,42 @@ class JobMasterTest { } } + @Test + public void testGetMaxParallelismPerVertexRespectsUserSpecifiedParallelism() throws Exception { + JobVertex vertexWithoutMaxParallelism = new JobVertex("vertex1"); + vertexWithoutMaxParallelism.setInvokableClass(NoOpInvokable.class); + vertexWithoutMaxParallelism.setParallelism(1); + JobVertex vertexWithMaxParallelism = new JobVertex("vertex2"); + vertexWithMaxParallelism.setInvokableClass(NoOpInvokable.class); + vertexWithMaxParallelism.setParallelism(1); + vertexWithMaxParallelism.setMaxParallelism(4000); + final JobGraph jobGraph = + JobGraphTestUtils.streamingJobGraph( + vertexWithoutMaxParallelism, vertexWithMaxParallelism); + + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withConfiguration(configuration) + .createJobMaster()) { + jobMaster.start(); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); + + assertThatFuture(jobMasterGateway.getMaxParallelismPerVertex()) + .eventuallySucceeds() + .satisfies( + maxParallelism -> + assertThat(maxParallelism) + .containsEntry( + vertexWithMaxParallelism.getID(), + vertexWithMaxParallelism.getMaxParallelism()) + .containsEntry( + vertexWithoutMaxParallelism.getID(), + SchedulerBase.getDefaultMaxParallelism( + vertexWithoutMaxParallelism))); + } + } + @Test public void testSuccessfulResourceRequirementsUpdate() throws Exception { final CompletableFuture<JobResourceRequirements> schedulerUpdateFuture =