[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6386


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204435941
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception {
assertThat(executionGraph.getTerminationFuture().get(), is(JobStatus.FAILED));
}
 
+   /**
+    * Tests that all slots are being returned to the {@link SlotOwner} if the
+    * {@link ExecutionGraph} is being cancelled. See FLINK-9908
+    */
+   @Test
+   public void testCancellationOfIncompleteScheduling() throws Exception {
+       final int parallelism = 10;
+
+       final JobVertex jobVertex = new JobVertex("Test job vertex");
+       jobVertex.setInvokableClass(NoOpInvokable.class);
+       jobVertex.setParallelism(parallelism);
+
+       final JobGraph jobGraph = new JobGraph(jobVertex);
+       jobGraph.setAllowQueuedScheduling(true);
+       jobGraph.setScheduleMode(ScheduleMode.EAGER);
+
+       final TestingSlotOwner slotOwner = new TestingSlotOwner();
+       final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+
+       final ConcurrentMap<SlotRequestId, Integer> slotRequestIds = new ConcurrentHashMap<>(parallelism);
+       final CountDownLatch requestedSlotsLatch = new CountDownLatch(parallelism);
+
+       final TestingSlotProvider slotProvider = new TestingSlotProvider(
+           (SlotRequestId slotRequestId) -> {
+               slotRequestIds.put(slotRequestId, 1);
+               requestedSlotsLatch.countDown();
+               return new CompletableFuture<>();
+           });
+
+
+       final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
+
+       executionGraph.scheduleForExecution();
+
+       // wait until we have requested all slots
+       requestedSlotsLatch.await();
+
+       final ExpectedSlotRequestIds expectedSlotRequestIds = new ExpectedSlotRequestIds(slotRequestIds.keySet());
+       slotOwner.setReturnAllocatedSlotConsumer(logicalSlot -> expectedSlotRequestIds.notifySlotRequestId(logicalSlot.getSlotRequestId()));
+       slotProvider.setSlotCanceller(expectedSlotRequestIds::notifySlotRequestId);
+
+       final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch();
+
+       // start completing the slot requests asynchronously
+       executor.execute(
+           () -> {
+               slotRequestsBeingFulfilled.trigger();
+
+               for (SlotRequestId slotRequestId : slotRequestIds.keySet()) {
+                   final SingleLogicalSlot singleLogicalSlot = createSingleLogicalSlot(slotOwner, taskManagerGateway, slotRequestId);
+                   slotProvider.complete(slotRequestId, singleLogicalSlot);
+               }
+           });
+
+       // make sure that we complete cancellations of deployed tasks
+       taskManagerGateway.setCancelConsumer(
+           (ExecutionAttemptID executionAttemptId) -> {
+               final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttemptId);
+
+               // if the execution was cancelled in state SCHEDULING, then it might already have been removed
+               if (execution != null) {
+                   execution.cancelingComplete();
+               }
+           }
+       );
+
+       slotRequestsBeingFulfilled.await();
+
+       executionGraph.cancel();
+
+       expectedSlotRequestIds.waitForAllSlotRequestIds();
+   }
+
+   private static final class ExpectedSlotRequestIds {
--- End diff --

I'm actually not super happy with this solution either. I will reconsider 
and see whether it's nicer with your proposed solution. Thanks for the input!


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204435027
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception {
[...]
+       final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch();
--- End diff --

Good question. I think it does not add much more value and could be 
replaced by `CountDownLatch(1)`. This should be orthogonal to this PR, though.
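For comparison, the same hand-off with a plain `java.util.concurrent.CountDownLatch(1)` could look like the following sketch. The class name and the executor setup are hypothetical, and the slot-completion loop from the test is reduced to a comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchSketch {

    /** Returns true if the waiting thread observed the trigger within the timeout. */
    static boolean runOnce() throws InterruptedException {
        // A CountDownLatch with count 1 acts as a one-shot latch:
        // countDown() plays the role of trigger(), await() blocks until it happened.
        final CountDownLatch slotRequestsBeingFulfilled = new CountDownLatch(1);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                slotRequestsBeingFulfilled.countDown();
                // ... complete the slot requests here ...
            });
            // The timed await lets a test fail instead of hanging forever.
            return slotRequestsBeingFulfilled.await(10, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce()); // prints "true"
    }
}
```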


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204434452
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -323,7 +323,7 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+   log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
--- End diff --

I tried to change it for all ids (`SlotRequestID` and `AllocationID`) 
because they now print `SlotRequestID{1234...}` and `AllocationID{9876...}` in 
order to make them easier to identify.


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204400729
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception {
[...]
+   private static final class ExpectedSlotRequestIds {
--- End diff --

I think this is fine but I would have probably used a synchronized set and 
a `CountDownLatch`. It would allow for timeouts and also asserting on which ids 
are missing.
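A minimal sketch of the synchronized-set-plus-`CountDownLatch` variant suggested here. The generic `ExpectedIds<T>` class and its method names are hypothetical (`T` stands in for `SlotRequestId`); it relies only on `Collections.synchronizedSet` and `CountDownLatch`.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical alternative to ExpectedSlotRequestIds that supports timeouts and reports missing ids. */
final class ExpectedIds<T> {
    private final Set<T> missingIds;
    private final CountDownLatch allNotified;

    ExpectedIds(Collection<? extends T> expectedIds) {
        this.missingIds = Collections.synchronizedSet(new HashSet<>(expectedIds));
        this.allNotified = new CountDownLatch(missingIds.size());
    }

    /** Counts down only the first time an expected id is seen; duplicates and unknown ids are ignored. */
    void notifyId(T id) {
        if (missingIds.remove(id)) {
            allNotified.countDown();
        }
    }

    /** Waits up to the given timeout; returns true if every expected id has been notified. */
    boolean waitForAll(long timeout, TimeUnit unit) throws InterruptedException {
        return allNotified.await(timeout, unit);
    }

    /** Snapshot of the ids not notified yet; copying requires holding the set's lock. */
    Set<T> getMissingIds() {
        synchronized (missingIds) {
            return new HashSet<>(missingIds);
        }
    }
}

final class ExpectedIdsDemo {
    public static void main(String[] args) throws InterruptedException {
        final ExpectedIds<String> expected = new ExpectedIds<>(Arrays.asList("a", "b", "c"));
        expected.notifyId("a");
        expected.notifyId("c");
        final boolean done = expected.waitForAll(100, TimeUnit.MILLISECONDS);
        System.out.println(done + " " + expected.getMissingIds()); // prints "false [b]"
    }
}
```

In a test, a `false` result from `waitForAll` could be turned into a failure message that lists `getMissingIds()`.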


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204399866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 ---
@@ -504,6 +512,107 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception {
[...]
+       final OneShotLatch slotRequestsBeingFulfilled = new OneShotLatch();
--- End diff --

What are the benefits of `OneShotLatch` compared to `new 
CountDownLatch(1)`? At first glance it seems like an unnecessary 
re-implementation.


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204394265
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -384,6 +393,8 @@ private MultiTaskSlot(
MultiTaskSlot allocateMultiTaskSlot(SlotRequestId slotRequestId, AbstractID groupId) {
Preconditions.checkState(!super.contains(groupId));
 
+   LOG.debug("Create nested multi task slot [{}] in parent multi task slot [{}] for group {}.", slotRequestId, getSlotRequestId(), groupId);
--- End diff --

Add `[]` around the group id as well.


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204384134
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -983,7 +983,7 @@ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoor
 
@Override
public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
-   slotPool.failAllocation(allocationID, cause);
+   slotPoolGateway.failAllocation(allocationID, cause);
--- End diff --

The changes don't match the commit message.


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204373664
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -323,7 +323,7 @@ public void disconnectResourceManager() {
boolean allowQueuedScheduling,
Time allocationTimeout) {
 
-   log.debug("Allocating slot with request {} for task execution {}", slotRequestId, task.getTaskToExecute());
+   log.debug("Received slot request [{}] for task: {}", slotRequestId, task.getTaskToExecute());
--- End diff --

When should the value be enclosed in `[]`?


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-23 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6386#discussion_r204368060
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -1011,7 +1011,7 @@ public void failAllocation(final AllocationID allocationID, final Exception caus
failPendingRequest(pendingRequest, cause);
}
else if (availableSlots.tryRemove(allocationID)) {
-   log.debug("Failed available slot [{}] with ", allocationID, cause);
+   log.debug("Failed available slot with allocation id {}.", allocationID, cause);
--- End diff --

Why remove *allocation id*?


---


[GitHub] flink pull request #6386: [FLINK-9911][JM] Use SlotPoolGateway to call failA...

2018-07-22 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/6386

[FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

## What is the purpose of the change

Since the SlotPool is an actor, we must use the SlotPoolGateway to interact with
the SlotPool. Otherwise, we risk an inconsistent state, because multiple threads
would be modifying the component.
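To illustrate why the gateway matters, here is a minimal sketch of the actor-style confinement described above, assuming a single-threaded `ExecutorService` as a stand-in for the actor's main thread. This is not Flink's RPC implementation; `SlotPoolLike` and `SlotPoolLikeGateway` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, deliberately non-thread-safe component (stand-in for the SlotPool's state). */
final class SlotPoolLike {
    private final Set<String> availableSlots = new HashSet<>();

    void addSlot(String allocationId) { availableSlots.add(allocationId); }

    /** Returns true if the slot was available and has been removed. */
    boolean failAllocation(String allocationId) { return availableSlots.remove(allocationId); }

    int numberOfAvailableSlots() { return availableSlots.size(); }
}

/** Hypothetical gateway: every call is executed on the component's single "main thread". */
final class SlotPoolLikeGateway implements AutoCloseable {
    private final SlotPoolLike slotPool = new SlotPoolLike();
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> addSlot(String allocationId) {
        return CompletableFuture.runAsync(() -> slotPool.addSlot(allocationId), mainThread);
    }

    CompletableFuture<Boolean> failAllocation(String allocationId) {
        return CompletableFuture.supplyAsync(() -> slotPool.failAllocation(allocationId), mainThread);
    }

    CompletableFuture<Integer> numberOfAvailableSlots() {
        return CompletableFuture.supplyAsync(slotPool::numberOfAvailableSlots, mainThread);
    }

    @Override
    public void close() { mainThread.shutdown(); }
}

final class GatewayDemo {
    public static void main(String[] args) {
        try (SlotPoolLikeGateway gateway = new SlotPoolLikeGateway()) {
            gateway.addSlot("alloc-1").join();
            // Safe to call from any thread: the work itself runs on the gateway's main thread.
            System.out.println(gateway.failAllocation("alloc-1").join()); // prints "true"
        }
    }
}
```

Because every call, including `failAllocation`, is queued on the same thread, the unsynchronized `HashSet` is never touched concurrently. Calling `SlotPoolLike#failAllocation` directly from another thread would bypass that guarantee.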

This PR is based on #6385.

## Verifying this change

- Trivial change

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixFailAllocation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6386


commit f85ec37cc3ad21998eabad45a6dcb46e8efc62fb
Author: Till Rohrmann 
Date:   2018-07-19T11:07:44Z

[FLINK-9838][logging] Don't log slot request failures on the ResourceManager

commit 7c703fb3b350ef5b02b01d621c3a16d4bca6f707
Author: Till Rohrmann 
Date:   2018-07-19T11:41:03Z

[hotfix] Improve logging of SlotPool and SlotSharingManager

commit 414a8d231a5b6cdc2d5db0c1d35a79ff584c1cd0
Author: Till Rohrmann 
Date:   2018-07-22T18:05:05Z

[FLINK-9908][scheduling] Do not cancel individual scheduling future

Since the individual scheduling futures contain logic to release the slot if it
cannot be assigned to the Execution, we must not cancel them. Otherwise, we risk
that slots are not returned to the SlotPool, leaving it in an inconsistent state.
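The hazard can be reproduced with plain `java.util.concurrent.CompletableFuture`: if a dependent stage is cancelled before its source completes, the function attached to that stage never runs, so release logic placed there is skipped. In this sketch, `slotFuture`, the `released` flag and the class name are hypothetical stand-ins, not Flink's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelledSchedulingFutureSketch {

    /**
     * Simulates a scheduling future whose callback releases the slot because it
     * cannot be assigned. Returns whether the slot was actually released.
     */
    static boolean slotReleased(boolean cancelSchedulingFuture) {
        final CompletableFuture<String> slotFuture = new CompletableFuture<>();
        final AtomicBoolean released = new AtomicBoolean(false);

        // The dependent stage carries the release logic.
        final CompletableFuture<Void> schedulingFuture = slotFuture.thenAccept(slot -> {
            // Assume the slot cannot be assigned to the Execution, so it is given back.
            released.set(true);
        });

        if (cancelSchedulingFuture) {
            // Completes the dependent stage with a CancellationException right away...
            schedulingFuture.cancel(false);
        }

        // ...so when the slot arrives later, the callback above is skipped and the slot leaks.
        slotFuture.complete("slot-1");
        return released.get();
    }

    public static void main(String[] args) {
        System.out.println(slotReleased(true));  // prints "false"
        System.out.println(slotReleased(false)); // prints "true"
    }
}
```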

commit 8f4471339db3a2df01c1cc61e03eb0881f98dd4f
Author: Till Rohrmann 
Date:   2018-07-22T18:17:11Z

[FLINK-9909][core] ConjunctFuture does not cancel input futures

If a ConjunctFuture is cancelled, then it won't cancel its input futures
automatically. If users need this behaviour, they have to implement it
explicitly. The reason for this change is that an implicit cancellation can
have unwanted side effects, because the producers of the cancelled input
futures won't be executed.
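The JDK's `CompletableFuture.allOf` already behaves this way, so it serves as a stand-in for Flink's ConjunctFuture in the following sketch (an assumption for illustration; the sketch does not use Flink's `FutureUtils`). The helper `cancelWithInputs` is hypothetical and shows what "implement it explicitly" could look like.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ConjunctCancellationSketch {

    /** Hypothetical helper: cancels the combined future and, explicitly, every input future. */
    static void cancelWithInputs(CompletableFuture<Void> conjunct, List<? extends CompletableFuture<?>> inputs) {
        conjunct.cancel(false);
        for (CompletableFuture<?> input : inputs) {
            input.cancel(false);
        }
    }

    public static void main(String[] args) {
        final CompletableFuture<String> first = new CompletableFuture<>();
        final CompletableFuture<String> second = new CompletableFuture<>();

        // Cancelling the combined future does not propagate to its inputs...
        final CompletableFuture<Void> conjunct = CompletableFuture.allOf(first, second);
        conjunct.cancel(false);
        System.out.println(first.isCancelled() + " " + second.isCancelled()); // prints "false false"

        // ...so whoever produces an input can still complete it normally.
        first.complete("done");
        System.out.println(first.join()); // prints "done"

        // Callers that do want the inputs cancelled have to do it explicitly.
        cancelWithInputs(CompletableFuture.allOf(second), Arrays.asList(second));
        System.out.println(second.isCancelled()); // prints "true"
    }
}
```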

commit c606145182c0531a8239decdc52ceeccdb81ca73
Author: Till Rohrmann 
Date:   2018-07-22T18:20:53Z

[hotfix] Fix checkstyle violations in FutureUtils

commit c296d8b146cd08367329226b9ecaa28bd86ba1ed
Author: Till Rohrmann 
Date:   2018-07-22T18:34:33Z

[hotfix] Replace check state condition in Execution#tryAssignResource with if check

Instead of risking an IllegalStateException, it is better to check that the
taskManagerLocationFuture has not been completed yet. If it has, then we also
reject the assignment of the LogicalSlot to the Execution. That way, we don't
risk failing to release the slot in case of an exception in
Execution#allocateAndAssignSlotForExecution.
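A minimal sketch of the "reject instead of throw" pattern, assuming a slot only needs a location and a release method. `Slot`, `Execution` and `allocateAndAssign` here are hypothetical stand-ins, reduced to the taskManagerLocationFuture, and are not Flink's classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class TryAssignSketch {

    /** Hypothetical stand-in for a logical slot that can be given back to the pool. */
    static final class Slot {
        final String location;
        final AtomicInteger releaseCount = new AtomicInteger();
        Slot(String location) { this.location = location; }
        void release() { releaseCount.incrementAndGet(); }
    }

    /** Hypothetical stand-in for Execution, reduced to the location future. */
    static final class Execution {
        final CompletableFuture<String> taskManagerLocationFuture = new CompletableFuture<>();

        /** Rejects the slot (instead of throwing) if a location has already been assigned. */
        boolean tryAssignResource(Slot slot) {
            if (taskManagerLocationFuture.isDone()) {
                return false;
            }
            // complete() also returns false if a concurrent caller completed the future first.
            return taskManagerLocationFuture.complete(slot.location);
        }
    }

    /** Caller side: a rejected slot is always released; there is no exception path to forget. */
    static void allocateAndAssign(Execution execution, Slot slot) {
        if (!execution.tryAssignResource(slot)) {
            slot.release();
        }
    }

    public static void main(String[] args) {
        final Execution execution = new Execution();
        final Slot first = new Slot("tm-1");
        final Slot second = new Slot("tm-2");
        allocateAndAssign(execution, first);
        allocateAndAssign(execution, second);
        // prints "tm-1 0 1"
        System.out.println(execution.taskManagerLocationFuture.join() + " "
            + first.releaseCount.get() + " " + second.releaseCount.get());
    }
}
```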

commit 69b8c7c7b5905be83c7c393423c064de9b78375f
Author: Till Rohrmann 
Date:   2018-07-22T18:43:44Z

[hotfix] Fix checkstyle violations in ExecutionVertex

commit 6e018cfdf84192041a4b1ba27dcbdbf645e8d40b
Author: Till Rohrmann 
Date:   2018-07-22T18:46:37Z

[hotfix] Fix checkstyle violations in ExecutionJobVertex

commit f8805be13d2c0c2da58e0e7ecc6dc102953fc0c5
Author: Till Rohrmann 
Date:   2018-07-22T18:48:53Z

[hotfix] Fix checkstyle violations in Execution

commit 0e9fbf8157e45d260a1a418c25871031a98a4995
Author: Till Rohrmann 
Date:   2018-07-22T19:38:42Z

[FLINK-9910][scheduling] Execution#scheduleForExecution does not cancel slot future

In order to properly give back an allocated slot to the SlotPool, one must not
complete the result future of Execution#allocateAndAssignSlotForExecution. This
commit changes the behaviour in Execution#scheduleForExecution accordingly.

commit 1b221e062e5c800f4ce8e716e36f67abcbd75394
Author: Till Rohrmann 
Date:   2018-07-22T19:57:59Z

[FLINK-9911][JM] Use SlotPoolGateway to call failAllocation

Since the SlotPool is an actor, we must use the SlotPoolGateway to interact with
the SlotPool. Otherwise, we risk an inconsistent state, because multiple threads
would be modifying the component.




---