[ https://issues.apache.org/jira/browse/FLINK-9911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552863#comment-16552863 ]
ASF GitHub Bot commented on FLINK-9911:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6386#discussion_r204399866

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---
    @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception {
     		assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED));
     	}
     
    +	/**
    +	 * Tests that all slots are being returned to the {@link SlotOwner} if the
    +	 * {@link ExecutionGraph} is being cancelled. See FLINK-9908
    +	 */
    +	@Test
    +	public void testCancellationOfIncompleteScheduling() throws Exception {
    +		final int parallelism = 10;
    +
    +		final JobVertex jobVertex = new JobVertex("Test job vertex");
    +		jobVertex.setInvokableClass(NoOpInvokable.class);
    +		jobVertex.setParallelism(parallelism);
    +
    +		final JobGraph jobGraph = new JobGraph(jobVertex);
    +		jobGraph.setAllowQueuedScheduling(true);
    +		jobGraph.setScheduleMode(ScheduleMode.EAGER);
    +
    +		final TestingSlotOwner slotOwner = new TestingSlotOwner();
    +		final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
    +
    +		final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism);
    +		final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism);
    +
    +		final TestingSlotProvider slotProvider = new TestingSlotProvider(
    +			(SlotRequestId slotRequestId) -> {
    +				slotRequestIds.put(slotRequestId, 1);
    +				requestedSlotsLatch.countDown();
    +				return new CompletableFuture<>();
    +			});
    +
    +
    +		final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
    +
    +		executionGraph.scheduleForExecution();
    +
    +		// wait until we have requested all slots
    +		requestedSlotsLatch.await();
    +
    +		final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet());
    +		slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId()));
    +		slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId);
    +
    +		final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch();
    --- End diff --

    What are the benefits of `OneShotLatch` compared to `new CountDownLatch(1)`? At first glance, it seems like an unnecessary re-implementation.

> SlotPool#failAllocation is called outside of main thread
> --------------------------------------------------------
>
>                 Key: FLINK-9911
>                 URL: https://issues.apache.org/jira/browse/FLINK-9911
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.5.1, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.5.2, 1.6.0
>
>
> The {{JobMaster}} calls directly into the {{SlotPool#failAllocation}} in the
> method {{JobMaster#notifyAllocationFailure}}. This can cause the {{SlotPool}} to go
> into an inconsistent state.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
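The review comment asks what `OneShotLatch` offers over `new CountDownLatch(1)`. The sketch below makes the comparison concrete. `SimpleOneShotLatch` is a hypothetical class written for illustration, not Flink's `OneShotLatch`, and its API (`trigger()`, `await()`, `isTriggered()`, `reset()`) is an assumption about what such a latch typically offers. In this sketch `trigger()` plays the role of `countDown()` and `await()` behaves like `CountDownLatch#await()`; the one behavioral difference is `reset()`, because a JDK `CountDownLatch` cannot be re-armed once its count reaches zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchComparison {

    /**
     * Hypothetical one-shot latch (NOT Flink's OneShotLatch): await() blocks
     * until trigger() has been called once; reset() re-arms it.
     */
    static final class SimpleOneShotLatch {
        private final Object lock = new Object();
        private boolean triggered; // guarded by lock

        void trigger() {
            synchronized (lock) {
                triggered = true;
                lock.notifyAll(); // wake every waiting thread
            }
        }

        void await() throws InterruptedException {
            synchronized (lock) {
                // Loop guards against spurious wake-ups.
                while (!triggered) {
                    lock.wait();
                }
            }
        }

        boolean isTriggered() {
            synchronized (lock) {
                return triggered;
            }
        }

        // The capability a CountDownLatch lacks: re-arming the latch.
        void reset() {
            synchronized (lock) {
                triggered = false;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical one-shot latch: trigger() releases the waiting thread.
        SimpleOneShotLatch oneShot = new SimpleOneShotLatch();
        Thread waiter = new Thread(() -> {
            try {
                oneShot.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        oneShot.trigger();
        waiter.join();
        System.out.println("one-shot triggered: " + oneShot.isTriggered());

        // JDK-only equivalent: countDown() plays the role of trigger().
        CountDownLatch countDown = new CountDownLatch(1);
        countDown.countDown();
        boolean released = countDown.await(1, TimeUnit.SECONDS);
        System.out.println("countdown released: " + released);

        // Only the hypothetical latch can be re-armed; the CountDownLatch stays at zero.
        oneShot.reset();
        System.out.println("one-shot after reset: " + oneShot.isTriggered());
        System.out.println("countdown count: " + countDown.getCount());
    }
}
```

Running `main` prints `one-shot triggered: true`, `countdown released: true`, `one-shot after reset: false` and `countdown count: 0`. If a test never needs to re-arm the latch, `new CountDownLatch(1)` covers the same trigger-and-wait use case with only JDK classes, which is the point the reviewer raises.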
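The issue description says `JobMaster#notifyAllocationFailure` calls `SlotPool#failAllocation` directly, that is, outside the main thread that normally owns the `SlotPool` state. A common remedy for this class of bug is thread confinement: every mutation of the non-thread-safe state is handed to a single "main thread" executor, whichever thread the notification arrives on. The sketch below illustrates only that general pattern. A single-threaded `ExecutorService` stands in for the main thread, and `PendingAllocations`, its `failAllocation`, `addAllocation` and `size` methods are hypothetical stand-ins, not Flink's `SlotPool` API and not necessarily how the pull request fixes the issue.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadConfinement {

    /** Hypothetical component whose state must only be touched by its main thread. */
    static final class PendingAllocations {
        // Deliberately not thread-safe: correctness relies on confinement to mainThread.
        private final Set<String> pending = new HashSet<>();
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

        /** Schedules the removal onto the main thread instead of mutating on the caller's thread. */
        CompletableFuture<Boolean> failAllocation(String allocationId) {
            return CompletableFuture.supplyAsync(() -> pending.remove(allocationId), mainThread);
        }

        CompletableFuture<Void> addAllocation(String allocationId) {
            return CompletableFuture.runAsync(() -> pending.add(allocationId), mainThread);
        }

        CompletableFuture<Integer> size() {
            return CompletableFuture.supplyAsync(pending::size, mainThread);
        }

        void shutdown() {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        PendingAllocations allocations = new PendingAllocations();
        allocations.addAllocation("a1").join();
        allocations.addAllocation("a2").join();

        // A failure notification arriving on some other thread (e.g. an RPC thread)
        // hands the work to the main thread rather than calling into the state directly.
        Thread notifier = new Thread(() -> allocations.failAllocation("a1").join());
        notifier.start();
        notifier.join();

        System.out.println("remaining allocations: " + allocations.size().join());
        allocations.shutdown();
    }
}
```

This prints `remaining allocations: 1`. Because all reads and writes of `pending` run on the same single thread, no extra locking is needed; calling `pending.remove(...)` directly from the notifier thread would be the kind of unsynchronized access the issue describes.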