This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new fd4e52ba4d6 [FLINK-27409][runtime] Cleanup stale slot allocation record when the resource requirement of a job is empty
fd4e52ba4d6 is described below

commit fd4e52ba4d6292c02c8b5192a8679c1bb666a218
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Apr 26 14:34:35 2022 +0800

    [FLINK-27409][runtime] Cleanup stale slot allocation record when the resource requirement of a job is empty

    This closes #19580.
---
 .../slotmanager/FineGrainedSlotManager.java | 14 +--
 .../slotmanager/FineGrainedTaskManagerTracker.java | 6 ++
 .../slotmanager/TaskManagerTracker.java | 7 ++
 .../AbstractFineGrainedSlotManagerITCase.java | 107 +++++++++++++++++++++
 4 files changed, 128 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index f3d0b3fcb79..289cc54d162 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -255,6 +255,7 @@ public class FineGrainedSlotManager implements SlotManager {
 @Override public void clearResourceRequirements(JobID jobId) { jobMasterTargetAddresses.remove(jobId); + taskManagerTracker.clearPendingAllocationsOfJob(jobId); resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList()); }
@@ -263,22 +264,23 @@ public class FineGrainedSlotManager implements SlotManager {
 checkInit(); if (resourceRequirements.getResourceRequirements().isEmpty() && resourceTracker.isRequirementEmpty(resourceRequirements.getJobId())) { + // Skip duplicate empty resource requirements.
return; - } else if (resourceRequirements.getResourceRequirements().isEmpty()) { + } + + if (resourceRequirements.getResourceRequirements().isEmpty()) { LOG.info("Clearing resource requirements of job {}", resourceRequirements.getJobId()); + jobMasterTargetAddresses.remove(resourceRequirements.getJobId()); + taskManagerTracker.clearPendingAllocationsOfJob(resourceRequirements.getJobId()); } else { LOG.info( "Received resource requirements from job {}: {}", resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements()); - } - - if (resourceRequirements.getResourceRequirements().isEmpty()) { - jobMasterTargetAddresses.remove(resourceRequirements.getJobId()); - } else { jobMasterTargetAddresses.put( resourceRequirements.getJobId(), resourceRequirements.getTargetAddress()); } + resourceTracker.notifyResourceRequirements( resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements()); checkResourceRequirementsWithDelay(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java index 35dd6dc341b..5bb4eaf5cc9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java @@ -79,6 +79,12 @@ public class FineGrainedTaskManagerTracker implements TaskManagerTracker { pendingSlotAllocationRecords.putAll(pendingSlotAllocations); } + @Override + public void clearPendingAllocationsOfJob(JobID jobId) { + LOG.info("Clear all pending allocations for job {}.", jobId); + pendingSlotAllocationRecords.values().forEach(allocation -> allocation.remove(jobId)); + } + @Override public void addTaskManager( TaskExecutorConnection taskExecutorConnection, diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java index 88852c22643..ee8e3d14467 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java @@ -97,6 +97,13 @@ interface TaskManagerTracker void replaceAllPendingAllocations( Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingSlotAllocations); + /** + * Clear all previous pending slot allocation records for the given job. + * + * @param jobId of the given job + */ + void clearPendingAllocationsOfJob(JobID jobId); + /** Removes all state from the tracker. */ void clear(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java index 6fab2b13b89..466e521905d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java @@ -383,6 +383,113 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl }; } + @Test + public void testRegisterPendingResourceAfterClearingRequirement() throws Exception { + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + final CompletableFuture<Void> allocateResourceFutures = new CompletableFuture<>(); + final CompletableFuture<Void> registerFuture = new CompletableFuture<>(); + final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(); + final 
TaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction( + tuple6 -> { + allocationIdFuture.complete(tuple6.f2); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); + final ResourceID resourceID = ResourceID.generate(); + final TaskExecutorConnection taskManagerConnection = + new TaskExecutorConnection(resourceID, taskExecutorGateway); + final SlotReport slotReport = new SlotReport(); + new Context() { + { + resourceActionsBuilder.setAllocateResourceConsumer( + ignored -> allocateResourceFutures.complete(null)); + runTest( + () -> { + runInMainThread( + () -> + getSlotManager() + .processResourceRequirements( + resourceRequirements)); + assertFutureCompleteAndReturn(allocateResourceFutures); + runInMainThread( + () -> { + getSlotManager() + .clearResourceRequirements( + resourceRequirements.getJobId()); + getSlotManager() + .registerTaskManager( + taskManagerConnection, + slotReport, + DEFAULT_TOTAL_RESOURCE_PROFILE, + DEFAULT_SLOT_RESOURCE_PROFILE); + registerFuture.complete(null); + }); + assertFutureCompleteAndReturn(registerFuture); + assertFutureNotComplete(allocationIdFuture); + assertEquals( + getTaskManagerTracker().getPendingTaskManagers().size(), 0); + }); + } + }; + } + + @Test + public void testRegisterPendingResourceAfterEmptyResourceRequirement() throws Exception { + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + final CompletableFuture<Void> allocateResourceFutures = new CompletableFuture<>(); + final CompletableFuture<Void> registerFuture = new CompletableFuture<>(); + final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot(); + final TaskExecutorGateway taskExecutorGateway = + new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction( + tuple6 -> { + allocationIdFuture.complete(tuple6.f2); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + 
.createTestingTaskExecutorGateway(); + final ResourceID resourceID = ResourceID.generate(); + final TaskExecutorConnection taskManagerConnection = + new TaskExecutorConnection(resourceID, taskExecutorGateway); + final SlotReport slotReport = new SlotReport(); + new Context() { + { + resourceActionsBuilder.setAllocateResourceConsumer( + ignored -> allocateResourceFutures.complete(null)); + runTest( + () -> { + runInMainThread( + () -> + getSlotManager() + .processResourceRequirements( + resourceRequirements)); + assertFutureCompleteAndReturn(allocateResourceFutures); + runInMainThread( + () -> { + getSlotManager() + .processResourceRequirements( + ResourceRequirements.empty( + resourceRequirements.getJobId(), + resourceRequirements + .getTargetAddress())); + getSlotManager() + .registerTaskManager( + taskManagerConnection, + slotReport, + DEFAULT_TOTAL_RESOURCE_PROFILE, + DEFAULT_SLOT_RESOURCE_PROFILE); + registerFuture.complete(null); + }); + assertFutureCompleteAndReturn(registerFuture); + assertFutureNotComplete(allocationIdFuture); + assertEquals( + getTaskManagerTracker().getPendingTaskManagers().size(), 0); + }); + } + }; + } + /** * Tests that we only request new resources/containers once we have assigned all pending task * managers.