xintongsong commented on code in PR #21565:
URL: https://github.com/apache/flink/pull/21565#discussion_r1064310906


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -284,7 +284,7 @@ public void close() throws Exception {
     @Override
     public void clearResourceRequirements(JobID jobId) {
         jobMasterTargetAddresses.remove(jobId);
-        taskManagerTracker.clearPendingAllocationsOfJob(jobId);
+        taskManagerTracker.clearPendingAllocationsOfJob(jobId, 
resourceAllocator.isSupported());

Review Comment:
   We should not pass `resourceAllocator.isSupported()` into 
`taskManagerTracker`. Instead, skip the call to 
`taskManagerTracker.clearPendingAllocationsOfJob()` entirely when the allocator 
is not supported.
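
   A rough sketch of the call-site guard, not the actual Flink classes. 
`Allocator`, `Tracker` and `SlotManager` are hypothetical stand-ins, and job 
IDs are plain strings; only `isSupported()` and 
`clearPendingAllocationsOfJob()` are names taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class ClearRequirementsSketch {

    /** Hypothetical stand-in for the resource allocator. */
    interface Allocator {
        boolean isSupported();
    }

    /** Hypothetical stand-in for the task manager tracker. */
    static class Tracker {
        int clearCalls = 0;

        // Keeps the single-argument signature; no "supported" flag is passed in.
        void clearPendingAllocationsOfJob(String jobId) {
            clearCalls++;
        }
    }

    /** Hypothetical stand-in for the slot manager that owns both collaborators. */
    static class SlotManager {
        private final Map<String, String> jobMasterTargetAddresses = new HashMap<>();
        private final Tracker tracker;
        private final Allocator allocator;

        SlotManager(Tracker tracker, Allocator allocator) {
            this.tracker = tracker;
            this.allocator = allocator;
        }

        void clearResourceRequirements(String jobId) {
            jobMasterTargetAddresses.remove(jobId);
            // The caller decides whether the tracker is involved at all.
            if (allocator.isSupported()) {
                tracker.clearPendingAllocationsOfJob(jobId);
            }
        }
    }
}
```

   This keeps the tracker unaware of whether an allocator exists; the decision 
stays in the class that already holds the allocator.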



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -342,21 +352,53 @@ private void checkResourceDeclarations() {
                                 resourceDeclaration.getUnwantedWorkers(),
                                 releaseOrRequestWorkerNumber);
 
-                // TODO, release pending/starting/running workers to exceed 
declared worker number.
                 if (remainingReleasingWorkerNumber > 0) {
-                    log.debug(
-                            "need release {} workers after release unwanted 
workers.",
-                            remainingReleasingWorkerNumber);
+                    // release not allocated workers;
+                    remainingReleasingWorkerNumber =
+                            releaseUnallocatedWorkers(
+                                    workerResourceSpec, 
remainingReleasingWorkerNumber);
                 }
+
+                if (remainingReleasingWorkerNumber > 0) {
+                    // release starting workers and running workers;
+                    List<ResourceID> workerCanReleaseInStartingOrder = new 
ArrayList<>();
+                    currentAttemptUnregisteredWorkers.stream()
+                            .filter(r -> 
workerResourceSpec.equals(workerResourceSpecs.get(r)))
+                            .forEach(workerCanReleaseInStartingOrder::add);
+                    workerNodeMap.keySet().stream()
+                            .filter(
+                                    r ->
+                                            
workerResourceSpec.equals(workerResourceSpecs.get(r))
+                                                    && 
!currentAttemptUnregisteredWorkers.contains(
+                                                            r))
+                            .forEach(workerCanReleaseInStartingOrder::add);
+
+                    remainingReleasingWorkerNumber =
+                            releaseResources(
+                                    workerCanReleaseInStartingOrder,
+                                    remainingReleasingWorkerNumber);
+                }

Review Comment:
   This is not what I meant by deduplicating. There's no need to collect 
starting and running workers into one list.
   
   The following logic is identical for starting and running workers:
   - Filtering workers that match the given spec
   - Releasing as many resources as needed
   - Returning the remaining number of resources that need to be released
   
   This logic can be extracted into a single method, with the only difference 
being the collection of workers passed in.
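
   Something along these lines, as a self-contained sketch rather than a 
drop-in patch. The class, the method name `releaseMatchingWorkers`, the 
`released` list and the use of strings for worker IDs and specs are all 
assumptions made for illustration; only `workerResourceSpecs` mirrors a name 
from the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReleaseWorkersSketch {

    // Hypothetical worker-id -> spec mapping, standing in for workerResourceSpecs.
    final Map<String, String> workerResourceSpecs = new LinkedHashMap<>();

    // Records released worker ids; a real implementation would release the resource instead.
    final List<String> released = new ArrayList<>();

    /**
     * Releases up to {@code remaining} workers from {@code candidates} whose spec
     * equals {@code spec}, and returns how many still need to be released.
     */
    int releaseMatchingWorkers(Collection<String> candidates, String spec, int remaining) {
        // Iterate over a copy so that releasing cannot disturb the caller's collection.
        for (String workerId : new ArrayList<>(candidates)) {
            if (remaining <= 0) {
                break;
            }
            if (spec.equals(workerResourceSpecs.get(workerId))) {
                released.add(workerId);
                remaining--;
            }
        }
        return remaining;
    }
}
```

   The caller would then invoke it once with the starting workers and, if the 
returned number is still positive, once more with the running workers.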



##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java:
##########
@@ -373,8 +421,8 @@ private void onContainersOfPriorityAllocated(Priority 
priority, List<Container>
                 requestResourceFutures.remove(taskExecutorProcessSpec);
             }
 
-            startTaskExecutorInContainerAsync(
-                    container, taskExecutorProcessSpec, resourceId, 
requestResourceFuture);
+            startTaskExecutorInContainerAsync(container, 
taskExecutorProcessSpec, resourceId);
+            requestResourceFuture.complete(new YarnWorkerNode(container, 
resourceId));

Review Comment:
   Minor:
   ```suggestion
               requestResourceFuture.complete(new YarnWorkerNode(container, 
resourceId));
               startTaskExecutorInContainerAsync(container, 
taskExecutorProcessSpec, resourceId);
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -284,7 +284,7 @@ public void close() throws Exception {
     @Override
     public void clearResourceRequirements(JobID jobId) {
         jobMasterTargetAddresses.remove(jobId);
-        taskManagerTracker.clearPendingAllocationsOfJob(jobId);
+        taskManagerTracker.clearPendingAllocationsOfJob(jobId, 
resourceAllocator.isSupported());

Review Comment:
   Same for `replaceAllPendingAllocations`.


