Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204399866 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** + * Tests that all slots are being returned to the {@link SlotOwner} if the + * {@link ExecutionGraph} is being cancelled. See FLINK-9908 + */ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> 
expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); --- End diff -- What are the benefits of `OneShotLatch` compared to `new CountDownLatch(1)`? At first glance it seems like an unnecessary re-implementation.
---