xintongsong commented on code in PR #21565:
URL: https://github.com/apache/flink/pull/21565#discussion_r1060267951
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -526,6 +528,40 @@ public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
        return pendingSlots.values();
    }

+    /**
+     * remove unused pending task manager slots.
+     *
+     * @param unUsedResourceCounter the count of unused resources.
+     */
+    public void removePendingTaskManagerSlots(ResourceCounter unUsedResourceCounter) {
+        Iterator<Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot>> pendingSlotIterator =
+                pendingSlots.entrySet().iterator();
+        while (pendingSlotIterator.hasNext()) {
+            Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot> pendingTaskManagerSlotEntry =
+                    pendingSlotIterator.next();
+            PendingTaskManagerSlot pendingTaskManagerSlot = pendingTaskManagerSlotEntry.getValue();
+            ResourceProfile resourceProfile = pendingTaskManagerSlot.getResourceProfile();
+            if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) {
Review Comment:
For `DeclarativeSlotManager`, fine-grained resource management is unsupported, and the resource profiles of pending slots should always be `defaultSlotResourceProfile`. (The constructor of `PendingTaskManagerSlot` is always called with `defaultSlotResourceProfile` as the argument.)
Based on this assumption, we can add a `checkState` for protection and simplify this method.
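For illustration, a rough sketch of what the simplified method could look like (untested; it assumes every pending slot indeed carries `defaultSlotResourceProfile`, and that `checkState` comes from Flink's `Preconditions`):

```java
public void removePendingTaskManagerSlots(ResourceCounter unUsedResourceCounter) {
    // Assumption: DeclarativeSlotManager never tracks fine-grained profiles, so the
    // counter can only refer to defaultSlotResourceProfile.
    checkState(
            unUsedResourceCounter.isEmpty()
                    || unUsedResourceCounter
                            .getResources()
                            .equals(Collections.singleton(defaultSlotResourceProfile)),
            "Expected only the default slot resource profile.");

    int slotsToRemove = unUsedResourceCounter.getResourceCount(defaultSlotResourceProfile);
    Iterator<PendingTaskManagerSlot> iterator = pendingSlots.values().iterator();
    while (slotsToRemove > 0 && iterator.hasNext()) {
        iterator.next();
        iterator.remove();
        slotsToRemove--;
    }

    if (resourceAllocator.isSupported()) {
        declareNeededResourcesWithDelay();
    }
}
```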
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/FineGrainedTaskManagerTracker.java:
##########
@@ -82,12 +83,26 @@ public void replaceAllPendingAllocations(
        LOG.trace("Record the pending allocations {}.", pendingSlotAllocations);
        pendingSlotAllocationRecords.clear();
        pendingSlotAllocationRecords.putAll(pendingSlotAllocations);
+        Set<PendingTaskManagerId> unusedPendingTaskManager =
+                pendingTaskManagers.keySet().stream()
+                        .filter(id -> !pendingSlotAllocationRecords.containsKey(id))
+                        .collect(Collectors.toSet());
+        for (PendingTaskManagerId pendingTaskManagerId : unusedPendingTaskManager) {
+            removePendingTaskManager(pendingTaskManagerId);
+        }
    }

    @Override
    public void clearPendingAllocationsOfJob(JobID jobId) {
        LOG.info("Clear all pending allocations for job {}.", jobId);
        pendingSlotAllocationRecords.values().forEach(allocation -> allocation.remove(jobId));
+        Set<PendingTaskManagerId> unusedPendingTaskManager =
+                pendingTaskManagers.keySet().stream()
+                        .filter(id -> !pendingSlotAllocationRecords.containsKey(id))
+                        .collect(Collectors.toSet());
+        for (PendingTaskManagerId pendingTaskManagerId : unusedPendingTaskManager) {
+            removePendingTaskManager(pendingTaskManagerId);
+        }
Review Comment:
These two blocks can be deduplicated into something like `removeUnusedPendingTaskManagers()` and reused in `replaceAllPendingAllocations` and `clearPendingAllocationsOfJob`. Even `clearAllPendingTaskManager` could be replaced by it.
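Something along these lines (just a sketch; the helper name is a suggestion):

```java
private void removeUnusedPendingTaskManagers() {
    // A pending task manager without any pending slot allocation is no longer needed.
    Set<PendingTaskManagerId> unusedPendingTaskManagers =
            pendingTaskManagers.keySet().stream()
                    .filter(id -> !pendingSlotAllocationRecords.containsKey(id))
                    .collect(Collectors.toSet());
    unusedPendingTaskManagers.forEach(this::removePendingTaskManager);
}
```

Both `replaceAllPendingAllocations` and `clearPendingAllocationsOfJob` could then simply call this helper after updating `pendingSlotAllocationRecords`.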
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -384,18 +422,59 @@ private int releaseUnWantedResources(
        return needReleaseWorkerNumber;
    }

+    private int releaseResources(Collection<ResourceID> resourceIDS, int needReleaseWorkerNumber) {
Review Comment:
```suggestion
    private int releaseResources(Collection<ResourceID> resourceIds, int needReleaseWorkerNumber) {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java:
##########
@@ -526,6 +528,40 @@ public Collection<PendingTaskManagerSlot> getPendingTaskManagerSlots() {
        return pendingSlots.values();
    }

+    /**
+     * remove unused pending task manager slots.
+     *
+     * @param unUsedResourceCounter the count of unused resources.
+     */
+    public void removePendingTaskManagerSlots(ResourceCounter unUsedResourceCounter) {
+        Iterator<Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot>> pendingSlotIterator =
+                pendingSlots.entrySet().iterator();
+        while (pendingSlotIterator.hasNext()) {
+            Map.Entry<TaskManagerSlotId, PendingTaskManagerSlot> pendingTaskManagerSlotEntry =
+                    pendingSlotIterator.next();
+            PendingTaskManagerSlot pendingTaskManagerSlot = pendingTaskManagerSlotEntry.getValue();
+            ResourceProfile resourceProfile = pendingTaskManagerSlot.getResourceProfile();
+            if (unUsedResourceCounter.getResourceCount(resourceProfile) > 0) {
+                pendingSlotIterator.remove();
+                unUsedResourceCounter = unUsedResourceCounter.subtract(resourceProfile, 1);
+            }
+        }
+
+        if (resourceAllocator.isSupported()) {
+            declareNeededResourcesWithDelay();
+        }
+    }
+
+    /** clear all pending task manager slots. */
+    public void clearPendingTaskManagerSlots() {
+        if (!pendingSlots.isEmpty()) {
+            this.pendingSlots.clear();
+            if (resourceAllocator.isSupported()) {
+                declareNeededResourcesWithDelay();
+            }
Review Comment:
Strictly speaking, we should not clear pending slots if the resource allocator is not supported, or the internal state of `TaskExecutorManager` can become inaccurate. This is not causing trouble now, because pending slots can only exist when the resource allocator is supported. Therefore, it might be better to check whether the resource allocator is supported at the very beginning, and skip everything else if it's not supported.
Same for `removePendingTaskManagerSlots`.
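E.g., roughly (sketch only):

```java
/** clear all pending task manager slots. */
public void clearPendingTaskManagerSlots() {
    // Pending slots can only exist when the resource allocator is supported,
    // so there is nothing to do otherwise.
    if (!resourceAllocator.isSupported() || pendingSlots.isEmpty()) {
        return;
    }
    pendingSlots.clear();
    declareNeededResourcesWithDelay();
}
```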
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -342,15 +348,47 @@ private void checkResourceDeclarations() {
                            resourceDeclaration.getUnwantedWorkers(),
                            releaseOrRequestWorkerNumber);
-            // TODO, release pending/starting/running workers to exceed declared worker number.
            if (remainingReleasingWorkerNumber > 0) {
-                log.debug(
-                        "need release {} workers after release unwanted workers.",
+                // release not allocated workers;
+                remainingReleasingWorkerNumber =
+                        releaseUnAllocatedWorkers(
+                                workerResourceSpec, remainingReleasingWorkerNumber);
+            }
+
+            if (remainingReleasingWorkerNumber > 0) {
+                // release starting workers;
+                Set<ResourceID> workerCanRelease =
+                        currentAttemptUnregisteredWorkers.stream()
+                                .filter(
+                                        r ->
+                                                workerResourceSpec.equals(
+                                                        workerResourceSpecs.get(r)))
+                                .collect(Collectors.toSet());
+                remainingReleasingWorkerNumber =
+                        releaseResources(workerCanRelease, remainingReleasingWorkerNumber);
+            }
+
+            if (remainingReleasingWorkerNumber > 0) {
+                // release running workers;
+                Set<ResourceID> workerCanRelease =
+                        workerNodeMap.keySet().stream()
+                                .filter(
+                                        r ->
+                                                workerResourceSpec.equals(
+                                                        workerResourceSpecs.get(r)))
+                                .collect(Collectors.toSet());
+                remainingReleasingWorkerNumber =
+                        releaseResources(workerCanRelease, remainingReleasingWorkerNumber);
+            }
Review Comment:
These two branches can be deduplicated, e.g. into a small helper (sketched below).
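For example, roughly (the helper name `releaseWorkersOfSpec` is just a placeholder):

```java
private int releaseWorkersOfSpec(
        Collection<ResourceID> candidates,
        WorkerResourceSpec workerResourceSpec,
        int needReleaseWorkerNumber) {
    // Only workers of the given spec are eligible for release.
    Set<ResourceID> workerCanRelease =
            candidates.stream()
                    .filter(r -> workerResourceSpec.equals(workerResourceSpecs.get(r)))
                    .collect(Collectors.toSet());
    return releaseResources(workerCanRelease, needReleaseWorkerNumber);
}
```

The two branches then become `releaseWorkersOfSpec(currentAttemptUnregisteredWorkers, ...)` and `releaseWorkersOfSpec(workerNodeMap.keySet(), ...)`.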
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -384,18 +422,59 @@ private int releaseUnWantedResources(
        return needReleaseWorkerNumber;
    }

+    private int releaseResources(Collection<ResourceID> resourceIDS, int needReleaseWorkerNumber) {
+        Exception cause = new FlinkExpectedException("resource is no longer needed");
+        for (ResourceID resourceID : resourceIDS) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+
+            if (releaseResource(resourceID, cause)) {
+                needReleaseWorkerNumber--;
+            } else {
+                log.warn("Resource {} could not release.", resourceID);
+            }
+        }
+
+        return needReleaseWorkerNumber;
+    }
+
    private boolean releaseResource(InstanceID instanceId, Exception cause) {
        WorkerType worker = getWorkerByInstanceId(instanceId);
        if (worker != null) {
-            internalStopWorker(worker.getResourceID());
-            closeTaskManagerConnection(worker.getResourceID(), cause);
-            return true;
+            return releaseResource(worker.getResourceID(), cause);
        } else {
            log.debug("Instance {} not found in ResourceManager.", instanceId);
            return false;
        }
    }

+    private boolean releaseResource(ResourceID resourceID, Exception cause) {
+        if (workerNodeMap.containsKey(resourceID)) {
+            internalStopWorker(resourceID);
+            closeTaskManagerConnection(resourceID, cause);
+            return true;
+        }
+        return false;
+    }
+
+    private int releaseUnAllocatedWorkers(
+            WorkerResourceSpec workerResourceSpec, int needReleaseWorkerNumber) {
+        Set<CompletableFuture<WorkerType>> unAllocatedWorkerFuturesShouldRelease =
+                unAllocatedWorkerFutures.entrySet().stream()
+                        .filter(e -> e.getValue().equals(workerResourceSpec))
+                        .map(Map.Entry::getKey)
+                        .collect(Collectors.toSet());
+        for (CompletableFuture<WorkerType> requestFuture : unAllocatedWorkerFuturesShouldRelease) {
+            if (needReleaseWorkerNumber <= 0) {
+                break;
+            }
+
+            requestFuture.completeExceptionally(RequestCancelledException.INSTANCE);
Review Comment:
Why not just use `requestFuture.cancel()`? It has exactly the semantics we want, and the return value can be used to check whether the cancellation has succeeded. Then we won't need to introduce `RequestCancelledException`.
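Roughly (untested sketch):

```java
for (CompletableFuture<WorkerType> requestFuture : unAllocatedWorkerFuturesShouldRelease) {
    if (needReleaseWorkerNumber <= 0) {
        break;
    }
    // cancel(false) completes the future with a CancellationException if it is not
    // done yet, and returns whether the cancellation took effect.
    if (requestFuture.cancel(false)) {
        needReleaseWorkerNumber--;
    }
}
```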
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -415,27 +494,61 @@ public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
                workerResourceSpec,
                pendingCount);

+        final CompletableFuture<WorkerType> requestResourceFuture = new CompletableFuture<>();
+        unAllocatedWorkerFutures.put(requestResourceFuture, workerResourceSpec);
+
        // In case of start worker failures, we should wait for an interval before
        // trying to start new workers.
        // Otherwise, ActiveResourceManager will always re-requesting the worker,
        // which keeps the main thread busy.
-        final CompletableFuture<WorkerType> requestResourceFuture =
-                startWorkerCoolDown.thenCompose(
-                        (ignore) -> resourceManagerDriver.requestResource(taskExecutorProcessSpec));
+        startWorkerCoolDown.thenRun(
+                () -> {
+                    if (!requestResourceFuture.isDone()) {
+                        CompletableFuture<WorkerType> innerRequestResourceFuture =
+                                resourceManagerDriver.requestResource(taskExecutorProcessSpec);
+
+                        // forward inner request future result(finished by ResourceManagerDriver) to
+                        // outer future.
+                        FutureUtils.forward(innerRequestResourceFuture, requestResourceFuture);
+
+                        // forward outer future exception (cancelled by ActiveResourceManager) to
+                        // inner request future. ResourceManagerDriver should cancel this requests.
+                        requestResourceFuture.whenComplete(
+                                (ignore, t) -> {
+                                    if (t != null && !innerRequestResourceFuture.isDone()) {
+                                        innerRequestResourceFuture.completeExceptionally(t);
+                                    }
+                                });
+                    } else {
+                        log.debug(
+                                "Pending request with resource spec {} already canceled, will not trigger request.",
+                                workerResourceSpec);
+                    }
+                });
+
Review Comment:
The complexity of the current approach mainly comes from maintaining two futures and the mapping between them. This is because we want to be able to cancel both requests that have already been sent to the drivers and requests that have not yet been sent due to the cooling-down mechanism.
As the active resource manager is declarative now, I think we can probably handle the cooling down more smartly. To be specific, we can move the cooling down from `requestNewWorker` to `checkResourceDeclarations`.
The current approach has the following steps:
- Decide to request a new worker in `checkResourceDeclarations`.
- Wait for the cooling down, if needed, in `requestNewWorker`.
- When the cooling down finishes, check in `requestNewWorker` whether the resource is still needed, and request it if so.
The approach I suggest is (see the sketch after this list):
- In `checkResourceDeclarations`, if new workers need to be requested, check whether we need to wait for the cooling down.
- If not, request the resources.
- If yes, schedule another `checkResourceDeclarations` once the cooling down finishes.
In this way, we won't have two different mechanisms (`checkResourceDeclarations` and `unAllocatedWorkerFutures`) for the same purpose (deciding whether a resource still needs to be requested).
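A very rough sketch of the suggested flow (names like `needRequestWorkerNumber` are placeholders, and scheduling back to the main thread is an assumption about the wiring):

```java
private void checkResourceDeclarations() {
    // ... compute needRequestWorkerNumber for the given workerResourceSpec ...
    if (needRequestWorkerNumber > 0) {
        if (startWorkerCoolDown.isDone()) {
            for (int i = 0; i < needRequestWorkerNumber; i++) {
                requestNewWorker(workerResourceSpec);
            }
        } else {
            // Re-run the declaration check once the cooldown has passed, instead of
            // parking per-request futures behind the cooldown in requestNewWorker.
            startWorkerCoolDown.thenRun(
                    () -> getMainThreadExecutor().execute(this::checkResourceDeclarations));
        }
    }
}
```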
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java:
##########
@@ -342,15 +348,47 @@ private void checkResourceDeclarations() {
                            resourceDeclaration.getUnwantedWorkers(),
                            releaseOrRequestWorkerNumber);
-            // TODO, release pending/starting/running workers to exceed declared worker number.
            if (remainingReleasingWorkerNumber > 0) {
-                log.debug(
-                        "need release {} workers after release unwanted workers.",
+                // release not allocated workers;
+                remainingReleasingWorkerNumber =
+                        releaseUnAllocatedWorkers(
+                                workerResourceSpec, remainingReleasingWorkerNumber);
+            }
+
+            if (remainingReleasingWorkerNumber > 0) {
+                // release starting workers;
+                Set<ResourceID> workerCanRelease =
+                        currentAttemptUnregisteredWorkers.stream()
+                                .filter(
+                                        r ->
+                                                workerResourceSpec.equals(
+                                                        workerResourceSpecs.get(r)))
+                                .collect(Collectors.toSet());
+                remainingReleasingWorkerNumber =
+                        releaseResources(workerCanRelease, remainingReleasingWorkerNumber);
+            }
+
+            if (remainingReleasingWorkerNumber > 0) {
+                // release running workers;
+                Set<ResourceID> workerCanRelease =
+                        workerNodeMap.keySet().stream()
+                                .filter(
+                                        r ->
+                                                workerResourceSpec.equals(
+                                                        workerResourceSpecs.get(r)))
+                                .collect(Collectors.toSet());
+                remainingReleasingWorkerNumber =
+                        releaseResources(workerCanRelease, remainingReleasingWorkerNumber);
+            }
+
+            if (remainingReleasingWorkerNumber > 0) {
+                log.error(
+                        "need release {} workers after release workers.",
+                        remainingReleasingWorkerNumber);
            }
Review Comment:
How could this happen? If this is never expected to happen, we should use
`checkState` to fail fast when there's something wrong.
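E.g. (sketch):

```java
// Fail fast instead of only logging, if leftover workers are never expected here.
Preconditions.checkState(
        remainingReleasingWorkerNumber == 0,
        "Still need to release %s workers after releasing unwanted, unallocated, starting and running workers.",
        remainingReleasingWorkerNumber);
```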
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java:
##########
@@ -435,9 +487,19 @@ private void startTaskExecutorInContainerAsync(
        containerLaunchContextFuture.handleAsync(
                (context, exception) -> {
                    if (exception == null) {
-                        nodeManagerClient.startContainerAsync(container, context);
-                        requestResourceFuture.complete(
-                                new YarnWorkerNode(container, resourceId));
+                        if (FutureUtils.isCompletedWithException(
+                                requestResourceFuture,
+                                ActiveResourceManager.RequestCancelledException.class)) {
+                            log.info(
+                                    "container {} already be cancelled.",
+                                    container.getId());
+                            resourceManagerClient.releaseAssignedContainer(
+                                    container.getId());
+                        } else {
+                            nodeManagerClient.startContainerAsync(container, context);
+                            requestResourceFuture.complete(
+                                    new YarnWorkerNode(container, resourceId));
+                        }
Review Comment:
I'm not entirely sure whether this is needed. Ideally, `startTaskExecutorInContainerAsync` should not receive a future that is already canceled. A canceled future should be removed from `requestResourceFutures` immediately, in the main thread, and thus should never be selected in `onContainersOfPriorityAllocated` and passed to this method.
##########
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java:
##########
@@ -276,6 +277,57 @@ public CompletableFuture<YarnWorkerNode> requestResource(
        } else {
            final Priority priority = priorityAndResourceOpt.get().getPriority();
            final Resource resource = priorityAndResourceOpt.get().getResource();
+
+            requestResourceFuture.whenComplete(
+                    (ignore, t) -> {
+                        if (t instanceof ActiveResourceManager.RequestCancelledException) {
+                            try {
+                                final Queue<CompletableFuture<YarnWorkerNode>>
+                                        pendingRequestResourceFutures =
+                                                requestResourceFutures.getOrDefault(
+                                                        taskExecutorProcessSpec,
+                                                        new LinkedList<>());
+                                if (pendingRequestResourceFutures.remove(requestResourceFuture)) {
+                                    log.info(
+                                            "cancelling pending request with priority {}, remaining {} pending container requests.",
+                                            priority,
+                                            pendingRequestResourceFutures.size());
+                                    int pendingRequestsSizeBeforeCancel =
+                                            pendingRequestResourceFutures.size() + 1;
+                                    final Iterator<AMRMClient.ContainerRequest>
+                                            pendingContainerRequestIterator =
+                                                    getPendingRequestsAndCheckConsistency(
+                                                                    priority,
+                                                                    resource,
+                                                                    pendingRequestsSizeBeforeCancel)
+                                                            .iterator();
+
+                                    Preconditions.checkState(
+                                            pendingContainerRequestIterator.hasNext());
+
+                                    final AMRMClient.ContainerRequest pendingRequest =
+                                            pendingContainerRequestIterator.next();
+                                    removeContainerRequest(pendingRequest);
+
+                                    if (pendingRequestResourceFutures.isEmpty()) {
+                                        requestResourceFutures.remove(taskExecutorProcessSpec);
+                                    }
+
+                                    if (getNumRequestedNotAllocatedWorkers() <= 0) {
+                                        resourceManagerClient.setHeartbeatInterval(
+                                                yarnHeartbeatIntervalMillis);
+                                    }
+                                } else {
+                                    log.info(
+                                            "pending request with priority {} canceled, but it is already allocated.",
+                                            priority);
Review Comment:
How could the request already be allocated? Shouldn't access to the request futures always happen in the main thread?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]