Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4887#discussion_r148754465

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---
    @@ -302,7 +302,12 @@ public boolean unregisterSlotRequest(AllocationID allocationId) {
         PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId);

         if (null != pendingSlotRequest) {
    -        cancelPendingSlotRequest(pendingSlotRequest);
    +        if (pendingSlotRequest.isAssigned()) {
    +            cancelPendingSlotRequest(pendingSlotRequest);
    +        }
    +        else {
    +            resourceActions.cancelResourceAllocation(pendingSlotRequest.getResourceProfile());
    --- End diff --

    Then this should be added as a separate feature, because it is not strictly required by this PR. Furthermore, I'm not sure whether this should rather be the responsibility of the `ResourceManager`. For example, we could add a timeout for container requests, after which we cancel them. Additionally, if we add support for starting machines with multiple slots, then we shouldn't release a requested resource when only one of its slots is no longer needed. That is something else to consider before adding the cancel resource allocation method.
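
    To make the two concerns above concrete, here is a rough sketch of how a timeout for container requests and the multi-slot case might interact. Everything in it is hypothetical: `PendingResourceRequests`, its methods, and the string request ids are illustration only and are not part of Flink's `ResourceManager` or `SlotManager`. The clock is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch only; not an existing Flink class. */
class PendingResourceRequests {

    /** One outstanding request for a container that is expected to offer several slots. */
    private static final class Request {
        final long deadlineMillis;
        int slotsStillNeeded;

        Request(long deadlineMillis, int slotsStillNeeded) {
            this.deadlineMillis = deadlineMillis;
            this.slotsStillNeeded = slotsStillNeeded;
        }
    }

    private final Map<String, Request> requests = new LinkedHashMap<>();
    private final long timeoutMillis;

    PendingResourceRequests(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a container request made at nowMillis for the given number of slots. */
    void request(String requestId, int slots, long nowMillis) {
        requests.put(requestId, new Request(nowMillis + timeoutMillis, slots));
    }

    /**
     * Marks one slot of the request as no longer needed.
     * Returns true only when no slot of the container is needed any more,
     * i.e. when the whole container request may be cancelled.
     */
    boolean releaseSlot(String requestId) {
        Request request = requests.get(requestId);
        if (request == null) {
            return false;
        }
        request.slotsStillNeeded--;
        if (request.slotsStillNeeded > 0) {
            return false; // other slots of this container are still wanted
        }
        requests.remove(requestId);
        return true;
    }

    /** Removes and returns the ids of all requests whose deadline has passed. */
    List<String> expire(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Request>> it = requests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Request> entry = it.next();
            if (entry.getValue().deadlineMillis <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    int size() {
        return requests.size();
    }
}
```

    With something along these lines, unregistering a single slot request would not by itself cancel a container that other pending slots still depend on, and stale container requests would be cleaned up by the timeout rather than by the `SlotManager`.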
---