[ https://issues.apache.org/jira/browse/FLINK-2354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14948888#comment-14948888 ]
ASF GitHub Bot commented on FLINK-2354:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1153#discussion_r41531181

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java ---
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@link SubmittedJobGraph} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
+ *
+ * <p>Each job graph creates ZNode:
+ * <pre>
+ * +----O /flink/jobgraphs/<job-id> 1 [persistent]
+ * .
+ * .
+ * .
+ * +----O /flink/jobgraphs/<job-id> N [persistent]
+ * </pre>
+ *
+ * <p>The root path is watched to detect concurrent modifications in corner situations where
+ * multiple instances operate concurrently. The job manager acts as a {@link SubmittedJobGraphListener}
+ * to react to such situations.
+ */
+public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
+
+	private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
+
+	/** Lock to synchronize with the {@link SubmittedJobGraphListener}. */
+	private final Object cacheLock = new Object();
+
+	/** Client (not a namespace facade) */
+	private final CuratorFramework client;
+
+	/** The set of IDs of all added job graphs. */
+	private final Set<JobID> addedJobGraphs = new HashSet<>();
+
+	/** Completed checkpoints in ZooKeeper */
+	private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
+
+	/**
+	 * Cache to monitor all children. This is used to detect races with other instances working
+	 * on the same state.
+	 */
+	private final PathChildrenCache pathCache;
+
+	/** The external listener to be notified on races. */
+	private SubmittedJobGraphListener jobGraphListener;
+
+	/** Flag indicating whether this instance is running. */
+	private boolean isRunning;
+
+	public ZooKeeperSubmittedJobGraphStore(
+			CuratorFramework client,
+			String currentJobsPath,
+			StateHandleProvider<SubmittedJobGraph> stateHandleProvider) throws Exception {
+
+		checkNotNull(currentJobsPath, "Current jobs path");
+		checkNotNull(stateHandleProvider, "State handle provider");
+
+		// Keep a reference to the original client and not the namespace facade. The namespace
+		// facade cannot be closed.
+		this.client = checkNotNull(client, "Curator client");
+
+		// Ensure that the job graphs path exists
+		client.newNamespaceAwareEnsurePath(currentJobsPath)
+				.ensure(client.getZookeeperClient());
+
+		// All operations will have the path as root
+		client = client.usingNamespace(client.getNamespace() + currentJobsPath);
+
+		this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(client, stateHandleProvider);
+
+		this.pathCache = new PathChildrenCache(client, "/", false);
+		pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
+	}
+
+	@Override
+	public void start(SubmittedJobGraphListener jobGraphListener) throws Exception {
+		synchronized (cacheLock) {
+			if (!isRunning) {
+				this.jobGraphListener = jobGraphListener;
+
+				pathCache.start();
+
+				isRunning = true;
+			}
+		}
+	}
+
+	@Override
+	public void stop() throws Exception {
+		synchronized (cacheLock) {
+			if (isRunning) {
+				jobGraphListener = null;
+
+				pathCache.close();
+
+				client.close();
+
+				isRunning = false;
+			}
+		}
+	}
+
+	@Override
+	public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
+		synchronized (cacheLock) {
+			verifyIsRunning();
+
+			List<ZooKeeperStateHandleStore.ZooKeeperStateHandle<SubmittedJobGraph>> submitted;
+
+			while (true) {
+				try {
+					submitted = jobGraphsInZooKeeper.getAll();
+					break;
+				}
+				catch (ConcurrentModificationException e) {
+					LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
+				}
+			}
+
+			if (submitted.size() != 0) {
+				List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
+
+				for (ZooKeeperStateHandleStore.ZooKeeperStateHandle<SubmittedJobGraph> jobStateHandle : submitted) {
+					SubmittedJobGraph jobGraph = jobStateHandle
+							.getState(ClassLoader.getSystemClassLoader());
+
+					addedJobGraphs.add(jobGraph.getJobId());
+
+					jobGraphs.add(jobGraph);
+				}
+
+				LOG.info("Recovered {} job graphs: {}.", jobGraphs.size(), jobGraphs);
+				return jobGraphs;
+			}
+			else {
+				LOG.info("No job graph to recover.");
+				return Collections.emptyList();
+			}
+		}
+	}
+
+	@Override
+	public Option<SubmittedJobGraph> recoverJobGraph(JobID jobId) throws Exception {
+		checkNotNull(jobId, "Job ID");
+		String path = getPathForJob(jobId);
+
+		synchronized (cacheLock) {
+			verifyIsRunning();
+
+			try {
+				StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
+
+				SubmittedJobGraph jobGraph = jobStateHandle
+						.getState(ClassLoader.getSystemClassLoader());
+
+				addedJobGraphs.add(jobGraph.getJobId());
+
+				LOG.info("Recovered {}.", jobGraph);
+
+				return Option.apply(jobGraph);
+			}
+			catch (KeeperException.NoNodeException ignored) {
+			}
+
+			return Option.empty();
+		}
+	}
+
+	@Override
+	public void putJobGraph(SubmittedJobGraph jobGraph) throws Exception {
+		checkNotNull(jobGraph, "Job graph");
+		String path = getPathForJob(jobGraph.getJobId());
+
+		boolean success = false;
+
+		while (!success) {
+			synchronized (cacheLock) {
+				verifyIsRunning();
+
+				int currentVersion = jobGraphsInZooKeeper.exists(path);
+
+				if (currentVersion == -1) {
+					try {
+						jobGraphsInZooKeeper.add(path, jobGraph);
+						LOG.info("Added {} to ZooKeeper.", jobGraph);
+
+						success = true;
+					}
+					catch (KeeperException.NodeExistsException ignored) {
+						continue;
--- End diff --

No need for `continue`. You can simply check `if (success)` before the `addedJobGraphs.add(jobGraph.getJobId())`.
> Recover running jobs on JobManager failure
> ------------------------------------------
>
>                 Key: FLINK-2354
>                 URL: https://issues.apache.org/jira/browse/FLINK-2354
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 0.10
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 0.10
>
>
> tl;dr Persist JobGraphs in the state backend and coordinate the reference to the state handle via ZooKeeper.
>
> Problem: When running multiple JobManagers in high availability mode, the leading job manager loses all running jobs when it fails. After a new leading job manager is elected, it is not possible to recover any previously running jobs.
>
> Solution: The leading job manager, which receives the job graph, writes 1) the job graph to a state backend, and 2) a reference to the respective state handle to ZooKeeper. In general, job graphs can become large (multiple MBs, because they include closures etc.). ZooKeeper is not designed for data of this size. The level of indirection via the reference to the state backend keeps the data in ZooKeeper small.
>
> Proposed ZooKeeper layout:
> /flink (default)
> +- currentJobs
>    +- job id i
>       +- state handle reference of job graph i
>
> The 'currentJobs' node needs to be persistent to allow recovery of jobs between job managers. The 'currentJobs' node needs to satisfy the following invariant: there is a reference to a job graph with id i IFF the respective job graph needs to be recovered by a newly elected job manager leader.
>
> With this in place, jobs will be recovered from their initial state (as if resubmitted). The next step is to back up the runtime state handles of checkpoints in a similar manner.
>
> ---
>
> This work will be based on [~trohrm...@apache.org]'s implementation of FLINK-2291. The leader election service notifies the job manager about granted/revoked leadership. This notification happens via Akka and thus serially *per* job manager, but results in only eventually consistent state between job managers. For brief periods of time it is therefore possible that a new leader has been granted leadership before the old one's leadership has been revoked.
>
> [~trohrm...@apache.org], can you confirm that leadership does not guarantee mutually exclusive access to the shared 'currentJobs' state?
>
> For example, the following can happen:
> - JM 1 is the leader, JM 2 is standby
> - Job i is running (and hence /flink/currentJobs/i exists)
> - ZK notifies the leader election service (LES) of JM 1 and JM 2
> - LES 2 immediately notifies JM 2 about the granted leadership, but the LES 1 notification revoking leadership takes longer
> - Job i finishes (the TMs don't notice the leadership change yet) and JM 1 receives the final JobStatusChange
> - JM 2 resubmits the job /flink/currentJobs/i
> - JM 1 removes /flink/currentJobs/i, because it is now finished
> => inconsistent state (w.r.t. the invariant specified above)
>
> If this is indeed a problem, we can circumvent it with a Curator recipe for [shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to coordinate access to 'currentJobs'. The lock needs to be acquired on leadership.
>
> ---
>
> Minimum required tests:
> - Unit tests for job graph serialization and writing to the state backend and ZooKeeper with the expected nodes
> - Unit tests for job submission to a job manager in leader/non-leader state
> - Unit tests for leadership granting/revoking and job submission/restarting interleavings
> - Process failure integration tests with single and multiple running jobs



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
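As an illustrative footnote to the shared-lock idea in the quoted description: a minimal sketch of how access to the shared 'currentJobs' state could be serialized with Curator's shared-lock recipe (InterProcessMutex). The class name, lock path, and timeout below are hypothetical and not taken from the actual FLINK-2354 implementation:

import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

/** Sketch only: serializes access to the shared 'currentJobs' sub-tree across job managers. */
class CurrentJobsGuard {

	private final InterProcessMutex lock;

	CurrentJobsGuard(CuratorFramework client) {
		// Hypothetical lock node; it only matters that all job managers use the same path.
		this.lock = new InterProcessMutex(client, "/flink/locks/currentJobs");
	}

	/** Acquire the lock when leadership is granted, before touching /flink/currentJobs. */
	void onGrantedLeadership() throws Exception {
		if (!lock.acquire(60, TimeUnit.SECONDS)) {
			throw new IllegalStateException("Could not acquire the currentJobs lock.");
		}
	}

	/** Release the lock when leadership is revoked or the job manager shuts down. */
	void onRevokedLeadership() throws Exception {
		if (lock.isAcquiredInThisProcess()) {
			lock.release();
		}
	}
}

Whether such a lock is actually necessary depends on the answer to the mutual-exclusion question raised in the description above.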