This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 32f41f8  [FLINK-22502][checkpointing] Don't tolerate checkpoint 
retrieval failures on recovery
32f41f8 is described below

commit 32f41f861ef082cc1154937124f66ff0c03dc32c
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed May 5 18:59:14 2021 +0200

    [FLINK-22502][checkpointing] Don't tolerate checkpoint retrieval failures 
on recovery
    
    Ignoring such failures and running with an incomplete
    set of checkpoints can lead to consistency violation.
    
    Instead, transient failures should be mitigated by
    automatic job restart.
---
 .../DefaultCompletedCheckpointStore.java           |  65 +------
 .../DefaultCompletedCheckpointStoreTest.java       |  45 +----
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |  26 +--
 .../test/checkpointing/CheckpointStoreITCase.java  | 206 +++++++++++++++++++++
 4 files changed, 222 insertions(+), 120 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 0f419fd..3490b8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -140,71 +140,20 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
             return;
         }
 
-        // Try and read the state handles from storage. We try until we either 
successfully read
-        // all of them or when we reach a stable state, i.e. when we 
successfully read the same set
-        // of checkpoints in two tries. We do it like this to protect against 
transient outages
-        // of the checkpoint store (for example a DFS): if the DFS comes 
online midway through
-        // reading a set of checkpoints we would run the risk of reading only 
a partial set
-        // of checkpoints while we could in fact read the other checkpoints as 
well if we retried.
-        // Waiting until a stable state protects against this while also being 
resilient against
-        // checkpoints being actually unreadable.
-        //
-        // These considerations are also important in the scope of incremental 
checkpoints, where
-        // we use ref-counting for shared state handles and might accidentally 
delete shared state
-        // of checkpoints that we don't read due to transient storage outages.
-        final List<CompletedCheckpoint> lastTryRetrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
         final List<CompletedCheckpoint> retrievedCheckpoints =
                 new ArrayList<>(numberOfInitialCheckpoints);
-        Exception retrieveException = null;
-        do {
-            LOG.info("Trying to fetch {} checkpoints from storage.", 
numberOfInitialCheckpoints);
-
-            lastTryRetrievedCheckpoints.clear();
-            lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints);
-
-            retrievedCheckpoints.clear();
-
-            for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle :
-                    initialCheckpoints) {
-
-                CompletedCheckpoint completedCheckpoint;
-
-                try {
-                    completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
-                    if (completedCheckpoint != null) {
-                        retrievedCheckpoints.add(completedCheckpoint);
-                    }
-                } catch (Exception e) {
-                    LOG.warn(
-                            "Could not retrieve checkpoint, not adding to list 
of recovered checkpoints.",
-                            e);
-                    retrieveException = e;
-                }
-            }
+        LOG.info("Trying to fetch {} checkpoints from storage.", 
numberOfInitialCheckpoints);
 
-        } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints
-                && !CompletedCheckpoint.checkpointsMatch(
-                        lastTryRetrievedCheckpoints, retrievedCheckpoints));
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    
checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
+        }
 
         // Clear local handles in order to prevent duplicates on recovery. The 
local handles should
-        // reflect
-        // the state handle store contents.
+        // reflect the state handle store contents.
         completedCheckpoints.clear();
         completedCheckpoints.addAll(retrievedCheckpoints);
-
-        if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) {
-            throw new FlinkException(
-                    "Could not read any of the "
-                            + numberOfInitialCheckpoints
-                            + " checkpoints from storage.",
-                    retrieveException);
-        } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) {
-            LOG.warn(
-                    "Could only fetch {} of {} checkpoints from storage.",
-                    completedCheckpoints.size(),
-                    numberOfInitialCheckpoints);
-        }
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 6086535..26c0a3c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
@@ -152,7 +153,7 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         assertThat(checkpointIds, contains(1L, 2L, 3L));
     }
 
