This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0dd9bcb0a9c8d905e3907dbd17947926f2432a31 Author: David Moravek <d...@apache.org> AuthorDate: Mon Feb 27 19:03:31 2023 +0100 [FLINK-31399] AdaptiveScheduler should free excessive slots that are no longer needed after down scaling. Signed-off-by: David Moravek <d...@apache.org> --- .../scheduler/adaptive/AdaptiveScheduler.java | 36 +++++++++++ .../scheduler/adaptive/AdaptiveSchedulerTest.java | 71 ++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 436e53b1851..77a992f865a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -224,6 +224,8 @@ public class AdaptiveScheduler private final JobManagerJobMetricGroup jobManagerJobMetricGroup; + private final Duration slotIdleTimeout; + public AdaptiveScheduler( JobGraph jobGraph, @Nullable JobResourceRequirements jobResourceRequirements, @@ -315,6 +317,8 @@ public class AdaptiveScheduler new BoundedFIFOQueue<>( configuration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE)); this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; + this.slotIdleTimeout = + Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)); } private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException { @@ -445,6 +449,7 @@ public class AdaptiveScheduler @Override public void startScheduling() { + checkIdleSlotTimeout(); state.as(Created.class) .orElseThrow( () -> @@ -1256,4 +1261,35 @@ public class AdaptiveScheduler State getState() { return state; } + + /** + * Check for slots that are idle for more than {@link JobManagerOptions#SLOT_IDLE_TIMEOUT} and + * release them back to the ResourceManager. + */ + private void checkIdleSlotTimeout() { + if (getState().getJobStatus().isGloballyTerminalState()) { + // Job has reached the terminal state, so we can return all slots to the ResourceManager + // to speed things up because we no longer need them. This optimization lets us skip + // waiting for the slot pool service to close. + for (SlotInfo slotInfo : declarativeSlotPool.getAllSlotsInformation()) { + declarativeSlotPool.releaseSlot( + slotInfo.getAllocationId(), + new FlinkException( + "Returning slots to their owners, because the job has reached a globally terminal state.")); + } + return; + } else if (getState().getJobStatus().isTerminalState()) { + // do nothing + // prevent idleness check running again while scheduler was already shut down + // don't release slots because JobMaster may want to hold on to slots in case + // it re-acquires leadership + return; + } + declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis()); + getMainThreadExecutor() + .schedule( + this::checkIdleSlotTimeout, + slotIdleTimeout.toMillis(), + TimeUnit.MILLISECONDS); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 6dcf71b3032..fdf833dbcc2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -102,6 +102,7 @@ import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.FlinkException; import org.apache.flink.util.IterableUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.jackson.JacksonMapperFactory; import org.junit.jupiter.api.Test; @@ -1618,6 +1619,76 @@ public class AdaptiveSchedulerTest { } } + @Test + void testIdleSlotsAreReleasedAfterDownScalingTriggeredByLoweredResourceRequirements() + throws Exception { + final JobGraph jobGraph = createJobGraph(); + final Duration slotIdleTimeout = Duration.ofMillis(10); + + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, slotIdleTimeout.toMillis()); + + final DeclarativeSlotPool declarativeSlotPool = + createDeclarativeSlotPool(jobGraph.getJobID(), slotIdleTimeout); + final AdaptiveScheduler scheduler = + new AdaptiveSchedulerBuilder(jobGraph, singleThreadMainThreadExecutor) + .setDeclarativeSlotPool(declarativeSlotPool) + .setJobMasterConfiguration(configuration) + .build(EXECUTOR_RESOURCE.getExecutor()); + + try { + final int numInitialSlots = 4; + final int numSlotsAfterDownscaling = 2; + + final SubmissionBufferingTaskManagerGateway taskManagerGateway = + new SubmissionBufferingTaskManagerGateway(numInitialSlots); + + taskManagerGateway.setCancelConsumer(createCancelConsumer(scheduler)); + + singleThreadMainThreadExecutor.execute( + () -> { + scheduler.startScheduling(); + offerSlots( + declarativeSlotPool, + createSlotOffersForResourceRequirements( + ResourceCounter.withResource( + ResourceProfile.UNKNOWN, numInitialSlots)), + taskManagerGateway); + }); + + // wait for all tasks to be submitted + taskManagerGateway.waitForSubmissions(numInitialSlots); + + // lower the resource requirements + singleThreadMainThreadExecutor.execute( + () -> + scheduler.updateJobResourceRequirements( + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex( + JOB_VERTEX.getID(), 1, numSlotsAfterDownscaling) + .build())); + + // job should be resubmitted with lower parallelism + taskManagerGateway.waitForSubmissions(numSlotsAfterDownscaling); + + // and excessive slots should be freed + taskManagerGateway.waitForFreedSlots(numInitialSlots - numSlotsAfterDownscaling); + + final CompletableFuture<JobStatus> jobStatusFuture = new CompletableFuture<>(); + singleThreadMainThreadExecutor.execute( + () -> jobStatusFuture.complete(scheduler.getState().getJobStatus())); + assertThatFuture(jobStatusFuture).eventuallySucceeds().isEqualTo(JobStatus.RUNNING); + + // make sure we haven't freed up any more slots + assertThat(taskManagerGateway.freedSlots).isEmpty(); + } finally { + final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + singleThreadMainThreadExecutor.execute( + () -> FutureUtils.forward(scheduler.closeAsync(), closeFuture)); + assertThatFuture(closeFuture).eventuallySucceeds(); + } + } + @Test public void testUpdateResourceRequirementsInReactiveModeIsNotSupported() throws Exception { final Configuration configuration = new Configuration();