This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 678789afb42 [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements
678789afb42 is described below

commit 678789afb4277e20694c70cba8261bd21555dc48
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Wed Mar 29 11:15:19 2023 +0200

    [FLINK-31590] Add mechanics to the JobMaster for handling job resource requirements
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 35 ++++++++
 .../flink/runtime/jobmaster/JobMasterGateway.java  | 17 ++++
 .../flink/runtime/jobmaster/JobMasterTest.java     | 95 ++++++++++++++++++++++
 .../jobmaster/utils/TestingJobMasterGateway.java   | 27 +++++-
 .../utils/TestingJobMasterGatewayBuilder.java      | 27 +++++-
 5 files changed, 199 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 318530b8b8c..3c1bfd77e05 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,6 +53,8 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -79,11 +81,13 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
 import org.apache.flink.runtime.shuffle.JobShuffleContext;
 import org.apache.flink.runtime.shuffle.JobShuffleContextImpl;
@@ -104,6 +108,8 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
 import org.slf4j.Logger;
 
 import javax.annotation.Nonnull;
@@ -114,6 +120,7 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -937,6 +944,34 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> 
requestJobResourceRequirements() {
+        return 
CompletableFuture.completedFuture(schedulerNG.requestJobResourceRequirements());
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        final Map<JobVertexID, Integer> maxParallelismPerJobVertex = new 
HashMap<>();
+        for (JobVertex vertex : jobGraph.getVertices()) {
+            maxParallelismPerJobVertex.put(
+                    vertex.getID(), 
SchedulerBase.getDefaultMaxParallelism(vertex));
+        }
+        final List<String> validationErrors =
+                JobResourceRequirements.validate(
+                        jobResourceRequirements, maxParallelismPerJobVertex);
+        if (validationErrors.isEmpty()) {
+            schedulerNG.updateJobResourceRequirements(jobResourceRequirements);
+            return CompletableFuture.completedFuture(Acknowledge.get());
+        } else {
+            return FutureUtils.completedExceptionally(
+                    new RestHandlerException(
+                            validationErrors.stream()
+                                    
.collect(Collectors.joining(System.lineSeparator())),
+                            HttpResponseStatus.BAD_REQUEST));
+        }
+    }
+
     // 
----------------------------------------------------------------------------------------------
     // Internal methods
     // 
----------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 3c269f3f6e7..b1a87e6296c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -299,4 +300,20 @@ public interface JobMasterGateway
      */
     CompletableFuture<?> stopTrackingAndReleasePartitions(
             Collection<ResultPartitionID> partitionIds);
+
+    /**
+     * Read current {@link JobResourceRequirements job resource requirements}.
+     *
     * @return Future that contains the current resource requirements.
+     */
+    CompletableFuture<JobResourceRequirements> 
requestJobResourceRequirements();
+
+    /**
+     * Update {@link JobResourceRequirements job resource requirements}.
+     *
+     * @param jobResourceRequirements new resource requirements
     * @return Future which is completed successfully once the requirements have been updated
+     */
+    CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 8abc3c68179..b5dc211932c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -72,6 +72,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -92,6 +93,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
@@ -117,6 +119,7 @@ import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.jupiter.api.AfterAll;
@@ -125,6 +128,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import javax.annotation.Nonnull;
@@ -161,11 +165,13 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link JobMaster}. */
+@ExtendWith(TestLoggerExtension.class)
 class JobMasterTest {
 
     private static final TestingInputSplit[] EMPTY_TESTING_INPUT_SPLITS = new 
TestingInputSplit[0];
@@ -2033,6 +2039,95 @@ class JobMasterTest {
         }
     }
 
