[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948810#comment-14948810 ]

ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41526216
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java ---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.curator.framework.api.BackgroundCallback;
    +import org.apache.curator.framework.api.CuratorEvent;
    +import org.apache.curator.framework.api.CuratorEventType;
    +import org.apache.curator.utils.ZKPaths;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.runtime.state.StateHandleProvider;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
    +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.ZooKeeperStateHandle;
    +import org.apache.zookeeper.data.Stat;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayDeque;
    +import java.util.ArrayList;
    +import java.util.ConcurrentModificationException;
    +import java.util.List;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +/**
    + * {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
    + *
    + * <p>Checkpoints are added under a ZNode per job:
    + * <pre>
    + * +----O /flink/checkpoints/&lt;job-id&gt;  [persistent]
    + * .    |
    + * .    +----O /flink/checkpoints/&lt;job-id&gt;/1 [persistent_sequential]
    + * .    .                                  .
    + * .    .                                  .
    + * .    .                                  .
    + * .    +----O /flink/checkpoints/&lt;job-id&gt;/N [persistent_sequential]
    + * </pre>
    + *
    + * <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
    + * only the latest one is used and older ones are discarded (even if the maximum number
    + * of retained checkpoints is greater than one). The sequential cversion determines which
    + * checkpoint is the latest one.
    + *
    + * <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
    + * same program, it is OK to take any valid successful checkpoint as long as the "history" of
    + * checkpoints is consistent. Currently, after recovery we start out with only a single
    + * checkpoint in order to guarantee this.
    + */
    +public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
    +
    +   private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
    +
    +   /** Curator ZooKeeper client */
    +   private final CuratorFramework client;
    +
    +   /** Completed checkpoints in ZooKeeper */
    +   private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
    +
    +   /** The maximum number of checkpoints to retain (at least 1). */
    +   private final int maxNumberOfCheckpointsToRetain;
    +
    +   /** User class loader for discarding {@link CompletedCheckpoint} instances. */
    +   private final ClassLoader userClassLoader;
    +
    +   /** Local completed checkpoints. */
    +   private final ArrayDeque<ZooKeeperStateHandle<CompletedCheckpoint>> checkpointStateHandles;
    +
    +   /**
    +    * Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
    +    *
    +    * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain (at
    +    *                                       least 1). Adding more checkpoints than this results
    +    *                                       in older checkpoints being discarded. On recovery,
    +    *                                       we will only start with a single checkpoint.
    +    * @param userClassLoader                The user class loader used to discard checkpoints
    +    * @param client                         The Curator ZooKeeper client
    +    * @param checkpointsPath                The ZooKeeper path for the checkpoints (needs to
    +    *                                       start with a '/')
    +    * @param stateHandleProvider            The state handle provider for checkpoints
    +    * @throws Exception
    +    */
    +   public ZooKeeperCompletedCheckpointStore(
    +                   int maxNumberOfCheckpointsToRetain,
    +                   ClassLoader userClassLoader,
    +                   CuratorFramework client,
    +                   String checkpointsPath,
    +                   StateHandleProvider<CompletedCheckpoint> stateHandleProvider) throws Exception {
    +
    +           checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
    +
    +           this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
    +           this.userClassLoader = checkNotNull(userClassLoader, "User class loader");
    +
    +           checkNotNull(client, "Curator client");
    +           checkNotNull(checkpointsPath, "Checkpoints path");
    +           checkNotNull(stateHandleProvider, "State handle provider");
    +
    +           // Ensure that the checkpoints path exists
    +           client.newNamespaceAwareEnsurePath(checkpointsPath)
    +                           .ensure(client.getZookeeperClient());
    +
    +           // All operations will have the path as root
    +           this.client = client.usingNamespace(client.getNamespace() + checkpointsPath);
    +
    +           this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(
    +                           this.client, stateHandleProvider);
    +
    +           this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
    +
    +           LOG.info("Initialized in '{}'.", checkpointsPath);
    +   }
    +
    +   /**
    +    * Gets the latest checkpoint from ZooKeeper and removes all others.
    +    *
    +    * <p><strong>Important</strong>: Even if there is more than one checkpoint in ZooKeeper,
    +    * this will only recover the latest and discard the others. Otherwise, there is no guarantee
    +    * that the history of checkpoints is consistent.
    +    */
    +   @Override
    +   public void recover() throws Exception {
    +           LOG.info("Recovering checkpoints from ZooKeeper.");
    +
    +           // Get all there is first
    +           List<ZooKeeperStateHandle<CompletedCheckpoint>> initialCheckpoints;
    +           while (true) {
    +                   try {
    +                           initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
    +                           break;
    +                   }
    +                   catch (ConcurrentModificationException e) {
    +                           LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
    +                   }
    +           }
    +
    +           int numberOfInitialCheckpoints = initialCheckpoints.size();
    +
    +           LOG.info("Found {} checkpoints in ZooKeeper.", numberOfInitialCheckpoints);
    +
    +           if (numberOfInitialCheckpoints > 0) {
    +                   // Take the last one. This is the latest checkpoint, because path names are strictly
    +                   // increasing (checkpoint ID).
    +                   ZooKeeperStateHandle<CompletedCheckpoint> latest = initialCheckpoints
    +                                   .get(numberOfInitialCheckpoints - 1);
    +
    +                   CompletedCheckpoint latestCheckpoint = latest.getState(userClassLoader);
    +
    +                   checkpointStateHandles.add(latest);
    +
    +                   LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
    +
    +                   for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
    +                           try {
    +                                   removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
    +                           }
    +                           catch (Exception e) {
    +                                   LOG.error("Failed to discard checkpoint", e);
    +                           }
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public int getNextCheckpointID() throws Exception {
    +           Stat stat = client.checkExists().forPath("/");
    +
    +           if (stat == null) {
    +                   throw new IllegalStateException("Checkpoint root does not exist");
    +           }
    +           else {
    +                   return stat.getCversion() + 1;
    +           }
    +   }
    +
    +   /**
    +    * Synchronously writes the new checkpoint to ZooKeeper and asynchronously removes older ones.
    +    *
    +    * @param checkpoint Completed checkpoint to add.
    +    */
    +   @Override
    +   public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
    +           checkNotNull(checkpoint, "Checkpoint");
    +
    +           // First add the new one. If it fails, we don't want to lose existing data.
    +           String path = String.format("/%s", checkpoint.getCheckpointID());
    +
    +           final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle =
    +                           checkpointsInZooKeeper.add(path, checkpoint);
    +
    +           checkpointStateHandles.addLast(stateHandle);
    +
    +           // Everything worked, let's remove a previous checkpoint if necessary.
    +           if (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
    +                   removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
    +           }
    +
    +           LOG.debug("Added {} to {}.", checkpoint, path);
    +           assert (checkpointStateHandles.getLast().equals(stateHandle));
    +   }
    +
    +   @Override
    +   public CompletedCheckpoint getLatestCheckpoint() throws Exception {
    +           if (checkpointStateHandles.isEmpty()) {
    +                   return null;
    +           }
    +           else {
    +                   return checkpointStateHandles.getLast().getState(userClassLoader);
    +           }
    +   }
    +
    +   @Override
    +   public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
    +           List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
    +
    +           for (ZooKeeperStateHandle<CompletedCheckpoint> stateHandle
    +                           : checkpointStateHandles) {
    +                   checkpoints.add(stateHandle.getState(userClassLoader));
    +           }
    +
    +           return checkpoints;
    +   }
    +
    +   @Override
    +   public int getNumberOfRetainedCheckpoints() {
    +           return checkpointStateHandles.size();
    +   }
    +
    +   @Override
    +   public void discardAllCheckpoints() throws Exception {
    +           for (ZooKeeperStateHandle<CompletedCheckpoint> checkpoint
    +                           : checkpointStateHandles) {
    +                   try {
    +                           removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
    +                   }
    +                   catch (Exception e) {
    +                           LOG.error("Failed to discard checkpoint.", e);
    +                   }
    +           }
    +
    +           checkpointStateHandles.clear();
    +
    +           String path = "/" + client.getNamespace();
    +
    +           LOG.info("Removing {} from ZooKeeper", path);
    +           ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
    +   }
    +
    +   /**
    +    * Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
    +    */
    +   private void removeFromZooKeeperAndDiscardCheckpoint(
    +                   final ZooKeeperStateHandle<CompletedCheckpoint> stateHandle) throws Exception {
    +
    +           final BackgroundCallback callback = new BackgroundCallback() {
    +                   @Override
    +                   public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
    +                           try {
    +                                   if (event.getType() == CuratorEventType.DELETE) {
    +                                           if (event.getResultCode() == 0) {
    +                                                   // The checkpoint
    +                                                   CompletedCheckpoint checkpoint = stateHandle
    +                                                                   .getState(userClassLoader);
    +
    +                                                   checkpoint.discard(userClassLoader);
    +
    +                                                   // Discard the state handle
    +                                                   stateHandle.discardState();
    +
    +                                                   // Discard the checkpoint
    +                                                   LOG.debug("Discarded " + checkpoint);
    +                                           }
    +                                           else {
    +                                                   throw new IllegalStateException("Unexpected result code " +
    +                                                                   event.getResultCode() + " in '" + event + "' callback.");
    +                                           }
    +                                   }
    +                                   else {
    +                                           throw new IllegalStateException("Unexpected event type " +
    +                                                           event.getType() + " in '" + event + "' callback.");
    +                                   }
    +                           }
    +                           catch (Exception e) {
    +                                   LOG.error("Failed to discard checkpoint.", e);
    +                           }
    +                   }
    +           };
    +
    +           // Remove state handle from ZooKeeper first. If this fails, we can still recover, but if
    +           // we remove a state handle and fail to remove it from ZooKeeper, we end up in an
    +           // inconsistent state.
    +           checkpointsInZooKeeper.remove(stateHandle.getPathInZooKeeper(), callback);
    --- End diff --
    
    Why don't we simply call `stateHandle.discard()`, with `stateHandle` being a
    `ZooKeeperStateHandle<CompletedCheckpoint>`? Internally, `discard` would call
    `ZooKeeperStateHandleStore.removeAndDiscardState`, or a variant in which the
    wrapped `StateHandle` is discarded in a callback.
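
    A minimal sketch of the shape this suggestion could take, using only the
    JDK. `Store`, `Handle`, `State` and `CountingState` are hypothetical
    stand-ins for illustration, not Flink or Curator classes, and an in-memory
    map takes the place of ZooKeeper. The point is only the ordering: the node
    is removed first, and the wrapped state is discarded in the removal
    callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class DiscardSketch {

    /** Hypothetical stand-in for the wrapped state handle. */
    interface State {
        void discardState();
    }

    /** Hypothetical stand-in for the handle store; a map replaces ZooKeeper. */
    static class Store {
        private final Map<String, State> nodes = new ConcurrentHashMap<>();

        Handle add(String path, State state) {
            nodes.put(path, state);
            return new Handle(this, path, state);
        }

        /** Removes the node and reports to the callback whether it existed. */
        void remove(String path, Consumer<Boolean> callback) {
            callback.accept(nodes.remove(path) != null);
        }

        boolean contains(String path) {
            return nodes.containsKey(path);
        }
    }

    /** Hypothetical handle that knows its store, so callers just call discard(). */
    static class Handle {
        private final Store store;
        private final String path;
        private final State state;

        Handle(Store store, String path, State state) {
            this.store = store;
            this.path = path;
            this.state = state;
        }

        /** Node removal first; the state is discarded only if removal succeeded. */
        void discard() {
            store.remove(path, removed -> {
                if (removed) {
                    state.discardState();
                }
            });
        }
    }

    /** Test double that counts how often its state was discarded. */
    static class CountingState implements State {
        int discards;

        @Override
        public void discardState() {
            discards++;
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        CountingState state = new CountingState();
        Handle handle = store.add("/1", state);

        handle.discard();
        System.out.println("node present: " + store.contains("/1"));
        System.out.println("discards: " + state.discards);
    }
}
```

    With this shape the checkpoint store would not need its own
    `BackgroundCallback`; whether the real classes can be arranged this way is
    an open question for the PR.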


> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 0.10
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in state backend and coordinate reference to state 
> handle via ZooKeeper.
> Problem: When running multiple JobManagers in high availability mode, the 
> leading job manager loses all running jobs when it fails. After a new 
> leading job manager is elected, it is not possible to recover any previously 
> running jobs.
> Solution: The leading job manager, which receives the job graph, writes 1) the 
> job graph to a state backend, and 2) a reference to the respective state 
> handle to ZooKeeper. In general, job graphs can become large (multiple MBs, 
> because they include closures etc.). ZooKeeper is not designed for data of 
> this size. The level of indirection via the reference to the state backend 
> keeps the data in ZooKeeper small.
> Proposed ZooKeeper layout:
> /flink (default)
>   +- currentJobs
>        +- job id i
>             +- state handle reference of job graph i
> The 'currentJobs' node needs to be persistent to allow recovery of jobs 
> between job managers. The currentJobs node needs to satisfy the following 
> invariant: There is a reference to a job graph with id i IFF the respective 
> job graph needs to be recovered by a newly elected job manager leader.
> With this in place, jobs will be recovered from their initial state (as if 
> resubmitted). The next step is to back up the runtime state handles of 
> checkpoints in a similar manner.
> ---
> This work will be based on [~trohrm...@apache.org]'s implementation of 
> FLINK-2291. The leader election service notifies the job manager about 
> granted/revoked leadership. This notification happens via Akka and thus 
> serially *per* job manager, but results in eventually consistent state 
> between job managers. For some snapshots of time it is possible for a new 
> leader to be granted leadership before the old one has had its leadership revoked.
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee 
> mutually exclusive access to the shared 'currentJobs' state?
> For example, the following can happen:
> - JM 1 is leader, JM 2 is standby
> - JOB i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about granted leadership, but LES 1 
> notification revoking leadership takes longer
> - JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
> final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (wrt the specified invariant above)
> If it is indeed a problem, we can circumvent this with a Curator recipe for 
> [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
> coordinate the access to currentJobs. The lock needs to be acquired on 
> leadership.
> ---
> Minimum required tests:
> - Unit tests for job graph serialization and writing to state backend and 
> ZooKeeper with expected nodes
> - Unit tests for job submission to job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting 
> interleavings
> - Process failure integration tests with single and multiple running jobs
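
The interleaving described in the issue can be replayed deterministically. The sketch below assumes that every change to currentJobs requires holding a lock acquired on leadership. `LeaderLock` and `CurrentJobs` are hypothetical in-memory stand-ins; this is not the Curator shared-lock recipe, only an illustration of why gating writes on the lock preserves the invariant.

```java
import java.util.HashSet;
import java.util.Set;

public class CurrentJobsLockSketch {

    /** Hypothetical single-owner lock; stands in for a distributed lock. */
    static class LeaderLock {
        private String owner;

        /** Returns true if the given job manager owns the lock afterwards. */
        boolean tryAcquire(String jm) {
            if (owner == null) {
                owner = jm;
            }
            return jm.equals(owner);
        }

        void release(String jm) {
            if (jm.equals(owner)) {
                owner = null;
            }
        }

        boolean heldBy(String jm) {
            return jm.equals(owner);
        }
    }

    /** Hypothetical stand-in for the children of /flink/currentJobs. */
    static class CurrentJobs {
        private final Set<String> jobs = new HashSet<>();
        private final LeaderLock lock;

        CurrentJobs(LeaderLock lock) {
            this.lock = lock;
        }

        /** Writes are rejected unless the caller holds the lock. */
        boolean add(String jm, String jobId) {
            return lock.heldBy(jm) && jobs.add(jobId);
        }

        boolean remove(String jm, String jobId) {
            return lock.heldBy(jm) && jobs.remove(jobId);
        }

        boolean contains(String jobId) {
            return jobs.contains(jobId);
        }
    }

    /** Replays the steps from the issue; returns true if the invariant holds. */
    static boolean replay() {
        LeaderLock lock = new LeaderLock();
        CurrentJobs currentJobs = new CurrentJobs(lock);

        lock.tryAcquire("JM1");                            // JM 1 is leader
        currentJobs.add("JM1", "i");                       // job i is running

        boolean jm2Early = lock.tryAcquire("JM2");         // JM 2 granted leadership early
        boolean resubmitted = currentJobs.add("JM2", "i"); // resubmission attempt

        currentJobs.remove("JM1", "i");                    // job i finishes on JM 1
        lock.release("JM1");                               // revocation finally arrives
        lock.tryAcquire("JM2");                            // JM 2 takes over, nothing to recover

        return !jm2Early && !resubmitted && !currentJobs.contains("i");
    }

    public static void main(String[] args) {
        System.out.println("invariant holds: " + replay());
    }
}
```

Because JM 2 cannot write before JM 1 releases the lock, its early resubmission is rejected, and job i has no reference left once it has finished.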



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)