This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fd4e52ba4d6 [FLINK-27409][runtime] Cleanup stale slot allocation record when the resource requirement of a job is empty
fd4e52ba4d6 is described below

commit fd4e52ba4d6292c02c8b5192a8679c1bb666a218
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Apr 26 14:34:35 2022 +0800

    [FLINK-27409][runtime] Cleanup stale slot allocation record when the resource requirement of a job is empty
    
    This closes #19580.
---
 .../slotmanager/FineGrainedSlotManager.java        |  14 +--
 .../slotmanager/FineGrainedTaskManagerTracker.java |   6 ++
 .../slotmanager/TaskManagerTracker.java            |   7 ++
 .../AbstractFineGrainedSlotManagerITCase.java      | 107 +++++++++++++++++++++
 4 files changed, 128 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
index f3d0b3fcb79..289cc54d162 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java
@@ -255,6 +255,7 @@ public class FineGrainedSlotManager implements SlotManager {
     @Override
     public void clearResourceRequirements(JobID jobId) {
         jobMasterTargetAddresses.remove(jobId);
+        taskManagerTracker.clearPendingAllocationsOfJob(jobId);
         resourceTracker.notifyResourceRequirements(jobId, Collections.emptyList());
     }
 
@@ -263,22 +264,23 @@ public class FineGrainedSlotManager implements SlotManager {
         checkInit();
         if (resourceRequirements.getResourceRequirements().isEmpty()
                 && resourceTracker.isRequirementEmpty(resourceRequirements.getJobId())) {
+            // Skip duplicate empty resource requirements.
             return;
-        } else if (resourceRequirements.getResourceRequirements().isEmpty()) {
+        }
+
+        if (resourceRequirements.getResourceRequirements().isEmpty()) {
             LOG.info("Clearing resource requirements of job {}", resourceRequirements.getJobId());
+            jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
+            taskManagerTracker.clearPendingAllocationsOfJob(resourceRequirements.getJobId());
         } else {
             LOG.info(
                     "Received resource requirements from job {}: {}",
                     resourceRequirements.getJobId(),
                     resourceRequirements.getResourceRequirements());
-        }
-
-        if (resourceRequirements.getResourceRequirements().isEmpty()) {
-            jobMasterTargetAddresses.remove(resourceRequirements.getJobId());
-        } else {
             jobMasterTargetAddresses.put(
                     resourceRequirements.getJobId(), resourceRequirements.getTargetAddress());
         }
+
         resourceTracker.notifyResourceRequirements(
                 resourceRequirements.getJobId(), resourceRequirements.getResourceRequirements());
         checkResourceRequirementsWithDelay();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
index 35dd6dc341b..5bb4eaf5cc9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java
@@ -79,6 +79,12 @@ public class FineGrainedTaskManagerTracker implements TaskManagerTracker {
         pendingSlotAllocationRecords.putAll(pendingSlotAllocations);
     }
 
+    @Override
+    public void clearPendingAllocationsOfJob(JobID jobId) {
+        LOG.info("Clear all pending allocations for job {}.", jobId);
+        pendingSlotAllocationRecords.values().forEach(allocation -> allocation.remove(jobId));
+    }
+
     @Override
     public void addTaskManager(
             TaskExecutorConnection taskExecutorConnection,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
index 88852c22643..ee8e3d14467 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerTracker.java
@@ -97,6 +97,13 @@ interface TaskManagerTracker
     void replaceAllPendingAllocations(
             Map<PendingTaskManagerId, Map<JobID, ResourceCounter>> pendingSlotAllocations);
 
+    /**
+     * Clear all previous pending slot allocation records for the given job.
+     *
+     * @param jobId of the given job
+     */
+    void clearPendingAllocationsOfJob(JobID jobId);
+
     /** Removes all state from the tracker. */
     void clear();
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
index 6fab2b13b89..466e521905d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/AbstractFineGrainedSlotManagerITCase.java
@@ -383,6 +383,113 @@ public abstract class AbstractFineGrainedSlotManagerITCase extends FineGrainedSl
         };
     }
 
+    @Test
+    public void testRegisterPendingResourceAfterClearingRequirement() throws Exception {
+        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> allocateResourceFutures = new CompletableFuture<>();
+        final CompletableFuture<Void> registerFuture = new CompletableFuture<>();
+        final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture.complete(tuple6.f2);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final ResourceID resourceID = ResourceID.generate();
+        final TaskExecutorConnection taskManagerConnection =
+                new TaskExecutorConnection(resourceID, taskExecutorGateway);
+        final SlotReport slotReport = new SlotReport();
+        new Context() {
+            {
+                resourceActionsBuilder.setAllocateResourceConsumer(
+                        ignored -> allocateResourceFutures.complete(null));
+                runTest(
+                        () -> {
+                            runInMainThread(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            resourceRequirements));
+                            assertFutureCompleteAndReturn(allocateResourceFutures);
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager()
+                                                .clearResourceRequirements(
+                                                        resourceRequirements.getJobId());
+                                        getSlotManager()
+                                                .registerTaskManager(
+                                                        taskManagerConnection,
+                                                        slotReport,
+                                                        DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                        DEFAULT_SLOT_RESOURCE_PROFILE);
+                                        registerFuture.complete(null);
+                                    });
+                            assertFutureCompleteAndReturn(registerFuture);
+                            assertFutureNotComplete(allocationIdFuture);
+                            assertEquals(
+                                    getTaskManagerTracker().getPendingTaskManagers().size(), 0);
+                        });
+            }
+        };
+    }
+
+    @Test
+    public void testRegisterPendingResourceAfterEmptyResourceRequirement() throws Exception {
+        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+        final CompletableFuture<Void> allocateResourceFutures = new CompletableFuture<>();
+        final CompletableFuture<Void> registerFuture = new CompletableFuture<>();
+        final ResourceRequirements resourceRequirements = createResourceRequirementsForSingleSlot();
+        final TaskExecutorGateway taskExecutorGateway =
+                new TestingTaskExecutorGatewayBuilder()
+                        .setRequestSlotFunction(
+                                tuple6 -> {
+                                    allocationIdFuture.complete(tuple6.f2);
+                                    return CompletableFuture.completedFuture(Acknowledge.get());
+                                })
+                        .createTestingTaskExecutorGateway();
+        final ResourceID resourceID = ResourceID.generate();
+        final TaskExecutorConnection taskManagerConnection =
+                new TaskExecutorConnection(resourceID, taskExecutorGateway);
+        final SlotReport slotReport = new SlotReport();
+        new Context() {
+            {
+                resourceActionsBuilder.setAllocateResourceConsumer(
+                        ignored -> allocateResourceFutures.complete(null));
+                runTest(
+                        () -> {
+                            runInMainThread(
+                                    () ->
+                                            getSlotManager()
+                                                    .processResourceRequirements(
+                                                            resourceRequirements));
+                            assertFutureCompleteAndReturn(allocateResourceFutures);
+                            runInMainThread(
+                                    () -> {
+                                        getSlotManager()
+                                                .processResourceRequirements(
+                                                        ResourceRequirements.empty(
+                                                                resourceRequirements.getJobId(),
+                                                                resourceRequirements
+                                                                        .getTargetAddress()));
+                                        getSlotManager()
+                                                .registerTaskManager(
+                                                        taskManagerConnection,
+                                                        slotReport,
+                                                        DEFAULT_TOTAL_RESOURCE_PROFILE,
+                                                        DEFAULT_SLOT_RESOURCE_PROFILE);
+                                        registerFuture.complete(null);
+                                    });
+                            assertFutureCompleteAndReturn(registerFuture);
+                            assertFutureNotComplete(allocationIdFuture);
+                            assertEquals(
+                                    getTaskManagerTracker().getPendingTaskManagers().size(), 0);
+                        });
+            }
+        };
+    }
+
     /**
      * Tests that we only request new resources/containers once we have assigned all pending task
      * managers.

Reply via email to