This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 678789afb42 [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements 678789afb42 is described below commit 678789afb4277e20694c70cba8261bd21555dc48 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Wed Mar 29 11:15:19 2023 +0200 [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements --- .../apache/flink/runtime/jobmaster/JobMaster.java | 35 ++++++++ .../flink/runtime/jobmaster/JobMasterGateway.java | 17 ++++ .../flink/runtime/jobmaster/JobMasterTest.java | 95 ++++++++++++++++++++++ .../jobmaster/utils/TestingJobMasterGateway.java | 27 +++++- .../utils/TestingJobMasterGatewayBuilder.java | 27 +++++- 5 files changed, 199 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 318530b8b8c..3c1bfd77e05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -53,6 +53,8 @@ import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; @@ -79,11 +81,13 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.shuffle.JobShuffleContext; import org.apache.flink.runtime.shuffle.JobShuffleContextImpl; @@ -104,6 +108,8 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + import org.slf4j.Logger; import javax.annotation.Nonnull; @@ -114,6 +120,7 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -937,6 +944,34 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> return CompletableFuture.completedFuture(Acknowledge.get()); } + @Override + public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() { + return CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements()); + } + + @Override + public CompletableFuture<Acknowledge> updateJobResourceRequirements( + JobResourceRequirements jobResourceRequirements) { + final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new HashMap<>(); + for (JobVertex vertex : jobGraph.getVertices()) { + maxParallelismPerJobVertex.put( + vertex.getID(), SchedulerBase.getDefaultMaxParallelism(vertex)); + } + final List<String> validationErrors = + JobResourceRequirements.validate( + jobResourceRequirements, maxParallelismPerJobVertex); + if (validationErrors.isEmpty()) { + schedulerNG.updateJobResourceRequirements(jobResourceRequirements); + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + return FutureUtils.completedExceptionally( + new RestHandlerException( + validationErrors.stream() + .collect(Collectors.joining(System.lineSeparator())), + HttpResponseStatus.BAD_REQUEST)); + } + } + // ---------------------------------------------------------------------------------------------- // Internal methods // ---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 3c269f3f6e7..b1a87e6296c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.Acknowledge; @@ -299,4 +300,20 @@ public interface JobMasterGateway */ CompletableFuture<?> stopTrackingAndReleasePartitions( Collection<ResultPartitionID> partitionIds); + + /** + * Read current {@link JobResourceRequirements job resource requirements}. + * + * @return Future which that contains current resource requirements. + */ + CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(); + + /** + * Update {@link JobResourceRequirements job resource requirements}. + * + * @param jobResourceRequirements new resource requirements + * @return Future which is completed successfully when requirements are updated + */ + CompletableFuture<Acknowledge> updateJobResourceRequirements( + JobResourceRequirements jobResourceRequirements); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 8abc3c68179..b5dc211932c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -72,6 +72,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -92,6 +93,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; @@ -117,6 +119,7 @@ import org.apache.flink.testutils.TestingUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.concurrent.FutureUtils; import org.junit.jupiter.api.AfterAll; @@ -125,6 +128,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; @@ -161,11 +165,13 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for {@link JobMaster}. */ +@ExtendWith(TestLoggerExtension.class) class JobMasterTest { private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new TestingInputSplit[0]; @@ -2033,6 +2039,95 @@ class JobMasterTest { } } + @Test + public void testSuccessfulResourceRequirementsUpdate() throws Exception { + final CompletableFuture<JobResourceRequirements> schedulerUpdateFuture = + new CompletableFuture<>(); + final TestingSchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setUpdateJobResourceRequirementsConsumer(schedulerUpdateFuture::complete) + .build(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withConfiguration(configuration) + .withHighAvailabilityServices(haServices) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder(), + new TestingSchedulerNGFactory(scheduler))) + .createJobMaster()) { + jobMaster.start(); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); + + final JobResourceRequirements.Builder jobResourceRequirementsBuilder = + JobResourceRequirements.newBuilder(); + for (JobVertex jobVertex : jobGraph.getVertices()) { + jobResourceRequirementsBuilder.setParallelismForJobVertex(jobVertex.getID(), 1, 2); + } + + final JobResourceRequirements newRequirements = jobResourceRequirementsBuilder.build(); + final CompletableFuture<Acknowledge> jobMasterUpdateFuture = + jobMasterGateway.updateJobResourceRequirements(newRequirements); + + assertThatFuture(jobMasterUpdateFuture).eventuallySucceeds(); + assertThatFuture(schedulerUpdateFuture).eventuallySucceeds().isEqualTo(newRequirements); + } + } + + @Test + public void testInvalidResourceRequirementsUpdate() throws Exception { + final TestingSchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setUpdateJobResourceRequirementsConsumer( + jobResourceRequirements -> { + // No-op. + }) + .build(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withConfiguration(configuration) + .withHighAvailabilityServices(haServices) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder(), + new TestingSchedulerNGFactory(scheduler))) + .createJobMaster()) { + jobMaster.start(); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); + + assertThatFuture( + jobMasterGateway.updateJobResourceRequirements( + JobResourceRequirements.empty())) + .eventuallyFailsWith(ExecutionException.class) + .withCauseInstanceOf(RestHandlerException.class); + } + } + + @Test + public void testResourceRequirementsAreRequestedFromTheScheduler() throws Exception { + final JobResourceRequirements jobResourceRequirements = JobResourceRequirements.empty(); + final TestingSchedulerNG scheduler = + TestingSchedulerNG.newBuilder() + .setRequestJobResourceRequirementsSupplier(() -> jobResourceRequirements) + .build(); + try (final JobMaster jobMaster = + new JobMasterBuilder(jobGraph, rpcService) + .withSlotPoolServiceSchedulerFactory( + DefaultSlotPoolServiceSchedulerFactory.create( + TestingSlotPoolServiceBuilder.newBuilder(), + new TestingSchedulerNGFactory(scheduler))) + .createJobMaster()) { + jobMaster.start(); + final JobMasterGateway jobMasterGateway = + jobMaster.getSelfGateway(JobMasterGateway.class); + assertThatFuture(jobMasterGateway.requestJobResourceRequirements()) + .eventuallySucceeds() + .isEqualTo(jobResourceRequirements); + } + } + private TestingResourceManagerGateway createResourceManagerGateway( CompletableFuture<Collection<BlockedNode>> firstReceivedBlockedNodeFuture, CompletableFuture<Collection<BlockedNode>> secondReceivedBlockedNodeFuture, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java index 37336952d98..b87ee2d488d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -189,6 +190,11 @@ public class TestingJobMasterGateway implements JobMasterGateway { private final Function<Collection<BlockedNode>, CompletableFuture<Acknowledge>> notifyNewBlockedNodesFunction; + private final Supplier<CompletableFuture<JobResourceRequirements>> + requestJobResourceRequirementsSupplier; + private final Function<JobResourceRequirements, CompletableFuture<Acknowledge>> + updateJobResourceRequirementsFunction; + public TestingJobMasterGateway( @Nonnull String address, @Nonnull String hostname, @@ -292,7 +298,13 @@ public class TestingJobMasterGateway implements JobMasterGateway { @Nonnull Consumer<Collection<ResourceRequirement>> notifyNotEnoughResourcesConsumer, @Nonnull Function<Collection<BlockedNode>, CompletableFuture<Acknowledge>> - notifyNewBlockedNodesFunction) { + notifyNewBlockedNodesFunction, + @Nonnull + Supplier<CompletableFuture<JobResourceRequirements>> + requestJobResourceRequirementsSupplier, + @Nonnull + Function<JobResourceRequirements, CompletableFuture<Acknowledge>> + updateJobResourceRequirementsFunction) { this.address = address; this.hostname = hostname; this.cancelFunction = cancelFunction; @@ -322,6 +334,8 @@ public class TestingJobMasterGateway implements JobMasterGateway { this.deliverCoordinationRequestFunction = deliverCoordinationRequestFunction; this.notifyNotEnoughResourcesConsumer = notifyNotEnoughResourcesConsumer; this.notifyNewBlockedNodesFunction = notifyNewBlockedNodesFunction; + this.requestJobResourceRequirementsSupplier = requestJobResourceRequirementsSupplier; + this.updateJobResourceRequirementsFunction = updateJobResourceRequirementsFunction; } @Override @@ -546,4 +560,15 @@ public class TestingJobMasterGateway implements JobMasterGateway { public CompletableFuture<Acknowledge> notifyNewBlockedNodes(Collection<BlockedNode> newNodes) { return notifyNewBlockedNodesFunction.apply(newNodes); } + + @Override + public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements() { + return requestJobResourceRequirementsSupplier.get(); + } + + @Override + public CompletableFuture<Acknowledge> updateJobResourceRequirements( + JobResourceRequirements jobResourceRequirements) { + return updateJobResourceRequirementsFunction.apply(jobResourceRequirements); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java index d680b00d8a2..8293a77014d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; @@ -176,6 +177,14 @@ public class TestingJobMasterGatewayBuilder { notifyNewBlockedNodesFunction = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); + private Supplier<CompletableFuture<JobResourceRequirements>> + requestJobResourceRequirementsSupplier = + () -> CompletableFuture.completedFuture(JobResourceRequirements.empty()); + + private Function<JobResourceRequirements, CompletableFuture<Acknowledge>> + updateJobResourceRequirementsFunction = + ignored -> CompletableFuture.completedFuture(Acknowledge.get()); + public TestingJobMasterGatewayBuilder setAddress(String address) { this.address = address; return this; @@ -388,6 +397,20 @@ public class TestingJobMasterGatewayBuilder { return this; } + public TestingJobMasterGatewayBuilder setRequestJobResourceRequirementsSupplier( + Supplier<CompletableFuture<JobResourceRequirements>> + requestJobResourceRequirementsSupplier) { + this.requestJobResourceRequirementsSupplier = requestJobResourceRequirementsSupplier; + return this; + } + + public TestingJobMasterGatewayBuilder setUpdateJobResourceRequirementsFunction( + Function<JobResourceRequirements, CompletableFuture<Acknowledge>> + updateJobResourceRequirementsFunction) { + this.updateJobResourceRequirementsFunction = updateJobResourceRequirementsFunction; + return this; + } + public TestingJobMasterGateway build() { return new TestingJobMasterGateway( address, @@ -418,6 +441,8 @@ public class TestingJobMasterGatewayBuilder { operatorEventSender, deliverCoordinationRequestFunction, notifyNotEnoughResourcesConsumer, - notifyNewBlockedNodesFunction); + notifyNewBlockedNodesFunction, + requestJobResourceRequirementsSupplier, + updateJobResourceRequirementsFunction); } }