-    /** We got an {@link IOException} when retrieving checkpoint 2. It should 
be skipped. */
+    /** We got an {@link IOException} when retrieving checkpoint 2. It should 
NOT be skipped. */
     @Test
     public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws 
Exception {
         final long corruptCkpId = 2L;
@@ -169,45 +170,15 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
-
-        final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
-                completedCheckpointStore.getAllCheckpoints();
-        assertThat(recoveredCompletedCheckpoint.size(), is(2));
-        final List<Long> checkpointIds =
-                recoveredCompletedCheckpoint.stream()
-                        .map(CompletedCheckpoint::getCheckpointID)
-                        .collect(Collectors.toList());
-        // Checkpoint 2 should be skipped.
-        assertThat(checkpointIds, contains(1L, 3L));
-    }
-
-    /**
-     * {@link DefaultCompletedCheckpointStore#recover()} should throw 
exception when all the
-     * checkpoints retrieved failed while the checkpoint pointers are not 
empty.
-     */
-    @Test
-    public void testRecoverFailedWhenRetrieveCheckpointAllFailed() {
-        final int ckpNum = 3;
-        checkpointStorageHelper.setRetrieveStateFunction(
-                (state) -> {
-                    throw new IOException(
-                            "Failed to retrieve checkpoint " + 
state.getCheckpointID());
-                });
-
-        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
-                builder.setGetAllSupplier(() -> 
createStateHandles(ckpNum)).build();
-        final CompletedCheckpointStore completedCheckpointStore =
-                createCompletedCheckpointStore(stateHandleStore);
-
         try {
             completedCheckpointStore.recover();
-            fail("We should get an exception when retrieving state failed.");
-        } catch (Exception ex) {
-            final String errMsg =
-                    "Could not read any of the " + ckpNum + " checkpoints from 
storage.";
-            assertThat(ex, FlinkMatchers.containsMessage(errMsg));
+        } catch (Exception e) {
+            if (ExceptionUtils.findThrowable(e, 
IOException.class).isPresent()) {
+                return;
+            }
+            throw e;
         }
+        fail();
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index af8c091..b3ca09d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -39,7 +39,6 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -64,8 +63,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
             ZooKeeperCheckpointStoreUtil.INSTANCE;
 
     /**
-     * Tests that the completed checkpoint store can retrieve all checkpoints 
stored in ZooKeeper
-     * and ignores those which cannot be retrieved via their state handles.
+     * Tests that the completed checkpoint store can retrieve all checkpoints 
stored in ZooKeeper.
      *
      * <p>We have a timeout in case the ZooKeeper store get's into a 
deadlock/livelock situation.
      */
@@ -81,11 +79,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -121,9 +114,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
                                         new 
TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, 
"/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock =
@@ -206,10 +197,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. 
Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 
     /**
@@ -230,11 +217,6 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
         expectedCheckpointIds.add(1L);
         expectedCheckpointIds.add(2L);
 
-        final RetrievableStateHandle<CompletedCheckpoint> 
failingRetrievableStateHandle =
-                mock(RetrievableStateHandle.class);
-        when(failingRetrievableStateHandle.retrieveState())
-                .thenThrow(new IOException("Test exception"));
-
         final RetrievableStateHandle<CompletedCheckpoint> 
retrievableStateHandle1 =
                 mock(RetrievableStateHandle.class);
         when(retrievableStateHandle1.retrieveState())
@@ -268,9 +250,7 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
                                         new 
TestCompletedCheckpointStorageLocation())));
 
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, 
"/foobar1"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing1"));
         checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, 
"/foobar2"));
-        checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, 
"/failing2"));
 
         final CuratorFramework client = mock(CuratorFramework.class, 
Mockito.RETURNS_DEEP_STUBS);
         final RetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelperMock =
@@ -353,9 +333,5 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
         // check that we did not discard any of the state handles
         verify(retrievableStateHandle1, never()).discardState();
         verify(retrievableStateHandle2, never()).discardState();
-
-        // Make sure that we also didn't discard any of the broken handles. 
Only when checkpoints
-        // are subsumed should they be discarded.
-        verify(failingRetrievableStateHandle, never()).discardState();
     }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
new file mode 100644
index 0000000..d1a3d24
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Test that failure on recovery leads to job restart if configured, so that 
transient recovery
+ * failures can are mitigated.
+ */
+public class CheckpointStoreITCase extends TestLogger {
+
+    @ClassRule
+    public static final MiniClusterWithClientResource CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(
+                                    new Configuration()
+                                            .set(
+                                                    
HighAvailabilityOptions.HA_MODE,
+                                                    
TestingHAFactory.class.getName()))
+                            .build());
+
+    @Before
+    public void init() {
+        FailingStore.reset();
+        FailingMapper.reset();
+    }
+
+    @Test
+    public void testRestartOnRecoveryFailure() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10);
+        env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + 
on recovery */, 0));
+        env.addSource(emitUntil(() -> FailingStore.recovered && 
FailingMapper.failedAndProcessed))
+                .map(new FailingMapper())
+                .addSink(new DiscardingSink<>());
+        env.execute();
+
+        checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
+    }
+
+    private static class FailingMapper implements MapFunction<Integer, 
Integer> {
+        private static volatile boolean failed = false;
+        private static volatile boolean failedAndProcessed = false;
+
+        public static void reset() {
+            failed = false;
+            failedAndProcessed = false;
+        }
+
+        @Override
+        public Integer map(Integer element) throws Exception {
+            if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else {
+                failedAndProcessed = true;
+                return element;
+            }
+        }
+    }
+
+    /** TestingHAFactory. */
+    public static class TestingHAFactory implements 
HighAvailabilityServicesFactory {
+
+        @Override
+        public HighAvailabilityServices createHAServices(
+                Configuration configuration, Executor executor) {
+            return new EmbeddedHaServices(Executors.directExecutor()) {
+
+                @Override
+                public CheckpointRecoveryFactory 
getCheckpointRecoveryFactory() {
+                    return new TestingCheckpointRecoveryFactory(
+                            new FailingStore(),
+                            new TestingCheckpointIDCounter(new 
CompletableFuture<>()));
+                }
+            };
+        }
+    }
+
+    private static class FailingStore implements CompletedCheckpointStore {
+        private static volatile boolean started = false;
+        private static volatile boolean failed = false;
+        private static volatile boolean recovered = false;
+
+        public static void reset() {
+            started = failed = recovered = false;
+        }
+
+        @Override
+        public void recover() throws Exception {
+            if (!started) {
+                started = true;
+            } else if (!failed) {
+                failed = true;
+                throw new RuntimeException();
+            } else if (!recovered) {
+                recovered = true;
+            }
+        }
+
+        @Override
+        public void addCheckpoint(
+                CompletedCheckpoint checkpoint,
+                CheckpointsCleaner checkpointsCleaner,
+                Runnable postCleanup) {}
+
+        @Override
+        public void shutdown(JobStatus jobStatus, CheckpointsCleaner 
checkpointsCleaner)
+                throws Exception {}
+
+        @Override
+        public List<CompletedCheckpoint> getAllCheckpoints() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getNumberOfRetainedCheckpoints() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxNumberOfRetainedCheckpoints() {
+            return 1;
+        }
+
+        @Override
+        public boolean requiresExternalizedCheckpoints() {
+            return false;
+        }
+    }
+
+    private SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> 
until) {
+        return new SourceFunction<Integer>() {
+            private volatile boolean running = true;
+
+            @Override
+            public void run(SourceContext<Integer> ctx) {
+                while (running && !until.get()) {
+                    synchronized (ctx.getCheckpointLock()) {
+                        ctx.collect(0);
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            ExceptionUtils.rethrow(e);
+                        }
+                    }
+                }
+            }
+
+            @Override
+            public void cancel() {
+                running = false;
+            }
+        };
+    }
+}

Reply via email to