[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948810#comment-14948810 ]
ASF GitHub Bot commented on FLINK-2354: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1153#discussion_r41526216 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java --- @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.utils.ZKPaths; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.state.StateHandleProvider; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}. + * + * <p>Checkpoints are added under a ZNode per job: + * <pre> + * +----O /flink/checkpoints/<job-id> [persistent] + * . | + * . +----O /flink/checkpoints/<job-id>/1 [persistent_sequential] + * . . . + * . . . + * . . . + * . +----O /flink/checkpoints/<job-id>/N [persistent_sequential] + * </pre> + * + * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one, + * only the latest one is used and older ones are discarded (even if the maximum number + * of retained checkpoints is greater than one). The sequential; cversion determines, which + * checkpoint is the latest one. + * + * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the + * same program, it is OK to take any valid successful checkpoint as long as the "history" of + * checkpoints is consistent. 
Currently, after recovery we start out with only a single + * checkpoint in oder to guarantee this. + */ +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class); + + /** Curator ZooKeeper client */ + private final CuratorFramework client; + + /** Completed checkpoints in ZooKeeper */ + private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper; + + /** The maximum number of checkpoints to retain (at least 1). */ + private final int maxNumberOfCheckpointsToRetain; + + /** User class loader for discarding {@link CompletedCheckpoint} instances. */ + private final ClassLoader userClassLoader; + + /** Local completed checkpoints. */ + private final ArrayDeque<ZooKeeperStateHandle<CompletedCheckpoint>> checkpointStateHandles; + + /** + * Creates a {@link ZooKeeperCompletedCheckpointStore} instance. + * + * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at + * least 1). Adding more checkpoints than this results + * in older checkpoints being discarded. On recovery, + * we will only start with a single checkpoint. 
+ * @param userClassLoader The user class loader used to discard checkpoints + * @param client The Curator ZooKeeper client + * @param checkpointsPath The ZooKeeper path for the checkpoints (needs to + * start with a '/') + * @param stateHandleProvider The state handle provider for checkpoints + * @throws Exception + */ + public ZooKeeperCompletedCheckpointStore( + int maxNumberOfCheckpointsToRetain, + ClassLoader userClassLoader, + CuratorFramework client, + String checkpointsPath, + StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception { + + checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint."); + + this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain; + this.userClassLoader = checkNotNull(userClassLoader, "User class loader"); + + checkNotNull(client, "Curator client"); + checkNotNull(checkpointsPath, "Checkpoints path"); + checkNotNull(stateHandleProvider, "State handle provider"); + + // Ensure that the checkpoints path exists + client.newNamespaceAwareEnsurePath(checkpointsPath) + .ensure(client.getZookeeperClient()); + + // All operations will have the path as root + this.client = client.usingNamespace(client.getNamespace() + checkpointsPath); + + this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>( + this.client, stateHandleProvider); + + this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1); + + LOG.info("Initialized in '{}'.", checkpointsPath); + } + + /** + * Gets the latest checkpoint from ZooKeeper and removes all others. + * + * <p><strong>Important</strong>: Even if there are more than one checkpoint in ZooKeeper, + * this will only recover the latest and discard the others. Otherwise, there is no guarantee + * that the history of checkpoints is consistent. 
+ */ + @Override + public void recover() throws Exception { + LOG.info("Recovering checkpoints from ZooKeeper."); + + // Get all there is first + List<ZooKeeperStateHandle<CompletedCheckpoint>> initialCheckpoints; + while (true) { + try { + initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName(); + break; + } + catch (ConcurrentModificationException e) { + LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying."); + } + } + + int numberOfInitialCheckpoints = initialCheckpoints.size(); + + LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints); + + if (numberOfInitialCheckpoints > 0) { + // Take the last one. This is the latest checkpoints, because path names are strictly + // increasing (checkpoint ID). + ZooKeeperStateHandle<CompletedCheckpoint> latest = initialCheckpoints + .get(numberOfInitialCheckpoints - 1); + + CompletedCheckpoint latestCheckpoint = latest.getState(userClassLoader); + + checkpointStateHandles.add(latest); + + LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint); + + for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) { + try { + removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i)); + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint", e); + } + } + } + } + + @Override + public int getNextCheckpointID() throws Exception { + Stat stat = client.checkExists().forPath("/"); + + if (stat == null) { + throw new IllegalStateException("Checkpoint root does not exist"); + } + else { + return stat.getCversion() + 1; + } + } + + /** + * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones. + * + * @param checkpoint Completed checkpoint to add. + */ + @Override + public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { + checkNotNull(checkpoint, "Checkpoint"); + + // First add the new one. If it fails, we don't want to loose existing data. 
+ String path = String.format("/%s", checkpoint.getCheckpointID()); + + final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle = + checkpointsInZooKeeper.add(path, checkpoint); + + checkpointStateHandles.addLast(stateHandle); + + // Everything worked, let's remove a previous checkpoint if necessary. + if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) { + removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst()); + } + + LOG.debug("Added {} to {}.", checkpoint, path); + assert (checkpointStateHandles.getLast().equals(stateHandle)); + } + + @Override + public CompletedCheckpoint getLatestCheckpoint() throws Exception { + if (checkpointStateHandles.isEmpty()) { + return null; + } + else { + return checkpointStateHandles.getLast().getState(userClassLoader); + } + } + + @Override + public List<CompletedCheckpoint> getAllCheckpoints() throws Exception { + List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size()); + + for (ZooKeeperStateHandle<CompletedCheckpoint> stateHandle + : checkpointStateHandles) { + checkpoints.add(stateHandle.getState(userClassLoader)); + } + + return checkpoints; + } + + @Override + public int getNumberOfRetainedCheckpoints() { + return checkpointStateHandles.size(); + } + + @Override + public void discardAllCheckpoints() throws Exception { + for (ZooKeeperStateHandle<CompletedCheckpoint> checkpoint + : checkpointStateHandles) { + try { + removeFromZooKeeperAndDiscardCheckpoint(checkpoint); + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint.", e); + } + } + + checkpointStateHandles.clear(); + + String path = "/" + client.getNamespace(); + + LOG.info("Removing {} from ZooKeeper", path); + ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true); + } + + /** + * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle. 
+ */ + private void removeFromZooKeeperAndDiscardCheckpoint( + final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle) throws Exception { + + final BackgroundCallback callback = new BackgroundCallback() { + @Override + public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + try { + if (event.getType() == CuratorEventType.DELETE) { + if (event.getResultCode() == 0) { + // The checkpoint + CompletedCheckpoint checkpoint = stateHandle + .getState(userClassLoader); + + checkpoint.discard(userClassLoader); + + // Discard the state handle + stateHandle.discardState(); + + // Discard the checkpoint + LOG.debug("Discarded " + checkpoint); + } + else { + throw new IllegalStateException("Unexpected result code " + + event.getResultCode() + " in '" + event + "' callback."); + } + } + else { + throw new IllegalStateException("Unexpected event type " + + event.getType() + " in '" + event + "' callback."); + } + } + catch (Exception e) { + LOG.error("Failed to discard checkpoint.", e); + } + } + }; + + // Remove state handle from ZooKeeper first. If this fails, we can still recover, but if + // we remove a state handle and fail to remove it from ZooKeeper, we end up in an + // inconsistent state. + checkpointsInZooKeeper.remove(stateHandle.getPathInZooKeeper(), callback); --- End diff -- Why don't we simply do: `stateHandle.discard()` with `stateHandle` being `ZooKeeperStateHandle<CompletedCheckpoint>`. Internally `discard` will call `ZooKeeperStateHandleStore.removeAndDiscardState` or a variant where the the wrapped `StateHandle` is discarded in a callback. 
> Recover running jobs on JobManager failure > ------------------------------------------ > > Key: FLINK-2354 > URL: https://issues.apache.org/jira/browse/FLINK-2354 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Affects Versions: 0.10 > Reporter: Ufuk Celebi > Assignee: Ufuk Celebi > Fix For: 0.10 > > > tl;dr Persist JobGraphs in state backend and coordinate reference to state > handle via ZooKeeper. > Problem: When running multiple JobManagers in high availability mode, the > leading job manager loses all running jobs when it fails. After a new > leading job manager is elected, it is not possible to recover any previously > running jobs. > Solution: The leading job manager, which receives the job graph, writes 1) the > job graph to a state backend, and 2) a reference to the respective state > handle to ZooKeeper. In general, job graphs can become large (multiple MBs, > because they include closures etc.). ZooKeeper is not designed for data of > this size. The level of indirection via the reference to the state backend > keeps the data in ZooKeeper small. > Proposed ZooKeeper layout: > /flink (default) > +- currentJobs > +- job id i > +- state handle reference of job graph i > The 'currentJobs' node needs to be persistent to allow recovery of jobs > between job managers. The currentJobs node needs to satisfy the following > invariant: There is a reference to a job graph with id i IFF the respective > job graph needs to be recovered by a newly elected job manager leader. > With this in place, jobs will be recovered from their initial state (as if > resubmitted). The next step is to back up the runtime state handles of > checkpoints in a similar manner. > --- > This work will be based on [~trohrm...@apache.org]'s implementation of > FLINK-2291. The leader election service notifies the job manager about > granted/revoked leadership.
This notification happens via Akka and thus > serially *per* job manager, but results in eventually consistent state > between job managers. For some periods of time it is possible for a new > leader to be granted leadership before the old one's leadership has been revoked. > [~trohrm...@apache.org], can you confirm that leadership does not guarantee > mutually exclusive access to the shared 'currentJobs' state? > For example, the following can happen: > - JM 1 is leader, JM 2 is standby > - JOB i is running (and hence /flink/currentJobs/i exists) > - ZK notifies leader election service (LES) of JM 1 and JM 2 > - LES 2 immediately notifies JM 2 about granted leadership, but LES 1's > notification revoking leadership takes longer > - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives > final JobStatusChange > - JM 2 resubmits the job /flink/currentJobs/i > - JM 1 removes /flink/currentJobs/i, because it is now finished > => inconsistent state (wrt the specified invariant above) > If it is indeed a problem, we can circumvent this with a Curator recipe for > [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to > coordinate the access to currentJobs. The lock needs to be acquired on > leadership. > --- > Minimum required tests: > - Unit tests for job graph serialization and writing to state backend and > ZooKeeper with expected nodes > - Unit tests for job submission to job manager in leader/non-leader state > - Unit tests for leadership granting/revoking and job submission/restarting > interleavings > - Process failure integration tests with single and multiple running jobs -- This message was sent by Atlassian JIRA (v6.3.4#6332)