This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b3e14928e81 [FLINK-31041][runtime] Fix multiple restoreState when GlobalFailure occurs in a short period.
b3e14928e81 is described below

commit b3e14928e815dd6dbdffbe3c5616733d4c7c8825
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Mon Feb 20 15:39:06 2023 +0800

    [FLINK-31041][runtime] Fix multiple restoreState when GlobalFailure occurs in a short period.
---
 .../flink/runtime/scheduler/DefaultScheduler.java   |  4 ++++
 .../OperatorCoordinatorSchedulerTest.java           | 21 +++++++++++++++++++++
 .../coordination/TestingOperatorCoordinator.java    | 10 ++++++++++
 3 files changed, 35 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index ecd2a2467cb..7127c0f9986 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -377,6 +377,10 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
         final Set<ExecutionVertexID> verticesToRestart =
                 
executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions);
 
+        if (verticesToRestart.isEmpty()) {
+            return;
+        }
+
         removeVerticesFromRestartPending(verticesToRestart);
 
         resetForNewExecutions(verticesToRestart);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index b786294ccc6..020a584cc61 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -89,6 +89,7 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
@@ -372,6 +373,26 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                 .isEqualTo(OperatorCoordinator.NO_CHECKPOINT);
     }
 
+    @Test
+    public void testGlobalFailureTwiceWillNotResetToCheckpointTwice() throws 
Exception {
+        final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
+        final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+        AtomicInteger resetToCheckpointCounter = new AtomicInteger(0);
+        coordinator.setResetToCheckpointConsumer(
+                (ignore1, ignore2) -> 
resetToCheckpointCounter.incrementAndGet());
+
+        // fail global twice.
+        scheduler.handleGlobalFailure(new TestException());
+        failGlobalAndRestart(scheduler, new TestException());
+
+        assertThat(resetToCheckpointCounter.get()).isEqualTo(1);
+        assertThat(coordinator.getLastRestoredCheckpointState())
+                .as("coordinator should have null restored state")
+                .isEqualTo(TestingOperatorCoordinator.NULL_RESTORE_VALUE);
+        assertThat(coordinator.getLastRestoredCheckpointId())
+                .isEqualTo(OperatorCoordinator.NO_CHECKPOINT);
+    }
+
     @Test
     public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception 
{
         final DefaultScheduler scheduler = createSchedulerAndDeployTasks();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 32ce7f8560e..5f6fedff2e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.BiConsumer;
 
 /** A simple testing implementation of the {@link OperatorCoordinator}. */
 public class TestingOperatorCoordinator implements OperatorCoordinator {
@@ -57,6 +58,8 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
     private final Map<Integer, SubtaskGateway> subtaskGateways;
 
+    private BiConsumer<Long, byte[]> resetToCheckpointConsumer;
+
     private boolean started;
     private boolean closed;
 
@@ -124,6 +127,9 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
 
     @Override
     public void resetToCheckpoint(long checkpointId, @Nullable byte[] 
checkpointData) {
+        if (resetToCheckpointConsumer != null) {
+            resetToCheckpointConsumer.accept(checkpointId, checkpointData);
+        }
         lastRestoredCheckpointId = checkpointId;
         lastRestoredCheckpointState = checkpointData == null ? 
NULL_RESTORE_VALUE : checkpointData;
     }
@@ -188,6 +194,10 @@ public class TestingOperatorCoordinator implements 
OperatorCoordinator {
         return !lastCheckpointComplete.isEmpty();
     }
 
+    public void setResetToCheckpointConsumer(BiConsumer<Long, byte[]> 
resetToCheckpointConsumer) {
+        this.resetToCheckpointConsumer = resetToCheckpointConsumer;
+    }
+
     // ------------------------------------------------------------------------
 
     public static final class SubtaskAndCheckpoint {

Reply via email to