Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5548#discussion_r169960392
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java ---
    @@ -412,6 +417,54 @@ public void testEagerSchedulingWithSlotTimeout() throws Exception {
                verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
        }
     
    +   /**
    +    * Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph}
    +    * if it gets concurrently cancelled
    +    */
    +   @Test
    +   public void testSchedulingOperationCancellationWhenCancel() throws Exception {
    +           final JobVertex jobVertex = new JobVertex("NoOp JobVertex");
    +           jobVertex.setInvokableClass(NoOpInvokable.class);
    +           jobVertex.setParallelism(2);
    +           final JobGraph jobGraph = new JobGraph(jobVertex);
    +           jobGraph.setScheduleMode(ScheduleMode.EAGER);
    +           jobGraph.setAllowQueuedScheduling(true);
    +
    +           final CompletableFuture<LogicalSlot> slotFuture1 = new CompletableFuture<>();
    +           final CompletableFuture<LogicalSlot> slotFuture2 = new CompletableFuture<>();
    +           final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(2);
    +           slotProvider.addSlots(jobVertex.getID(), new CompletableFuture[]{slotFuture1, slotFuture2});
    +           final ExecutionGraph executionGraph = createExecutionGraph(jobGraph, slotProvider);
    +
    +           executionGraph.scheduleForExecution();
    +
    +           final CompletableFuture<?> releaseFuture = new CompletableFuture<>();
    +
    +           final TestingLogicalSlot slot = new TestingLogicalSlot(
    +                   new LocalTaskManagerLocation(),
    +                   new SimpleAckingTaskManagerGateway(),
    +                   0,
    +                   new AllocationID(),
    +                   new SlotRequestId(),
    +                   new SlotSharingGroupId(),
    +                   releaseFuture);
    +           slotFuture1.complete(slot);
    +
    +           // cancel should change the state of all executions to CANCELLED
    +           executionGraph.cancel();
    +
    +           // complete the now CANCELLED execution --> this should cause a failure
    +           slotFuture2.complete(new TestingLogicalSlot());
    +
    +           Thread.sleep(1L);
    --- End diff --
    
    Yes.

