GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5087#discussion_r156728375
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java ---
    @@ -267,10 +270,77 @@ public void testAnyPreferredLocationCalculation() throws ExecutionException, Int
                assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
        }
     
    +   /**
    +    * Checks that the {@link Execution} termination future is only completed after the
    +    * assigned slot has been released.
    +    *
    +    * <p>NOTE: This test only fails spuriously without the fix of this commit. Thus, one has
    +    * to execute this test multiple times to see the failure.
    +    */
    +   @Test
    +   public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
    +           final JobVertexID jobVertexId = new JobVertexID();
    +           final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
    +           jobVertex.setInvokableClass(NoOpInvokable.class);
    +
    +           final SingleSlotTestingSlotOwner slotOwner = new SingleSlotTestingSlotOwner();
    +
    +           final SimpleSlot slot = new SimpleSlot(
    +                   new JobID(),
    +                   slotOwner,
    +                   new LocalTaskManagerLocation(),
    +                   0,
    +                   new SimpleAckingTaskManagerGateway());
    +
    +           final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
    +           slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
    +
    +           ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
    +                   new JobID(),
    +                   slotProvider,
    +                   new NoRestartStrategy(),
    +                   jobVertex);
    +
    +           ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
    +
    +           ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
    +
    +           assertTrue(executionVertex.scheduleForExecution(slotProvider, false, LocationPreferenceConstraint.ANY));
    +
    +           Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
    +
    +           CompletableFuture<Slot> returnedSlotFuture = slotOwner.getReturnedSlotFuture();
    +           CompletableFuture<?> terminationFuture = executionVertex.cancel();
    +
    +           // run canceling in a separate thread to allow an interleaving between termination
    +           // future callback registrations
    +           CompletableFuture.runAsync(
    +                   () -> currentExecutionAttempt.cancelingComplete(),
    +                   TestingUtils.defaultExecutor());
    +
    +           // to increase probability for problematic interleaving, let the current thread yield the processor
    +           Thread.yield();
    +
    +           CompletableFuture<Boolean> restartFuture = terminationFuture.thenApply(
    +                   ignored -> {
    +                           try {
    +                                   assertTrue(returnedSlotFuture.isDone());
    +                           } catch (Exception e) {
    +                                   throw new CompletionException(e);
    --- End diff --
    
    You are right. Will remove it.

