Repository: flink Updated Branches: refs/heads/release-1.3.3-rc1 [created] 3c40656a7
[FLINK-7783] Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f69bdf20 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f69bdf20 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f69bdf20 Branch: refs/heads/release-1.3.3-rc1 Commit: f69bdf207b92ca47a5ce3e29f6ec7193ed17ec72 Parents: 100558b Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Sun Oct 22 11:40:43 2017 +0200 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Mon Mar 12 18:40:17 2018 +0800 ---------------------------------------------------------------------- .../runtime/checkpoint/CompletedCheckpoint.java | 24 +++++ .../ZooKeeperCompletedCheckpointStore.java | 105 ++++++++++++------- .../zookeeper/ZooKeeperStateHandleStore.java | 3 +- .../ZooKeeperCompletedCheckpointStoreTest.java | 35 +++++-- 4 files changed, 118 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java index 76d1580..01718e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java @@ -303,4 +303,28 @@ public class CompletedCheckpoint implements Serializable { public String toString() { return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == 
null || getClass() != o.getClass()) { + return false; + } + + CompletedCheckpoint that = (CompletedCheckpoint) o; + + if (checkpointID != that.checkpointID) { + return false; + } + return job.equals(that.job); + } + + @Override + public int hashCode() { + int result = job.hashCode(); + result = 31 * result + (int) (checkpointID ^ (checkpointID >>> 32)); + return result; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 88dd0d4..73598e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -70,16 +70,20 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); - /** Curator ZooKeeper client */ + /** Curator ZooKeeper client. */ private final CuratorFramework client; - /** Completed checkpoints in ZooKeeper */ + /** Completed checkpoints in ZooKeeper. */ private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper; /** The maximum number of checkpoints to retain (at least 1). */ private final int maxNumberOfCheckpointsToRetain; - /** Local completed checkpoints. */ + /** + * Local copy of the completed checkpoints in ZooKeeper. This is restored from ZooKeeper + * when recovering and is maintained in parallel to the state in ZooKeeper during normal + * operations. 
+ */ private final ArrayDeque<CompletedCheckpoint> completedCheckpoints; /** @@ -122,7 +126,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor); this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); - + LOG.info("Initialized in '{}'.", checkpointsPath); } @@ -142,11 +146,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto public void recover() throws Exception { LOG.info("Recovering checkpoints from ZooKeeper."); - // Clear local handles in order to prevent duplicates on - // recovery. The local handles should reflect the state - // of ZooKeeper. - completedCheckpoints.clear(); - // Get all there is first List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints; while (true) { @@ -163,22 +162,59 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); - for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) { + // Try and read the state handles from storage. We try until we either successfully read + // all of them or when we reach a stable state, i.e. when we successfully read the same set + // of checkpoints in two tries. We do it like this to protect against transient outages + // of the checkpoint store (for example a DFS): if the DFS comes online midway through + // reading a set of checkpoints we would run the risk of reading only a partial set + // of checkpoints while we could in fact read the other checkpoints as well if we retried. + // Waiting until a stable state protects against this while also being resilient against + // checkpoints being actually unreadable. 
+ // + // These considerations are also important in the scope of incremental checkpoints, where + // we use ref-counting for shared state handles and might accidentally delete shared state + // of checkpoints that we don't read due to transient storage outages. + List<CompletedCheckpoint> lastTryRetrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); + List<CompletedCheckpoint> retrievedCheckpoints = new ArrayList<>(numberOfInitialCheckpoints); + do { + LOG.info("Trying to fetch {} checkpoints from storage.", numberOfInitialCheckpoints); - CompletedCheckpoint completedCheckpoint = null; + lastTryRetrievedCheckpoints.clear(); + lastTryRetrievedCheckpoints.addAll(retrievedCheckpoints); - try { - completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); - if (completedCheckpoint != null) { - completedCheckpoints.add(completedCheckpoint); - } - } catch (Exception e) { - LOG.warn("Could not retrieve checkpoint. Removing it from the completed " + - "checkpoint store.", e); + retrievedCheckpoints.clear(); + + for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle : initialCheckpoints) { - // remove the checkpoint with broken state handle - removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0); + CompletedCheckpoint completedCheckpoint = null; + + try { + completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle); + if (completedCheckpoint != null) { + retrievedCheckpoints.add(completedCheckpoint); + } + } catch (Exception e) { + LOG.warn("Could not retrieve checkpoint, not adding to list of recovered checkpoints.", e); + } } + + } while (retrievedCheckpoints.size() != numberOfInitialCheckpoints && + !lastTryRetrievedCheckpoints.equals(retrievedCheckpoints)); + + // Clear local handles in order to prevent duplicates on + // recovery. The local handles should reflect the state + // of ZooKeeper. 
+ completedCheckpoints.clear(); + completedCheckpoints.addAll(retrievedCheckpoints); + + if (completedCheckpoints.isEmpty() && numberOfInitialCheckpoints > 0) { + throw new FlinkException( + "Could not read any of the " + numberOfInitialCheckpoints + " from storage."); + } else if (completedCheckpoints.size() != numberOfInitialCheckpoints) { + LOG.warn( + "Could only fetch {} of {} checkpoints from storage.", + completedCheckpoints.size(), + numberOfInitialCheckpoints); } } @@ -190,7 +226,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto @Override public void addCheckpoint(final CompletedCheckpoint checkpoint) throws Exception { checkNotNull(checkpoint, "Checkpoint"); - + final String path = checkpointIdToPath(checkpoint.getCheckpointID()); // Now add the new one. If it fails, we don't want to lose existing data. @@ -268,10 +304,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto // ------------------------------------------------------------------------ + /** + * Removes a subsumed checkpoint from ZooKeeper and drops the state. + */ private void removeSubsumed( final CompletedCheckpoint completedCheckpoint) throws Exception { - if(completedCheckpoint == null) { + if (completedCheckpoint == null) { return; } @@ -294,11 +333,14 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto action); } + /** + * Removes a checkpoint from ZooKeeper because of Job shutdown and drops the state. 
+ */ private void removeShutdown( final CompletedCheckpoint completedCheckpoint, final JobStatus jobStatus) throws Exception { - if(completedCheckpoint == null) { + if (completedCheckpoint == null) { return; } @@ -318,21 +360,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto removeAction); } - private void removeBrokenStateHandle( - final String pathInZooKeeper, - final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception { - checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() { - @Override - public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException { - try { - retrievableStateHandle.discardState(); - } catch (Exception e) { - throw new FlinkException("Could not discard state handle.", e); - } - } - }); - } - /** * Convert a checkpoint id into a ZooKeeper path. * http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java index a548f1d..dc3f7d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java @@ -326,7 +326,8 @@ public class ZooKeeperStateHandleStore<T extends Serializable> { /** * Gets all available state handles from ZooKeeper sorted by name (ascending) and locks the - * respective state nodes. + * respective state nodes. The result tuples contain the retrieved state and the path to the + * node in ZooKeeper. 
* * <p>If there is a concurrent modification, the operation is retried until it succeeds. * http://git-wip-us.apache.org/repos/asf/flink/blob/f69bdf20/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 3171f1f..bee245c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -18,10 +18,11 @@ package org.apache.flink.runtime.checkpoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.RetrievableStateHandle; -import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; import org.apache.flink.util.TestLogger; @@ -44,6 +45,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.concurrent.Executor; @@ -57,7 +59,6 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.doAnswer; 
@@ -85,10 +86,25 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { public void testCheckpointRecovery() throws Exception { final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZooKeeper = new ArrayList<>(4); - final CompletedCheckpoint completedCheckpoint1 = mock(CompletedCheckpoint.class); - when(completedCheckpoint1.getCheckpointID()).thenReturn(1L); - final CompletedCheckpoint completedCheckpoint2 = mock(CompletedCheckpoint.class); - when(completedCheckpoint2.getCheckpointID()).thenReturn(2L); + final CompletedCheckpoint completedCheckpoint1 = new CompletedCheckpoint( + new JobID(), + 1L, + 1L, + 1L, + new HashMap<OperatorID, OperatorState>(), + null, + CheckpointProperties.forStandardCheckpoint(), + null, null); + + final CompletedCheckpoint completedCheckpoint2 = new CompletedCheckpoint( + new JobID(), + 2L, + 2L, + 2L, + new HashMap<OperatorID, OperatorState>(), + null, + CheckpointProperties.forStandardCheckpoint(), + null, null); final Collection<Long> expectedCheckpointIds = new HashSet<>(2); expectedCheckpointIds.add(1L); @@ -180,12 +196,13 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { assertEquals(expectedCheckpointIds, actualCheckpointIds); - // check that we did not discard any of the state handles which were retrieved + // check that we did not discard any of the state handles verify(retrievableStateHandle1, never()).discardState(); verify(retrievableStateHandle2, never()).discardState(); - // check that we have discarded the state handles which could not be retrieved - verify(failingRetrievableStateHandle, times(2)).discardState(); + // Make sure that we also didn't discard any of the broken handles. Only when checkpoints + // are subsumed should they be discarded. + verify(failingRetrievableStateHandle, never()).discardState(); } /**