This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0dd9bcb0a9c8d905e3907dbd17947926f2432a31
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Feb 27 19:03:31 2023 +0100

    [FLINK-31399] AdaptiveScheduler should free excessive slots that are no longer needed after down scaling.
    
    Signed-off-by: David Moravek <d...@apache.org>
---
 .../scheduler/adaptive/AdaptiveScheduler.java      | 36 +++++++++++
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 71 ++++++++++++++++++++++
 2 files changed, 107 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 436e53b1851..77a992f865a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -224,6 +224,8 @@ public class AdaptiveScheduler
 
     private final JobManagerJobMetricGroup jobManagerJobMetricGroup;
 
+    private final Duration slotIdleTimeout;
+
     public AdaptiveScheduler(
             JobGraph jobGraph,
             @Nullable JobResourceRequirements jobResourceRequirements,
@@ -315,6 +317,8 @@ public class AdaptiveScheduler
                 new BoundedFIFOQueue<>(
                         configuration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
         this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
+        this.slotIdleTimeout =
+                Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT));
     }
 
     private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
@@ -445,6 +449,7 @@ public class AdaptiveScheduler
 
     @Override
     public void startScheduling() {
+        checkIdleSlotTimeout();
         state.as(Created.class)
                 .orElseThrow(
                         () ->
@@ -1256,4 +1261,35 @@ public class AdaptiveScheduler
     State getState() {
         return state;
     }
+
+    /**
+     * Releases slots idle for longer than {@link JobManagerOptions#SLOT_IDLE_TIMEOUT} back to the
+     * ResourceManager, then re-schedules itself on the main thread until the job is terminal.
+     */
+    private void checkIdleSlotTimeout() {
+        if (getState().getJobStatus().isGloballyTerminalState()) {
+            // Job has reached a globally terminal state, so all slots can be returned to the
+            // ResourceManager right away because they are no longer needed. This optimization
+            // lets us skip waiting for the slot pool service to close.
+            for (SlotInfo slotInfo : declarativeSlotPool.getAllSlotsInformation()) {
+                declarativeSlotPool.releaseSlot(
+                        slotInfo.getAllocationId(),
+                        new FlinkException(
+                                "Returning slots to their owners, because the job has reached a globally terminal state."));
+            }
+            return;
+        } else if (getState().getJobStatus().isTerminalState()) {
+            // Terminal, but not globally terminal: do nothing. Returning here also prevents the
+            // idleness check from running again after the scheduler has already shut down.
+            // Slots are deliberately not released, because the JobMaster may want to hold on to
+            // them in case it re-acquires leadership.
+            return;
+        }
+        declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());
+        getMainThreadExecutor()
+                .schedule(
+                        this::checkIdleSlotTimeout,
+                        slotIdleTimeout.toMillis(),
+                        TimeUnit.MILLISECONDS);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 6dcf71b3032..fdf833dbcc2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -102,6 +102,7 @@ import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.junit.jupiter.api.Test;
@@ -1618,6 +1619,76 @@ public class AdaptiveSchedulerTest {
         }
     }
 
+    @Test
+    void testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequirements()
+            throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final Duration slotIdleTimeout = Duration.ofMillis(10);
+
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout.toMillis());
+
+        final DeclarativeSlotPool declarativeSlotPool =
+                createDeclarativeSlotPool(jobGraph.getJobID(), slotIdleTimeout);
+        final AdaptiveScheduler scheduler =
+                new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor)
+                        .setDeclarativeSlotPool(declarativeSlotPool)
+                        .setJobMasterConfiguration(configuration)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        try {
+            final int numInitialSlots = 4;
+            final int numSlotsAfterDownscaling = 2;
+
+            final SubmissionBufferingTaskManagerGateway taskManagerGateway =
+                    new SubmissionBufferingTaskManagerGateway(numInitialSlots);
+
+            taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler));
+
+            singleThreadMainThreadExecutor.execute(
+                    () -> {
+                        scheduler.startScheduling();
+                        offerSlots(
+                                declarativeSlotPool,
+                                createSlotOffersForResourceRequirements(
+                                        ResourceCounter.withResource(
+                                                ResourceProfile.UNKNOWN, numInitialSlots)),
+                                taskManagerGateway);
+                    });
+
+            // wait for all tasks to be submitted
+            taskManagerGateway.waitForSubmissions(numInitialSlots);
+
+            // lower the resource requirements
+            singleThreadMainThreadExecutor.execute(
+                    () ->
+                            scheduler.updateJobResourceRequirements(
+                                    JobResourceRequirements.newBuilder()
+                                            .setParallelismForJobVertex(
+                                                    JOB_VERTEX.getID(), 1, numSlotsAfterDownscaling)
+                                            .build()));
+
+            // job should be resubmitted with lower parallelism
+            taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling);
+
+            // and excessive slots should be freed
+            taskManagerGateway.waitForFreedSlots(numInitialSlots - numSlotsAfterDownscaling);
+
+            final CompletableFuture<JobStatus> jobStatusFuture = new CompletableFuture<>();
+            singleThreadMainThreadExecutor.execute(
+                    () -> jobStatusFuture.complete(scheduler.getState().getJobStatus()));
+            assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING);
+
+            // make sure we haven't freed up any more slots
+            assertThat(taskManagerGateway.freedSlots).isEmpty();
+        } finally {
+            final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+            singleThreadMainThreadExecutor.execute(
+                    () -> FutureUtils.forward(scheduler.closeAsync(), closeFuture));
+            assertThatFuture(closeFuture).eventuallySucceeds();
+        }
+    }
+
     @Test
     public void testUpdateResourceRequirementsInReactiveModeIsNotSupported() throws Exception {
         final Configuration configuration = new Configuration();

Reply via email to