[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6386 ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204435941 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); + + // start completing the slot requests asynchronously + executor.execute( + () -> { + slotRequestsBeingFulfilled.trigger(); + + for (SlotRequestId slotRequestId : slotRequestIds.keySet()) { + final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId); + slotProvider.complete(slotRequestId, singleLogicalSlot); + } + }); + + // make sure that we complete cancellations of deployed tasks + taskManagerGateway.setCancelConsumer( + (ExecutionAttemptID executionAttemptId) -> { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId); + + // if the execution was cancelled in state SCHEDULING, then it might already have been removed + if (execution != null) { + execution.cancelingComplete(); + } + } + ); + + slotRequestsBeingFulfilled.await(); + + executionGraph.cancel(); + + expectedSlotRequestIds.waitForAllSlotRequestIds(); + } + + private static final class ExpectedSlotRequestIds { --- End diff -- I'm actually not super happy with this solution either. I will reconsider and see whether it's nicer with your proposed solution. Thanks for the input! ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204435027 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); --- End diff -- Good question. I think it does not add much more value and could be replaced by `CountDownLatch(1)`. This should be orthogonal to this PR, though. ---
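For readers following the `OneShotLatch` discussion, the replacement mentioned in this reply can be sketched with plain JDK classes. This is only an illustration: the class name `OneShotLatchSketch` and the helper `awaitWorkerStart` are hypothetical and not part of the PR, and Flink's `OneShotLatch` is not used here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OneShotLatchSketch {

    /** Returns true if the worker signalled that it started within the timeout. */
    static boolean awaitWorkerStart(long timeoutMillis) {
        // A latch with count 1 opens exactly once and stays open afterwards.
        final CountDownLatch started = new CountDownLatch(1);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The worker "triggers" the latch as its first action.
            executor.execute(started::countDown);
            // The caller blocks until the trigger happened or the timeout expired.
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker started: " + awaitWorkerStart(5_000));
    }
}
```

The timed `await` is what a plain `CountDownLatch` offers out of the box; whether that covers every use of `OneShotLatch` in the code base is not established by this thread.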
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204434452 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -323,7 +323,7 @@ public void disconnectResourceManager() { boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute()); + log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); --- End diff -- I tried to change it for all ids (`SlotRequestID` and `AllocationID`) because they now print `SlotRequestID{1234...}` and `AllocationID{9876...}` in order to make them easier to identify. ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204400729 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); + + // start completing the slot requests asynchronously + executor.execute( + () -> { + slotRequestsBeingFulfilled.trigger(); + + for (SlotRequestId slotRequestId : slotRequestIds.keySet()) { + final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId); + slotProvider.complete(slotRequestId, singleLogicalSlot); + } + }); + + // make sure that we complete cancellations of deployed tasks + taskManagerGateway.setCancelConsumer( + (ExecutionAttemptID executionAttemptId) -> { + final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId); + + // if the execution was cancelled in state SCHEDULING, then it might already have been removed + if (execution != null) { + execution.cancelingComplete(); + } + } + ); + + slotRequestsBeingFulfilled.await(); + + executionGraph.cancel(); + + expectedSlotRequestIds.waitForAllSlotRequestIds(); + } + + private static final class ExpectedSlotRequestIds { --- End diff -- I think this is fine but I would have probably used a synchronized set and a `CountDownLatch`. It would allow for timeouts and also asserting on which ids are missing. ---
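The alternative suggested in this comment (a synchronized set plus a `CountDownLatch`) might look roughly like the sketch below. It is an assumption about what the reviewer had in mind, not code from the PR: `ExpectedIds`, `notifyId`, `missingIds` and `missingAfterReporting` are hypothetical names, and plain strings stand in for `SlotRequestId`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ExpectedIdsSketch {

    /** Tracks which of the expected ids have not been reported yet. */
    static final class ExpectedIds<T> {
        private final Set<T> missing;
        private final CountDownLatch latch;

        ExpectedIds(Collection<T> expected) {
            this.missing = Collections.synchronizedSet(new HashSet<>(expected));
            this.latch = new CountDownLatch(missing.size());
        }

        /** Called by the test callbacks; duplicates and unknown ids are ignored. */
        void notifyId(T id) {
            if (missing.remove(id)) {
                latch.countDown();
            }
        }

        /** Waits with a timeout instead of blocking forever. */
        boolean await(long timeout, TimeUnit unit) {
            try {
                return latch.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /** Snapshot of the ids that were never reported, usable in an assertion message. */
        Set<T> missingIds() {
            synchronized (missing) {
                return new HashSet<>(missing);
            }
        }
    }

    /** Reports the given ids and returns whatever is still missing after a short wait. */
    static Set<String> missingAfterReporting(List<String> expected, List<String> reported) {
        final ExpectedIds<String> ids = new ExpectedIds<>(expected);
        reported.forEach(ids::notifyId);
        ids.await(10, TimeUnit.MILLISECONDS);
        return ids.missingIds();
    }
}
```

In a test, a failed `await` could then be turned into an assertion that lists `missingIds()`, which is the diagnostic benefit the comment points at.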
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204399866 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java --- @@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED)); } + /** +* Tests that all slots are being returned to the {@link SlotOwner} if the +* {@link ExecutionGraph} is being cancelled. See FLINK-9908 +*/ + @Test + public void testCancellationOfIncompleteScheduling() throws Exception { + final int parallelism = 10; + + final JobVertex jobVertex = new JobVertex("Test job vertex"); + jobVertex.setInvokableClass(NoOpInvokable.class); + jobVertex.setParallelism(parallelism); + + final JobGraph jobGraph = new JobGraph(jobVertex); + jobGraph.setAllowQueuedScheduling(true); + jobGraph.setScheduleMode(ScheduleMode.EAGER); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + + final ConcurrentMap slotRequestIds = new ConcurrentHashMap<>(parallelism); + final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism); + + final TestingSlotProvider slotProvider = new TestingSlotProvider( + (SlotRequestId slotRequestId) -> { + slotRequestIds.put(slotRequestId, 1); + requestedSlotsLatch.countDown(); + return new CompletableFuture<>(); + }); + + + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider); + + executionGraph.scheduleForExecution(); + + // wait until we have requested all slots + requestedSlotsLatch.await(); + + final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet()); + slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId())); + 
slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId); + + final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch(); --- End diff -- What are the benefits of `OneShotLatch` compared to `new CountDownLatch(1)`? At first glance it seems like an unnecessary re-implementation. ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204394265 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java --- @@ -384,6 +393,8 @@ private MultiTaskSlot( MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) { Preconditions.checkState(!super.contains(groupId)); + LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId); --- End diff -- add `[]` ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204384134 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -983,7 +983,7 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor @Override public void notifyAllocationFailure(AllocationID allocationID, Exception cause) { - slotPool.failAllocation(allocationID, cause); + slotPoolGateway.failAllocation(allocationID, cause); --- End diff -- The changes don't match the commit message. ---
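The one-line change in this diff routes the call through the gateway instead of invoking the `SlotPool` directly. The idea behind it can be sketched with JDK classes alone. Everything below is a simplified assumption for illustration: `Pool`, `PoolGateway` and `remainingSlots` are hypothetical names, a single-threaded executor stands in for the actor's main thread, and none of this reflects Flink's actual RPC machinery.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class GatewaySketch {

    /** Hypothetical stand-in for a component whose state is not thread safe. */
    static final class Pool {
        private final Set<String> availableSlots = new HashSet<>();

        void addSlot(String allocationId) { availableSlots.add(allocationId); }
        boolean failAllocation(String allocationId) { return availableSlots.remove(allocationId); }
        int size() { return availableSlots.size(); }
    }

    /** Hypothetical gateway: every call is executed on the pool's single main thread. */
    static final class PoolGateway {
        private final Pool pool = new Pool();
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

        CompletableFuture<Void> addSlot(String id) {
            return CompletableFuture.runAsync(() -> pool.addSlot(id), mainThread);
        }

        CompletableFuture<Boolean> failAllocation(String id) {
            return CompletableFuture.supplyAsync(() -> pool.failAllocation(id), mainThread);
        }

        CompletableFuture<Integer> size() {
            return CompletableFuture.supplyAsync(pool::size, mainThread);
        }

        void shutdown() { mainThread.shutdown(); }
    }

    /** Fails every slot from several caller threads, but only through the gateway. */
    static int remainingSlots(int slots) {
        final PoolGateway gateway = new PoolGateway();
        final ExecutorService callers = Executors.newFixedThreadPool(4);
        try {
            for (int i = 0; i < slots; i++) {
                gateway.addSlot("allocation-" + i).join();
            }
            final CompletableFuture<?>[] calls = new CompletableFuture<?>[slots];
            for (int i = 0; i < slots; i++) {
                final String id = "allocation-" + i;
                // The caller thread never touches Pool directly; it only enqueues work.
                calls[i] = CompletableFuture.runAsync(() -> gateway.failAllocation(id).join(), callers);
            }
            CompletableFuture.allOf(calls).join();
            return gateway.size().join();
        } finally {
            callers.shutdown();
            gateway.shutdown();
        }
    }
}
```

Because only the main thread ever mutates the `HashSet`, no locking is needed inside `Pool`; calling `pool.failAllocation` directly from the caller threads would break that guarantee.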
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204373664 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -323,7 +323,7 @@ public void disconnectResourceManager() { boolean allowQueuedScheduling, Time allocationTimeout) { - log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute()); + log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute()); --- End diff -- When should the value be enclosed in `[]`? ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/6386#discussion_r204368060 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java --- @@ -1011,7 +1011,7 @@ public void failAllocation(final AllocationID allocationID, final Exception caus failPendingRequest(pendingRequest, cause); } else if (availableSlots.tryRemove(allocationID)) { - log.debug("Failed available slot [{}] with ", allocationID, cause); + log.debug("Failed available slot with allocation id {}.", allocationID, cause); --- End diff -- Why remove *allocation id*? ---
[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/6386

    [FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

## What is the purpose of the change

Since the SlotPool is an actor, we must use the SlotPoolGateway to interact with the SlotPool. Otherwise, we might risk an inconsistent state since there are multiple threads modifying the component.

This PR is based on #6385.

## Verifying this change

- Trivial change

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixFailAllocation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6386.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6386

commit f85ec37cc3ad21998eabad45a6dcb46e8efc62fb
Author: Till Rohrmann
Date: 2018-07-19T11:07:44Z

    [FLINK-9838][logging] Don't log slot request failures on the ResourceManager

commit 7c703fb3b350ef5b02b01d621c3a16d4bca6f707
Author: Till Rohrmann
Date: 2018-07-19T11:41:03Z

    [hotfix] Improve logging of SlotPool and SlotSharingManager

commit 414a8d231a5b6cdc2d5db0c1d35a79ff584c1cd0
Author: Till Rohrmann
Date: 2018-07-22T18:05:05Z

    [FLINK-9908][scheduling] Do not cancel individual scheduling future

    Since the individual scheduling futures contain logic to release the slot if it cannot be assigned to the Execution, we must not cancel them. Otherwise, slots might not be returned to the SlotPool, leaving it in an inconsistent state.

commit 8f4471339db3a2df01c1cc61e03eb0881f98dd4f
Author: Till Rohrmann
Date: 2018-07-22T18:17:11Z

    [FLINK-9909][core] ConjunctFuture does not cancel input futures

    If a ConjunctFuture is cancelled, then it won't cancel all of its input futures automatically. If users need this behaviour, they have to implement it explicitly. The reason for this change is that an implicit cancellation can have unwanted side effects, because all of the cancelled input futures' producers won't be executed.

commit c606145182c0531a8239decdc52ceeccdb81ca73
Author: Till Rohrmann
Date: 2018-07-22T18:20:53Z

    [hotfix] Fix checkstyle violations in FutureUtils

commit c296d8b146cd08367329226b9ecaa28bd86ba1ed
Author: Till Rohrmann
Date: 2018-07-22T18:34:33Z

    [hotfix] Replace check state condition in Execution#tryAssignResource with if check

    Instead of risking an IllegalStateException it is better to check that the taskManagerLocationFuture has not been completed yet. If it has, then we also reject the assignment of the LogicalSlot to the Execution. That way, we avoid the risk of not releasing the slot in case of an exception in Execution#allocateAndAssignSlotForExecution.

commit 69b8c7c7b5905be83c7c393423c064de9b78375f
Author: Till Rohrmann
Date: 2018-07-22T18:43:44Z

    [hotfix] Fix checkstyle violations in ExecutionVertex

commit 6e018cfdf84192041a4b1ba27dcbdbf645e8d40b
Author: Till Rohrmann
Date: 2018-07-22T18:46:37Z

    [hotfix] Fix checkstyle violations in ExecutionJobVertex

commit f8805be13d2c0c2da58e0e7ecc6dc102953fc0c5
Author: Till Rohrmann
Date: 2018-07-22T18:48:53Z

    [hotfix] Fix checkstyle violations in Execution

commit 0e9fbf8157e45d260a1a418c25871031a98a4995
Author: Till Rohrmann
Date: 2018-07-22T19:38:42Z

    [FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future

    In order to properly give back an allocated slot to the SlotPool, one must not complete the result future of Execution#allocateAndAssignSlotForExecution. This commit changes the behaviour in Execution#scheduleForExecution accordingly.

commit 1b221e062e5c800f4ce8e716e36f67abcbd75394
Author: Till Rohrmann
Date: 2018-07-22T19:57:59Z

    [FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

    Since the SlotPool is an actor, we must use the SlotPoolGateway to interact with the SlotPool. Otherwise, we might risk an inconsistent state since there are multiple threads modifying the component.

---
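The FLINK-9909 commit message above says that cancelling a conjunct future no longer cancels its inputs, and that callers who want that must do it explicitly. The same contract can be observed with the JDK's `CompletableFuture.allOf`, which is used below purely as a stand-in: Flink's `ConjunctFuture` is not involved, and `ConjunctCancelSketch` and its two methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

public class ConjunctCancelSketch {

    /** Cancelling the combined future leaves the input future untouched. */
    static boolean inputCancelledImplicitly() {
        final CompletableFuture<String> input = new CompletableFuture<>();
        final CompletableFuture<Void> all = CompletableFuture.allOf(input);
        all.cancel(false);
        return input.isCancelled();
    }

    /** The caller opts in to propagation by cancelling the inputs itself. */
    static boolean inputCancelledExplicitly() {
        final CompletableFuture<String> input = new CompletableFuture<>();
        final CompletableFuture<Void> all = CompletableFuture.allOf(input);
        // Explicit propagation: runs when the combined future completes for any reason.
        all.whenComplete((ignored, failure) -> {
            if (all.isCancelled()) {
                input.cancel(false);
            }
        });
        all.cancel(false);
        return input.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("implicit: " + inputCancelledImplicitly());
        System.out.println("explicit: " + inputCancelledExplicitly());
    }
}
```

Keeping propagation opt-in means an input future that carries cleanup logic (such as returning a slot) still runs that logic when only the combined future is cancelled.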