Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5739#discussion_r176400741 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java --- @@ -634,6 +653,99 @@ public void testCheckIdleSlot() throws Exception { } } + /** + * Tests that idle slots which cannot be released are only recycled if the owning {@link TaskExecutor} + * is still registered at the {@link SlotPool}. See FLINK-9047. + */ + @Test + public void testReleasingIdleSlotFailed() throws Exception { + final ManualClock clock = new ManualClock(); + final SlotPool slotPool = new SlotPool( + rpcService, + jobId, + clock, + TestingUtils.infiniteTime(), + timeout); + + try { + final SlotPoolGateway slotPoolGateway = setupSlotPool(slotPool, resourceManagerGateway); + + final AllocationID expiredAllocationId = new AllocationID(); + final SlotOffer slotToExpire = new SlotOffer(expiredAllocationId, 0, ResourceProfile.UNKNOWN); + + final ArrayDeque<CompletableFuture<Acknowledge>> responseQueue = new ArrayDeque<>(2); + taskManagerGateway.setFreeSlotFunction((AllocationID allocationId, Throwable cause) -> { + if (responseQueue.isEmpty()) { + return CompletableFuture.completedFuture(Acknowledge.get()); + } else { + return responseQueue.pop(); + } + }); + + responseQueue.add(FutureUtils.completedExceptionally(new FlinkException("Test failure"))); + + final CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<>(); + responseQueue.add(responseFuture); + + assertThat( + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(), + Matchers.is(Acknowledge.get())); + + assertThat( + slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotToExpire).get(), + Matchers.is(true)); + + clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + slotPool.triggerCheckIdleSlot(); + + CompletableFuture<LogicalSlot> allocatedSlotFuture = slotPoolGateway.allocateSlot( + new 
SlotRequestId(), + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + true, + timeout); + + // wait until the slot has been fulfilled with the previously idling slot + final LogicalSlot logicalSlot = allocatedSlotFuture.get(); + assertThat(logicalSlot.getAllocationId(), Matchers.is(expiredAllocationId)); + + // return the slot + slotPool.getSlotOwner().returnAllocatedSlot(logicalSlot).get(); + + // advance the time so that the returned slot is now idling + clock.advanceTime(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + slotPool.triggerCheckIdleSlot(); + + // request a new slot after the idling slot has been released + allocatedSlotFuture = slotPoolGateway.allocateSlot( + new SlotRequestId(), + new DummyScheduledUnit(), + SlotProfile.noRequirements(), + true, + timeout); + + // release the TaskExecutor before we get a response from the slot releasing + slotPoolGateway.releaseTaskManager(taskManagerLocation.getResourceID()).get(); + + // let the slot releasing fail --> since there the owning TaskExecutor is no longer registered --- End diff -- nit: typo in the comment above: *[...] since there the owning [...]* has a stray "there" and should read *[...] since the owning [...]*
---