This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b3e14928e81 [FLINK-31041][runtime] Fix multiple restoreState when GlobalFailure occurs in a short period. b3e14928e81 is described below commit b3e14928e815dd6dbdffbe3c5616733d4c7c8825 Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Mon Feb 20 15:39:06 2023 +0800 [FLINK-31041][runtime] Fix multiple restoreState when GlobalFailure occurs in a short period. --- .../flink/runtime/scheduler/DefaultScheduler.java | 4 ++++ .../OperatorCoordinatorSchedulerTest.java | 21 +++++++++++++++++++++ .../coordination/TestingOperatorCoordinator.java | 10 ++++++++++ 3 files changed, 35 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index ecd2a2467cb..7127c0f9986 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -377,6 +377,10 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio final Set<ExecutionVertexID> verticesToRestart = executionVertexVersioner.getUnmodifiedExecutionVertices(executionVertexVersions); + if (verticesToRestart.isEmpty()) { + return; + } + removeVerticesFromRestartPending(verticesToRestart); resetForNewExecutions(verticesToRestart); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index b786294ccc6..020a584cc61 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -89,6 +89,7 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; @@ -372,6 +373,26 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { .isEqualTo(OperatorCoordinator.NO_CHECKPOINT); } + @Test + public void testGlobalFailureTwiceWillNotResetToCheckpointTwice() throws Exception { + final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + AtomicInteger resetToCheckpointCounter = new AtomicInteger(0); + coordinator.setResetToCheckpointConsumer( + (ignore1, ignore2) -> resetToCheckpointCounter.incrementAndGet()); + + // fail global twice. + scheduler.handleGlobalFailure(new TestException()); + failGlobalAndRestart(scheduler, new TestException()); + + assertThat(resetToCheckpointCounter.get()).isEqualTo(1); + assertThat(coordinator.getLastRestoredCheckpointState()) + .as("coordinator should have null restored state") + .isEqualTo(TestingOperatorCoordinator.NULL_RESTORE_VALUE); + assertThat(coordinator.getLastRestoredCheckpointId()) + .isEqualTo(OperatorCoordinator.NO_CHECKPOINT); + } + @Test public void testGlobalFailoverDoesNotNotifyLocalRestore() throws Exception { final DefaultScheduler scheduler = createSchedulerAndDeployTasks(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index 32ce7f8560e..5f6fedff2e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -31,6 +31,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.BiConsumer; /** A simple testing implementation of the {@link OperatorCoordinator}. */ public class TestingOperatorCoordinator implements OperatorCoordinator { @@ -57,6 +58,8 @@ public class TestingOperatorCoordinator implements OperatorCoordinator { private final Map<Integer, SubtaskGateway> subtaskGateways; + private BiConsumer<Long, byte[]> resetToCheckpointConsumer; + private boolean started; private boolean closed; @@ -124,6 +127,9 @@ public class TestingOperatorCoordinator implements OperatorCoordinator { @Override public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) { + if (resetToCheckpointConsumer != null) { + resetToCheckpointConsumer.accept(checkpointId, checkpointData); + } lastRestoredCheckpointId = checkpointId; lastRestoredCheckpointState = checkpointData == null ? NULL_RESTORE_VALUE : checkpointData; } @@ -188,6 +194,10 @@ public class TestingOperatorCoordinator implements OperatorCoordinator { return !lastCheckpointComplete.isEmpty(); } + public void setResetToCheckpointConsumer(BiConsumer<Long, byte[]> resetToCheckpointConsumer) { + this.resetToCheckpointConsumer = resetToCheckpointConsumer; + } + // ------------------------------------------------------------------------ public static final class SubtaskAndCheckpoint {