xintongsong commented on code in PR #21496:
URL: https://github.com/apache/flink/pull/21496#discussion_r1053928976


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -206,15 +220,29 @@ protected Optional<WorkerType> 
getWorkerNodeIfAcceptRegistration(ResourceID reso
         return Optional.ofNullable(workerNodeMap.get(resourceID));
     }
 
+    @VisibleForTesting
+    public void declareResourceNeeded(Collection<ResourceDeclaration> 
resourceDeclarations) {
+        for (ResourceDeclaration resourceDeclaration : resourceDeclarations) {
+            updateResourceDeclaration(resourceDeclaration);
+        }
+
+        checkResourceDeclarations();
+    }
+
     @Override
-    protected void onWorkerRegistered(WorkerType worker) {
+    protected void onWorkerRegistered(WorkerType worker, WorkerResourceSpec 
workerResourceSpec) {
         final ResourceID resourceId = worker.getResourceID();
         log.info("Worker {} is registered.", 
resourceId.getStringWithMetadata());
 
-        final WorkerResourceSpec workerResourceSpec =
-                currentAttemptUnregisteredWorkers.remove(resourceId);
         tryRemovePreviousPendingRecoveryTaskManager(resourceId);
-        if (workerResourceSpec != null) {
+
+        if (!workerResourceSpecs.containsKey(worker.getResourceID())) {
+            // recovered worker registered.
+            workerResourceSpecs.put(worker.getResourceID(), 
workerResourceSpec);
+            totalWorkerCounter.increaseAndGet(workerResourceSpec);
+        }

Review Comment:
   Might be better to also add a log in this if-branch.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(

Review Comment:
   Move this method to the *Internal* section.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int 
needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no 
longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // ------------------------------------------------------------------------
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration 
newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = 
newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = 
this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) 
{
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }
+
+    private void checkResourceDeclarations() {
+        for (ResourceDeclaration resourceDeclaration : 
resourceDeclarations.values()) {
+            WorkerResourceSpec workerResourceSpec = 
resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    totalWorkerCounter.getNum(workerResourceSpec) - 
declaredWorkerNumber;
+
+            if (releaseOrRequestWorkerNumber > 0) {
+                log.debug(
+                        "need release {} workers, current worker number {}, 
declared worker number {}",
+                        releaseOrRequestWorkerNumber,
+                        totalWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+
+                // release unwanted workers.
+                int remainingReleasingWorkerNumber =
+                        releaseUnWantedResources(
+                                resourceDeclaration.getUnwantedWorkers(),
+                                releaseOrRequestWorkerNumber);
+
+                // TODO, release pending/starting/running workers to exceed 
declared worker number.

Review Comment:
   What's the plan for this todo? Will we address it in the next PR for 
canceling pending worker requests?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java:
##########
@@ -19,34 +19,22 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+
+import java.util.Collection;
 
 /** Resource related actions which the {@link SlotManager} can perform. */
 public interface ResourceAllocator {
 
     /** Whether allocate/release resources are supported. */
     boolean isSupported();
 
-    /**
-     * Releases the resource with the given instance id.
-     *
-     * @param instanceId identifying which resource to release
-     * @param cause why the resource is released
-     */
-    void releaseResource(InstanceID instanceId, Exception cause);
-
     /**
      * Releases the resource with the given resource id.
      *
      * @param resourceID identifying which resource to release
      */
     void releaseResource(ResourceID resourceID);

Review Comment:
   It's a bit weird that in a declarative protocol we still support releasing 
resources. Looking into the cases where this method is called, I think it is only 
used for cleaning up when the connection is already disconnected. Therefore, 
I'd suggest renaming this to something like `cleaningUpDisconnectedResource`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -118,6 +124,34 @@ public void removeTaskManager(InstanceID instanceId) {
         }
     }
 
+    @Override
+    public void addUnWantedTaskManager(InstanceID instanceId) {
+        TaskManagerInfo taskManagerInfo = 
taskManagerRegistrations.get(instanceId);
+        if (taskManagerInfo != null) {
+            addUnWantedTaskManagerWithResource(
+                    instanceId,
+                    taskManagerInfo.getTotalResource(),
+                    taskManagerInfo.getDefaultSlotResourceProfile());
+        }
+    }
+
+    public void addUnWantedTaskManagerWithResource(
+            InstanceID instanceId,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
+        unWantedTaskManagers.put(
+                instanceId,
+                WorkerResourceSpec.fromTotalResourceProfile(
+                        totalResourceProfile,
+                        SlotManagerUtils.calculateDefaultNumSlots(
+                                totalResourceProfile, 
defaultSlotResourceProfile)));
+    }

Review Comment:
   I think the underlying question is: what is the relationship between 
`unWantedTaskManagers` and `taskManagerRegistrations`? Are unwanted workers 
always a subset of registered workers?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -118,6 +124,34 @@ public void removeTaskManager(InstanceID instanceId) {
         }
     }
 
+    @Override
+    public void addUnWantedTaskManager(InstanceID instanceId) {
+        TaskManagerInfo taskManagerInfo = 
taskManagerRegistrations.get(instanceId);
+        if (taskManagerInfo != null) {
+            addUnWantedTaskManagerWithResource(
+                    instanceId,
+                    taskManagerInfo.getTotalResource(),
+                    taskManagerInfo.getDefaultSlotResourceProfile());
+        }
+    }
+
+    public void addUnWantedTaskManagerWithResource(
+            InstanceID instanceId,
+            ResourceProfile totalResourceProfile,
+            ResourceProfile defaultSlotResourceProfile) {
+        unWantedTaskManagers.put(
+                instanceId,
+                WorkerResourceSpec.fromTotalResourceProfile(
+                        totalResourceProfile,
+                        SlotManagerUtils.calculateDefaultNumSlots(
+                                totalResourceProfile, 
defaultSlotResourceProfile)));
+    }

Review Comment:
   Again, why do we need 2 interfaces here for adding unwanted workers?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int 
needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no 
longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // ------------------------------------------------------------------------
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration 
newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = 
newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = 
this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) 
{
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }
+
+    private void checkResourceDeclarations() {
+        for (ResourceDeclaration resourceDeclaration : 
resourceDeclarations.values()) {
+            WorkerResourceSpec workerResourceSpec = 
resourceDeclaration.getSpec();
+            int declaredWorkerNumber = resourceDeclaration.getNumNeeded();
+
+            final int releaseOrRequestWorkerNumber =
+                    totalWorkerCounter.getNum(workerResourceSpec) - 
declaredWorkerNumber;
+
+            if (releaseOrRequestWorkerNumber > 0) {
+                log.debug(
+                        "need release {} workers, current worker number {}, 
declared worker number {}",
+                        releaseOrRequestWorkerNumber,
+                        totalWorkerCounter.getNum(workerResourceSpec),
+                        declaredWorkerNumber);
+
+                // release unwanted workers.
+                int remainingReleasingWorkerNumber =
+                        releaseUnWantedResources(
+                                resourceDeclaration.getUnwantedWorkers(),
+                                releaseOrRequestWorkerNumber);
+
+                // TODO, release pending/starting/running workers to exceed 
declared worker number.
+                if (remainingReleasingWorkerNumber > 0) {
+                    log.debug(
+                            "need release {} workers after release unwanted 
workers.",
+                            remainingReleasingWorkerNumber);
+                }
+
+                // try update unwanted workers.
+                updateResourceDeclaration(resourceDeclaration);

Review Comment:
   Not sure about modifying the declaration. It's counterintuitive that the ARM 
maintains something called resource declarations that is not what the slot 
manager actually declared. I think it would be better to keep the declarations 
unchanged, and make `releaseUnWantedResources` able to skip unwanted 
workers that no longer exist.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java:
##########
@@ -739,9 +789,8 @@ private void 
releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo)
 
     private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
         if (resourceAllocator.isSupported()) {
-            final FlinkExpectedException cause =
-                    new FlinkExpectedException("TaskManager exceeded the idle 
timeout.");
-            resourceAllocator.releaseResource(timedOutTaskManagerId, cause);
+            taskManagerTracker.addUnWantedTaskManager(timedOutTaskManagerId);
+            declareNeededResources();

Review Comment:
   I think we still need the delay mechanism, to avoid too frequent 
declarations. E.g., we may have tens or even hundreds of idle taskmanagers 
being found in the same round of idleness check, which means this method 
`releaseIdleTaskExecutor` can be called hundreds of times within a very short 
time, each call triggering a processing of the full declarations. A very short 
delay (e.g., 50ms) would significantly help with this.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int 
needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no 
longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // ------------------------------------------------------------------------
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration 
newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = 
newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = 
this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) 
{
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }

Review Comment:
   Moreover, I think we need to replace all the declarations that the active 
resource manager maintains, rather than just the new declaration. E.g., suppose 
the slot manager first declares `[<A, 1>, <B, 1>]`, then updates the declaration 
to be `[<A, 1>]`, which means `B` is no longer needed. With the current approach, 
the resources needed for `B` would not be updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -286,17 +314,119 @@ public void onError(Throwable exception) {
         onFatalError(exception);
     }
 
+    @VisibleForTesting
+    public int releaseUnWantedResources(
+            Collection<InstanceID> unwantedWorkers, int 
needReleaseWorkerNumber) {
+
+        Exception cause =
+                new FlinkExpectedException(
+                        "slot manager has determined that the resource is no 
longer needed");
+        for (InstanceID unwantedWorker : unwantedWorkers) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+            if (releaseResource(unwantedWorker, cause)) {
+                needReleaseWorkerNumber--;
+            }
+        }
+        return needReleaseWorkerNumber;
+    }
+
     // ------------------------------------------------------------------------
     //  Internal
     // ------------------------------------------------------------------------
 
-    private void releaseResource(InstanceID instanceId, Exception cause) {
+    private void updateResourceDeclaration(ResourceDeclaration 
newResourceDeclaration) {
+        WorkerResourceSpec workerResourceSpec = 
newResourceDeclaration.getSpec();
+        ResourceDeclaration oldDeclaration = 
this.resourceDeclarations.get(workerResourceSpec);
+
+        Set<InstanceID> filteredUnwantedWorkers = new HashSet<>();
+        if (oldDeclaration != null) {
+            oldDeclaration
+                    .getUnwantedWorkers()
+                    .forEach(
+                            instanceID -> {
+                                if (getWorkerByInstanceId(instanceID) != null) 
{
+                                    filteredUnwantedWorkers.add(instanceID);
+                                }
+                            });
+        }
+
+        newResourceDeclaration
+                .getUnwantedWorkers()
+                .forEach(
+                        instanceID -> {
+                            if (getWorkerByInstanceId(instanceID) != null) {
+                                filteredUnwantedWorkers.add(instanceID);
+                            }
+                        });
+
+        this.resourceDeclarations.put(
+                workerResourceSpec,
+                new ResourceDeclaration(
+                        workerResourceSpec,
+                        newResourceDeclaration.getNumNeeded(),
+                        filteredUnwantedWorkers));
+    }

Review Comment:
   Why are we merging the old and new declarations? I think in a declarative 
protocol, only the latest declarations should be considered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to