+    @Test
+    public void testSuccessfulResourceRequirementsUpdate() throws Exception {
+        final CompletableFuture<JobResourceRequirements> schedulerUpdateFuture 
=
+                new CompletableFuture<>();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        
.setUpdateJobResourceRequirementsConsumer(schedulerUpdateFuture::complete)
+                        .build();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withConfiguration(configuration)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder(),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster()) {
+            jobMaster.start();
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            final JobResourceRequirements.Builder 
jobResourceRequirementsBuilder =
+                    JobResourceRequirements.newBuilder();
+            for (JobVertex jobVertex : jobGraph.getVertices()) {
+                
jobResourceRequirementsBuilder.setParallelismForJobVertex(jobVertex.getID(), 1, 
2);
+            }
+
+            final JobResourceRequirements newRequirements = 
jobResourceRequirementsBuilder.build();
+            final CompletableFuture<Acknowledge> jobMasterUpdateFuture =
+                    
jobMasterGateway.updateJobResourceRequirements(newRequirements);
+
+            assertThatFuture(jobMasterUpdateFuture).eventuallySucceeds();
+            
assertThatFuture(schedulerUpdateFuture).eventuallySucceeds().isEqualTo(newRequirements);
+        }
+    }
+
+    @Test
+    public void testInvalidResourceRequirementsUpdate() throws Exception {
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setUpdateJobResourceRequirementsConsumer(
+                                jobResourceRequirements -> {
+                                    // No-op.
+                                })
+                        .build();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withConfiguration(configuration)
+                        .withHighAvailabilityServices(haServices)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder(),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster()) {
+            jobMaster.start();
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
+
+            assertThatFuture(
+                            jobMasterGateway.updateJobResourceRequirements(
+                                    JobResourceRequirements.empty()))
+                    .eventuallyFailsWith(ExecutionException.class)
+                    .withCauseInstanceOf(RestHandlerException.class);
+        }
+    }
+
+    @Test
+    public void testResourceRequirementsAreRequestedFromTheScheduler() throws 
Exception {
+        final JobResourceRequirements jobResourceRequirements = 
JobResourceRequirements.empty();
+        final TestingSchedulerNG scheduler =
+                TestingSchedulerNG.newBuilder()
+                        .setRequestJobResourceRequirementsSupplier(() -> 
jobResourceRequirements)
+                        .build();
+        try (final JobMaster jobMaster =
+                new JobMasterBuilder(jobGraph, rpcService)
+                        .withSlotPoolServiceSchedulerFactory(
+                                DefaultSlotPoolServiceSchedulerFactory.create(
+                                        
TestingSlotPoolServiceBuilder.newBuilder(),
+                                        new 
TestingSchedulerNGFactory(scheduler)))
+                        .createJobMaster()) {
+            jobMaster.start();
+            final JobMasterGateway jobMasterGateway =
+                    jobMaster.getSelfGateway(JobMasterGateway.class);
+            assertThatFuture(jobMasterGateway.requestJobResourceRequirements())
+                    .eventuallySucceeds()
+                    .isEqualTo(jobResourceRequirements);
+        }
+    }
+
     private TestingResourceManagerGateway createResourceManagerGateway(
             CompletableFuture<Collection<BlockedNode>> 
firstReceivedBlockedNodeFuture,
             CompletableFuture<Collection<BlockedNode>> 
secondReceivedBlockedNodeFuture,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
index 37336952d98..b87ee2d488d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGateway.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -189,6 +190,11 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
     private final Function<Collection<BlockedNode>, 
CompletableFuture<Acknowledge>>
             notifyNewBlockedNodesFunction;
 
+    private final Supplier<CompletableFuture<JobResourceRequirements>>
+            requestJobResourceRequirementsSupplier;
+    private final Function<JobResourceRequirements, 
CompletableFuture<Acknowledge>>
+            updateJobResourceRequirementsFunction;
+
     public TestingJobMasterGateway(
             @Nonnull String address,
             @Nonnull String hostname,
@@ -292,7 +298,13 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
             @Nonnull Consumer<Collection<ResourceRequirement>> 
notifyNotEnoughResourcesConsumer,
             @Nonnull
                     Function<Collection<BlockedNode>, 
CompletableFuture<Acknowledge>>
-                            notifyNewBlockedNodesFunction) {
+                            notifyNewBlockedNodesFunction,
+            @Nonnull
+                    Supplier<CompletableFuture<JobResourceRequirements>>
+                            requestJobResourceRequirementsSupplier,
+            @Nonnull
+                    Function<JobResourceRequirements, 
CompletableFuture<Acknowledge>>
+                            updateJobResourceRequirementsFunction) {
         this.address = address;
         this.hostname = hostname;
         this.cancelFunction = cancelFunction;
@@ -322,6 +334,8 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
         this.deliverCoordinationRequestFunction = 
deliverCoordinationRequestFunction;
         this.notifyNotEnoughResourcesConsumer = 
notifyNotEnoughResourcesConsumer;
         this.notifyNewBlockedNodesFunction = notifyNewBlockedNodesFunction;
+        this.requestJobResourceRequirementsSupplier = 
requestJobResourceRequirementsSupplier;
+        this.updateJobResourceRequirementsFunction = 
updateJobResourceRequirementsFunction;
     }
 
     @Override
@@ -546,4 +560,15 @@ public class TestingJobMasterGateway implements 
JobMasterGateway {
     public CompletableFuture<Acknowledge> 
notifyNewBlockedNodes(Collection<BlockedNode> newNodes) {
         return notifyNewBlockedNodesFunction.apply(newNodes);
     }
+
+    @Override
+    public CompletableFuture<JobResourceRequirements> 
requestJobResourceRequirements() {
+        return requestJobResourceRequirementsSupplier.get();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobResourceRequirements jobResourceRequirements) {
+        return 
updateJobResourceRequirementsFunction.apply(jobResourceRequirements);
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
index d680b00d8a2..8293a77014d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/utils/TestingJobMasterGatewayBuilder.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -176,6 +177,14 @@ public class TestingJobMasterGatewayBuilder {
             notifyNewBlockedNodesFunction =
                     ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
 
+    private Supplier<CompletableFuture<JobResourceRequirements>>
+            requestJobResourceRequirementsSupplier =
+                    () -> 
CompletableFuture.completedFuture(JobResourceRequirements.empty());
+
+    private Function<JobResourceRequirements, CompletableFuture<Acknowledge>>
+            updateJobResourceRequirementsFunction =
+                    ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
+
     public TestingJobMasterGatewayBuilder setAddress(String address) {
         this.address = address;
         return this;
@@ -388,6 +397,20 @@ public class TestingJobMasterGatewayBuilder {
         return this;
     }
 
+    public TestingJobMasterGatewayBuilder 
setRequestJobResourceRequirementsSupplier(
+            Supplier<CompletableFuture<JobResourceRequirements>>
+                    requestJobResourceRequirementsSupplier) {
+        this.requestJobResourceRequirementsSupplier = 
requestJobResourceRequirementsSupplier;
+        return this;
+    }
+
+    public TestingJobMasterGatewayBuilder 
setUpdateJobResourceRequirementsFunction(
+            Function<JobResourceRequirements, CompletableFuture<Acknowledge>>
+                    updateJobResourceRequirementsFunction) {
+        this.updateJobResourceRequirementsFunction = 
updateJobResourceRequirementsFunction;
+        return this;
+    }
+
     public TestingJobMasterGateway build() {
         return new TestingJobMasterGateway(
                 address,
@@ -418,6 +441,8 @@ public class TestingJobMasterGatewayBuilder {
                 operatorEventSender,
                 deliverCoordinationRequestFunction,
                 notifyNotEnoughResourcesConsumer,
-                notifyNewBlockedNodesFunction);
+                notifyNewBlockedNodesFunction,
+                requestJobResourceRequirementsSupplier,
+                updateJobResourceRequirementsFunction);
     }
 }

Reply via email to