Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204400729 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** + * Tests that all slots are being returned to the {@link SlotOwner} if the + * {@link ExecutionGraph} is being cancelled. See FLINK-9908 + */ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> 
expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); + + // start completing the slot requests asynchronously + executor.execute( + () -> { + slotRequestsBeingFulfilled.trigger(); + + for (SlotRequestId slotRequestId : slotRequestIds.keySet()) { + final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId); + slotProvider.complete(slotRequestId, singleLogicalSlot); + } + }); + + // make sure that we complete cancellations of deployed tasks + taskManagerGateway.setCancelConsumer( + (ExecutionAttemptID executionAttemptId) -> { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId); + + // if the execution was cancelled in state SCHEDULING, then it might already have been removed + if (execution != null) { + execution.cancelingComplete(); + } + } + ); + + slotRequestsBeingFulfilled.await(); + + executionGraph.cancel(); + + expectedSlotRequestIds.waitForAllSlotRequestIds(); + } + + private static final class ExpectedSlotRequestIds { --- End diff -- I think this is fine, but I would probably have used a synchronized set and a `CountDownLatch`. That would allow for timeouts and also for asserting on which IDs are missing.
---