XComp commented on a change in pull request #14847:
URL: https://github.com/apache/flink/pull/14847#discussion_r578567820



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java
##########
@@ -817,6 +818,126 @@ public void 
testStopWithSavepointFailingWithDeclinedCheckpoint() throws Exceptio
         assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));
     }
 
+    @Test
+    public void testStopWithSavepointFailingWithExpiredCheckpoint() throws 
Exception {
+        // we allow restarts right from the start since the failure is going 
to happen in the first
+        // phase (savepoint creation) of stop-with-savepoint
+        testRestartBackoffTimeStrategy.setCanRestart(true);
+
+        final JobGraph jobGraph = createTwoVertexJobGraph();
+        // set checkpoint timeout to a low value to simulate checkpoint 
expiration
+        enableCheckpointing(jobGraph, 10);
+
+        final SimpleAckingTaskManagerGateway taskManagerGateway =
+                new SimpleAckingTaskManagerGateway();
+        final CountDownLatch checkpointTriggeredLatch =
+                getCheckpointTriggeredLatch(taskManagerGateway);
+
+        // we have to set a listener that checks for the termination of the 
checkpoint handling
+        OneShotLatch checkpointAbortionWasTriggered = new OneShotLatch();
+        taskManagerGateway.setNotifyCheckpointAbortedConsumer(
+                (executionAttemptId, jobId, actualCheckpointId, timestamp) ->
+                        checkpointAbortionWasTriggered.trigger());
+
+        // the failure handling has to happen in the same thread as the checkpoint coordination -
+        // that's why we have to instantiate a separate single-threaded executor service here
+        final ScheduledExecutorService singleThreadExecutorService =
+                Executors.newSingleThreadScheduledExecutor();
+        final ComponentMainThreadExecutor mainThreadExecutor =
+                
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                        singleThreadExecutorService);
+
+        final DefaultScheduler scheduler =
+                CompletableFuture.supplyAsync(
+                                () ->
+                                        createSchedulerAndStartScheduling(
+                                                jobGraph, mainThreadExecutor),
+                                mainThreadExecutor)
+                        .get();
+
+        final ExecutionAttemptID succeedingExecutionAttemptId =
+                
Iterables.get(scheduler.getExecutionGraph().getAllExecutionVertices(), 0)
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+        final ExecutionAttemptID failingExecutionAttemptId =
+                
Iterables.getLast(scheduler.getExecutionGraph().getAllExecutionVertices())
+                        .getCurrentExecutionAttempt()
+                        .getAttemptId();
+
+        final CompletableFuture<String> stopWithSavepointFuture =
+                CompletableFuture.supplyAsync(
+                                () -> {
+                                    // we have to make sure that the tasks are 
running before
+                                    // stop-with-savepoint is triggered
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    failingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+                                    scheduler.updateTaskExecutionState(
+                                            new TaskExecutionState(
+                                                    jobGraph.getJobID(),
+                                                    
succeedingExecutionAttemptId,
+                                                    ExecutionState.RUNNING));
+
+                                    return 
scheduler.stopWithSavepoint("savepoint-path", false);
+                                },
+                                mainThreadExecutor)
+                        .get();
+
+        checkpointTriggeredLatch.await();
+
+        final CheckpointCoordinator checkpointCoordinator = 
getCheckpointCoordinator(scheduler);
+
+        final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                new AcknowledgeCheckpoint(jobGraph.getJobID(), 
succeedingExecutionAttemptId, 1);
+
+        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
"unknown location");
+
+        // we need to wait for the expired checkpoint to be handled
+        checkpointAbortionWasTriggered.await();
+
+        CompletableFuture.runAsync(
+                        () -> {
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            succeedingExecutionAttemptId,
+                                            ExecutionState.FINISHED));
+                            scheduler.updateTaskExecutionState(
+                                    new TaskExecutionState(
+                                            jobGraph.getJobID(),
+                                            failingExecutionAttemptId,
+                                            ExecutionState.FAILED));
+
+                            // the failed checkpoint handling triggers a global job fail-over,
+                            // which schedules the single restart task checked and run below
+                            assertThat(
+                                    
taskRestartExecutor.getNonPeriodicScheduledTask(), hasSize(1));
+                            
taskRestartExecutor.triggerNonPeriodicScheduledTasks();
+                        },
+                        mainThreadExecutor)
+                .get();
+
+        try {
+            stopWithSavepointFuture.get();
+            fail("An exception is expected.");
+        } catch (ExecutionException e) {
+            Optional<CheckpointException> actualCheckpointException =
+                    findThrowable(e, CheckpointException.class);
+            assertTrue(actualCheckpointException.isPresent());
+            assertThat(
+                    
actualCheckpointException.get().getCheckpointFailureReason(),
+                    is(CheckpointFailureReason.CHECKPOINT_EXPIRED));
+        }
+
+        // we have to wait for the main executor to be finished with 
restarting tasks
+        singleThreadExecutorService.shutdown();
+        singleThreadExecutorService.awaitTermination(1, TimeUnit.SECONDS);
+
+        assertThat(scheduler.getExecutionGraph().getState(), 
is(JobStatus.RUNNING));

Review comment:
       My above comment became obsolete with the most recent changes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to