xintongsong commented on code in PR #21496: URL: https://github.com/apache/flink/pull/21496#discussion_r1053928976
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -206,15 +220,29 @@ protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID reso return Optional.ofNullable(workerNodeMap.get(resourceID)); } + @VisibleForTesting + public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) { + for (ResourceDeclaration resourceDeclaration : resourceDeclarations) { + updateResourceDeclaration(resourceDeclaration); + } + + checkResourceDeclarations(); + } + @Override - protected void onWorkerRegistered(WorkerType worker) { + protected void onWorkerRegistered(WorkerType worker, WorkerResourceSpec workerResourceSpec) { final ResourceID resourceId = worker.getResourceID(); log.info("Worker {} is registered.", resourceId.getStringWithMetadata()); - final WorkerResourceSpec workerResourceSpec = - currentAttemptUnregisteredWorkers.remove(resourceId); tryRemovePreviousPendingRecoveryTaskManager(resourceId); - if (workerResourceSpec != null) { + + if (!workerResourceSpecs.containsKey(worker.getResourceID())) { + // recovered worker registered. + workerResourceSpecs.put(worker.getResourceID(), workerResourceSpec); + totalWorkerCounter.increaseAndGet(workerResourceSpec); + } Review Comment: Might be better to also add a log in this if-branch. ########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -286,17 +314,119 @@ public void onError(Throwable exception) { onFatalError(exception); } + @VisibleForTesting + public int releaseUnWantedResources( Review Comment: Move this method to the *Internal* section. 
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -286,17 +314,119 @@ public void onError(Throwable exception) { onFatalError(exception); } + @VisibleForTesting + public int releaseUnWantedResources( + Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) { + + Exception cause = + new FlinkExpectedException( + "slot manager has determined that the resource is no longer needed"); + for (InstanceID unwantedWorker : unwantedWorkers) { + if (needReleaseWorkerNumber <= 0) { + break; + } + if (releaseResource(unwantedWorker, cause)) { + needReleaseWorkerNumber--; + } + } + return needReleaseWorkerNumber; + } + // ------------------------------------------------------------------------ // Internal // ------------------------------------------------------------------------ - private void releaseResource(InstanceID instanceId, Exception cause) { + private void updateResourceDeclaration(ResourceDeclaration newResourceDeclaration) { + WorkerResourceSpec workerResourceSpec = newResourceDeclaration.getSpec(); + ResourceDeclaration oldDeclaration = this.resourceDeclarations.get(workerResourceSpec); + + Set<InstanceID> filteredUnwantedWorkers = new HashSet<>(); + if (oldDeclaration != null) { + oldDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + } + + newResourceDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + + this.resourceDeclarations.put( + workerResourceSpec, + new ResourceDeclaration( + workerResourceSpec, + newResourceDeclaration.getNumNeeded(), + filteredUnwantedWorkers)); + } + + private void checkResourceDeclarations() { + for (ResourceDeclaration resourceDeclaration : resourceDeclarations.values()) { + WorkerResourceSpec 
workerResourceSpec = resourceDeclaration.getSpec(); + int declaredWorkerNumber = resourceDeclaration.getNumNeeded(); + + final int releaseOrRequestWorkerNumber = + totalWorkerCounter.getNum(workerResourceSpec) - declaredWorkerNumber; + + if (releaseOrRequestWorkerNumber > 0) { + log.debug( + "need release {} workers, current worker number {}, declared worker number {}", + releaseOrRequestWorkerNumber, + totalWorkerCounter.getNum(workerResourceSpec), + declaredWorkerNumber); + + // release unwanted workers. + int remainingReleasingWorkerNumber = + releaseUnWantedResources( + resourceDeclaration.getUnwantedWorkers(), + releaseOrRequestWorkerNumber); + + // TODO, release pending/starting/running workers to exceed declared worker number. Review Comment: What's the plan for this todo? Will we address it in the next PR for canceling pending worker requests? ########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceAllocator.java: ########## @@ -19,34 +19,22 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec; + +import java.util.Collection; /** Resource related actions which the {@link SlotManager} can perform. */ public interface ResourceAllocator { /** Whether allocate/release resources are supported. */ boolean isSupported(); - /** - * Releases the resource with the given instance id. - * - * @param instanceId identifying which resource to release - * @param cause why the resource is released - */ - void releaseResource(InstanceID instanceId, Exception cause); - /** * Releases the resource with the given resource id. * * @param resourceID identifying which resource to release */ void releaseResource(ResourceID resourceID); Review Comment: It's a bit weird that in a declarative protocol we still support releasing resources.
Looking into cases where this method is called, I think it is only used for cleaning up when the connection is already disconnected. Therefore, I'd suggest renaming this to something like `cleaningUpDisconnectedResource`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java: ########## @@ -118,6 +124,34 @@ public void removeTaskManager(InstanceID instanceId) { } } + @Override + public void addUnWantedTaskManager(InstanceID instanceId) { + TaskManagerInfo taskManagerInfo = taskManagerRegistrations.get(instanceId); + if (taskManagerInfo != null) { + addUnWantedTaskManagerWithResource( + instanceId, + taskManagerInfo.getTotalResource(), + taskManagerInfo.getDefaultSlotResourceProfile()); + } + } + + public void addUnWantedTaskManagerWithResource( + InstanceID instanceId, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { + unWantedTaskManagers.put( + instanceId, + WorkerResourceSpec.fromTotalResourceProfile( + totalResourceProfile, + SlotManagerUtils.calculateDefaultNumSlots( + totalResourceProfile, defaultSlotResourceProfile))); + } Review Comment: I think the underlying question is: what is the relationship between `unWantedTaskManagers` and `taskManagerRegistrations`? Are unwanted workers always a subset of registered workers?
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java: ########## @@ -118,6 +124,34 @@ public void removeTaskManager(InstanceID instanceId) { } } + @Override + public void addUnWantedTaskManager(InstanceID instanceId) { + TaskManagerInfo taskManagerInfo = taskManagerRegistrations.get(instanceId); + if (taskManagerInfo != null) { + addUnWantedTaskManagerWithResource( + instanceId, + taskManagerInfo.getTotalResource(), + taskManagerInfo.getDefaultSlotResourceProfile()); + } + } + + public void addUnWantedTaskManagerWithResource( + InstanceID instanceId, + ResourceProfile totalResourceProfile, + ResourceProfile defaultSlotResourceProfile) { + unWantedTaskManagers.put( + instanceId, + WorkerResourceSpec.fromTotalResourceProfile( + totalResourceProfile, + SlotManagerUtils.calculateDefaultNumSlots( + totalResourceProfile, defaultSlotResourceProfile))); + } Review Comment: Again, why do we need 2 interfaces here for adding unwanted workers? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -286,17 +314,119 @@ public void onError(Throwable exception) { onFatalError(exception); } + @VisibleForTesting + public int releaseUnWantedResources( + Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) { + + Exception cause = + new FlinkExpectedException( + "slot manager has determined that the resource is no longer needed"); + for (InstanceID unwantedWorker : unwantedWorkers) { + if (needReleaseWorkerNumber <= 0) { + break; + } + if (releaseResource(unwantedWorker, cause)) { + needReleaseWorkerNumber--; + } + } + return needReleaseWorkerNumber; + } + // ------------------------------------------------------------------------ // Internal // ------------------------------------------------------------------------ - private void releaseResource(InstanceID instanceId, Exception cause) { + private void updateResourceDeclaration(ResourceDeclaration newResourceDeclaration) { + WorkerResourceSpec workerResourceSpec = newResourceDeclaration.getSpec(); + ResourceDeclaration oldDeclaration = this.resourceDeclarations.get(workerResourceSpec); + + Set<InstanceID> filteredUnwantedWorkers = new HashSet<>(); + if (oldDeclaration != null) { + oldDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + } + + newResourceDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + + this.resourceDeclarations.put( + workerResourceSpec, + new ResourceDeclaration( + workerResourceSpec, + newResourceDeclaration.getNumNeeded(), + filteredUnwantedWorkers)); + } + + private void checkResourceDeclarations() { + for (ResourceDeclaration resourceDeclaration : resourceDeclarations.values()) { + WorkerResourceSpec 
workerResourceSpec = resourceDeclaration.getSpec(); + int declaredWorkerNumber = resourceDeclaration.getNumNeeded(); + + final int releaseOrRequestWorkerNumber = + totalWorkerCounter.getNum(workerResourceSpec) - declaredWorkerNumber; + + if (releaseOrRequestWorkerNumber > 0) { + log.debug( + "need release {} workers, current worker number {}, declared worker number {}", + releaseOrRequestWorkerNumber, + totalWorkerCounter.getNum(workerResourceSpec), + declaredWorkerNumber); + + // release unwanted workers. + int remainingReleasingWorkerNumber = + releaseUnWantedResources( + resourceDeclaration.getUnwantedWorkers(), + releaseOrRequestWorkerNumber); + + // TODO, release pending/starting/running workers to exceed declared worker number. + if (remainingReleasingWorkerNumber > 0) { + log.debug( + "need release {} workers after release unwanted workers.", + remainingReleasingWorkerNumber); + } + + // try update unwanted workers. + updateResourceDeclaration(resourceDeclaration); Review Comment: Not sure about modifying the declaration. It's counterintuitive that the ARM maintains something called resource declarations that is not what the slot manager actually declared. I think it would be better to keep the declarations unchanged, and make `releaseUnWantedResources` able to skip unwanted workers that no longer exist.
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedSlotManager.java: ########## @@ -739,9 +789,8 @@ private void releaseIdleTaskExecutorIfPossible(TaskManagerInfo taskManagerInfo) private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) { if (resourceAllocator.isSupported()) { - final FlinkExpectedException cause = - new FlinkExpectedException("TaskManager exceeded the idle timeout."); - resourceAllocator.releaseResource(timedOutTaskManagerId, cause); + taskManagerTracker.addUnWantedTaskManager(timedOutTaskManagerId); + declareNeededResources(); Review Comment: I think we still need the delay mechanism, to avoid overly frequent declarations. E.g., we may have tens or even hundreds of idle TaskManagers found in the same round of idleness checks, which means this method `releaseIdleTaskExecutor` can be called hundreds of times within a very short time, each call triggering a processing of the full declarations. A very short delay (e.g., 50ms) would significantly help with this.
########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -286,17 +314,119 @@ public void onError(Throwable exception) { onFatalError(exception); } + @VisibleForTesting + public int releaseUnWantedResources( + Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) { + + Exception cause = + new FlinkExpectedException( + "slot manager has determined that the resource is no longer needed"); + for (InstanceID unwantedWorker : unwantedWorkers) { + if (needReleaseWorkerNumber <= 0) { + break; + } + if (releaseResource(unwantedWorker, cause)) { + needReleaseWorkerNumber--; + } + } + return needReleaseWorkerNumber; + } + // ------------------------------------------------------------------------ // Internal // ------------------------------------------------------------------------ - private void releaseResource(InstanceID instanceId, Exception cause) { + private void updateResourceDeclaration(ResourceDeclaration newResourceDeclaration) { + WorkerResourceSpec workerResourceSpec = newResourceDeclaration.getSpec(); + ResourceDeclaration oldDeclaration = this.resourceDeclarations.get(workerResourceSpec); + + Set<InstanceID> filteredUnwantedWorkers = new HashSet<>(); + if (oldDeclaration != null) { + oldDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + } + + newResourceDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + + this.resourceDeclarations.put( + workerResourceSpec, + new ResourceDeclaration( + workerResourceSpec, + newResourceDeclaration.getNumNeeded(), + filteredUnwantedWorkers)); + } Review Comment: Moreover, I think we need to replace all the declarations that the active resource manager maintains, rather than just the new declaration. 
E.g., if the slot manager first declares `[<A, 1>, <B, 1>]`, then updates the declaration to be `[<A, 1>]`, which means `B` is no longer needed. With the current approach, the resources needed for `B` would not be updated. ########## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java: ########## @@ -286,17 +314,119 @@ public void onError(Throwable exception) { onFatalError(exception); } + @VisibleForTesting + public int releaseUnWantedResources( + Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) { + + Exception cause = + new FlinkExpectedException( + "slot manager has determined that the resource is no longer needed"); + for (InstanceID unwantedWorker : unwantedWorkers) { + if (needReleaseWorkerNumber <= 0) { + break; + } + if (releaseResource(unwantedWorker, cause)) { + needReleaseWorkerNumber--; + } + } + return needReleaseWorkerNumber; + } + // ------------------------------------------------------------------------ // Internal // ------------------------------------------------------------------------ - private void releaseResource(InstanceID instanceId, Exception cause) { + private void updateResourceDeclaration(ResourceDeclaration newResourceDeclaration) { + WorkerResourceSpec workerResourceSpec = newResourceDeclaration.getSpec(); + ResourceDeclaration oldDeclaration = this.resourceDeclarations.get(workerResourceSpec); + + Set<InstanceID> filteredUnwantedWorkers = new HashSet<>(); + if (oldDeclaration != null) { + oldDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + } + + newResourceDeclaration + .getUnwantedWorkers() + .forEach( + instanceID -> { + if (getWorkerByInstanceId(instanceID) != null) { + filteredUnwantedWorkers.add(instanceID); + } + }); + + this.resourceDeclarations.put( + workerResourceSpec, + new ResourceDeclaration( + workerResourceSpec, +
newResourceDeclaration.getNumNeeded(), + filteredUnwantedWorkers)); + } Review Comment: Why are we merging the old and new declarations? I think in a declarative protocol, only the latest declarations should be considered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org