[FLINK-4375] [distributed coordination] Implement new JobManager creation, 
initialization, and basic RPC methods


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34fef475
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34fef475
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34fef475

Branch: refs/heads/flip-6
Commit: 34fef4752e94b3d0c7afe7a9525799bb651a07b4
Parents: c8dc074
Author: Kurt Young <ykt...@gmail.com>
Authored: Wed Oct 12 23:25:16 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Oct 13 16:25:49 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   |  20 +
 .../java/org/apache/flink/util/StringUtils.java |  14 +
 .../apache/flink/runtime/blob/BlobServer.java   |  65 +-
 .../apache/flink/runtime/blob/BlobStore.java    |   3 +-
 .../apache/flink/runtime/blob/BlobUtils.java    |   8 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  53 +-
 .../flink/runtime/blob/VoidBlobStore.java       |   2 +-
 .../CheckpointCoordinatorGateway.java           |  26 +-
 .../HighAvailabilityServices.java               |  13 +
 .../runtime/highavailability/NonHaServices.java |  13 +
 .../highavailability/RunningJobsRegistry.java   |  66 ++
 .../highavailability/ZookeeperHaServices.java   | 115 ++-
 .../highavailability/nonha/NonHaRegistry.java   |  62 ++
 .../runtime/jobmanager/OnCompletionActions.java |   3 +-
 .../runtime/jobmaster/JobManagerRunner.java     | 270 ++++--
 .../runtime/jobmaster/JobManagerServices.java   |  93 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 902 +++++++------------
 .../runtime/jobmaster/JobMasterGateway.java     | 114 +--
 .../jobmaster/MiniClusterJobDispatcher.java     |  61 +-
 .../message/DisposeSavepointResponse.java       |  49 -
 .../message/TriggerSavepointResponse.java       |  74 --
 .../apache/flink/runtime/rpc/RpcService.java    |   4 +-
 .../taskexecutor/JobManagerConnection.java      |  25 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 107 ++-
 .../taskexecutor/rpc/RpcInputSplitProvider.java |   8 +-
 .../rpc/RpcPartitionStateChecker.java           |   8 +-
 .../RpcResultPartitionConsumableNotifier.java   |   7 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   |   6 +
 .../TestingHighAvailabilityServices.java        |  14 +
 .../jobmaster/JobManagerRunnerMockTest.java     |  58 +-
 .../runtime/jobmaster/JobManagerRunnerTest.java |  24 +
 31 files changed, 1278 insertions(+), 1009 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 7227006..0f6f24f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -99,6 +99,26 @@ public final class ExceptionUtils {
        }
 
        /**
+        * Throws the given {@code Throwable} in scenarios where the signatures 
allow throwing
+        * an Exception. Errors and Exceptions are thrown directly, other 
"exotic"
+        * subclasses of Throwable are wrapped in an Exception.
+        *
+        * @param t The throwable to be thrown.
+        * @param parentMessage The message for the parent Exception, if one is 
needed.
+        */
+       public static void rethrowException(Throwable t, String parentMessage) 
throws Exception {
+               if (t instanceof Error) {
+                       throw (Error) t;
+               }
+               else if (t instanceof Exception) {
+                       throw (Exception) t;
+               }
+               else {
+                       throw new Exception(parentMessage, t);
+               }
+       }
+
+       /**
         * Tries to throw the given {@code Throwable} in scenarios where the 
signatures allows only IOExceptions
         * (and RuntimeException and Error). Throws this exception directly, if 
it is an IOException,
         * a RuntimeException, or an Error. Otherwise does nothing.

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 10b6304..3c32d77 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -335,4 +335,18 @@ public final class StringUtils {
                        return null;
                }
        }
+
+       public static boolean isNullOrWhitespaceOnly(String str) {
+               if (str == null || str.length() == 0) {
+                       return true;
+               }
+
+               final int len = str.length();
+               for (int i = 0; i < len; i++) {
+                       if (!Character.isWhitespace(str.charAt(i))) {
+                               return false;
+                       }
+               }
+               return true;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index ff54b67..5395d34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -22,7 +22,11 @@ import org.apache.commons.io.FileUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.NetUtils;
 import org.slf4j.Logger;
@@ -43,6 +47,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
  * This class implements the BLOB server. The BLOB server is responsible for 
listening for incoming requests and
@@ -88,23 +93,22 @@ public class BlobServer extends Thread implements 
BlobService {
         *         thrown if the BLOB server cannot bind to a free network port
         */
        public BlobServer(Configuration config) throws IOException {
-               checkNotNull(config, "Configuration");
+               this(config, createBlobStoreFromConfig(config));
+       }
 
-               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
+       public BlobServer(Configuration config, HighAvailabilityServices 
haServices) throws IOException {
+               this(config, haServices.createBlobStore());
+       }
+
+       private BlobServer(Configuration config, BlobStore blobStore) throws 
IOException {
+               checkNotNull(config);
+               this.blobStore = checkNotNull(blobStore);
 
                // configure and create the storage directory
                String storageDirectory = 
config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null);
                this.storageDir = 
BlobUtils.initStorageDirectory(storageDirectory);
                LOG.info("Created BLOB server storage directory {}", 
storageDir);
 
-               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-                       this.blobStore = new VoidBlobStore();
-               } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
-                       this.blobStore = new FileSystemBlobStore(config);
-               } else {
-                       throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + ".");
-               }
-
                // configure the maximum number of concurrent connections
                final int maxConnections = config.getInteger(
                                ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, 
ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT);
@@ -125,13 +129,7 @@ public class BlobServer extends Thread implements 
BlobService {
                        backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG;
                }
 
-               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
-                       // Add shutdown hook to delete storage directory
-                       this.shutdownHook = BlobUtils.addShutdownHook(this, 
LOG);
-               }
-               else {
-                       this.shutdownHook = null;
-               }
+               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
 
                //  ----------------------- start the server -------------------
 
@@ -426,4 +424,37 @@ public class BlobServer extends Thread implements 
BlobService {
                }
        }
 
+       private static BlobStore createBlobStoreFromConfig(Configuration 
config) throws IOException {
+               HighAvailabilityMode highAvailabilityMode = 
HighAvailabilityMode.fromConfig(config);
+
+               if (highAvailabilityMode == HighAvailabilityMode.NONE) {
+               return new VoidBlobStore();
+               } else if (highAvailabilityMode == 
HighAvailabilityMode.ZOOKEEPER) {
+                       final String storagePath = 
config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+                       if (isNullOrWhitespaceOnly(storagePath)) {
+                               throw new 
IllegalConfigurationException("Configuration is missing the mandatory 
parameter: " +
+                                               
HighAvailabilityOptions.HA_STORAGE_PATH);
+                       }
+
+                       final Path path;
+                       try {
+                               path = new Path(storagePath);
+                       } catch (Exception e) {
+                               throw new IOException("Invalid path for highly 
available storage (" +
+                                               
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+                       }
+
+                       final FileSystem fileSystem;
+                       try {
+                               fileSystem = path.getFileSystem();
+                       } catch (Exception e) {
+                               throw new IOException("Could not create 
FileSystem for highly available storage (" +
+                                               
HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+                       }
+
+                       return new FileSystemBlobStore(fileSystem, storagePath);
+               } else {
+                       throw new IllegalConfigurationException("Unexpected 
high availability mode '" + highAvailabilityMode + ".");
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
index 1e72d91..7050338 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
 /**
  * A blob store.
  */
-interface BlobStore {
+public interface BlobStore {
 
        /**
         * Copies the local file to the blob store.
@@ -93,5 +93,4 @@ interface BlobStore {
         * Cleans up the store and deletes all blobs.
         */
        void cleanUp();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index e74fa6f..136df09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 
 import java.io.EOFException;
@@ -73,7 +74,7 @@ public class BlobUtils {
         */
        static File initStorageDirectory(String storageDirectory) {
                File baseDir;
-               if (storageDirectory == null || 
storageDirectory.trim().isEmpty()) {
+               if (StringUtils.isNullOrWhitespaceOnly(storageDirectory)) {
                        baseDir = new 
File(System.getProperty("java.io.tmpdir"));
                }
                else {
@@ -81,10 +82,9 @@ public class BlobUtils {
                }
 
                File storageDir;
-               final int MAX_ATTEMPTS = 10;
-               int attempt;
 
-               for(attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
+               final int MAX_ATTEMPTS = 10;
+               for(int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
                        storageDir = new File(baseDir, String.format(
                                        "blobStore-%s", 
UUID.randomUUID().toString()));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index deba738..2c05002 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -20,12 +20,7 @@ package org.apache.flink.runtime.blob;
 
 import com.google.common.io.Files;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.IOUtils;
@@ -38,7 +33,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -47,25 +41,24 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>This is used in addition to the local blob storage for high availability.
  */
-class FileSystemBlobStore implements BlobStore {
+public class FileSystemBlobStore implements BlobStore {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBlobStore.class);
 
+       /** The file system in which blobs are stored */
+       private final FileSystem fileSystem;
+       
        /** The base path of the blob store */
        private final String basePath;
 
-       FileSystemBlobStore(Configuration config) throws IOException {
-               String storagePath = 
config.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
-
-               if (storagePath == null || StringUtils.isBlank(storagePath)) {
-                       throw new IllegalConfigurationException("Missing 
high-availability storage path for metadata." +
-                                       " Specify via configuration key '" + 
HighAvailabilityOptions.HA_STORAGE_PATH + "'.");
-               }
+       public FileSystemBlobStore(FileSystem fileSystem, String storagePath) 
throws IOException {
+               this.fileSystem = checkNotNull(fileSystem);
+               this.basePath = checkNotNull(storagePath) + "/blob";
 
-               this.basePath = storagePath + "/blob";
+               LOG.info("Creating highly available BLOB storage directory at 
{}", basePath);
 
-               FileSystem.get(new Path(basePath).toUri()).mkdirs(new 
Path(basePath));
-               LOG.info("Created blob directory {}.", basePath);
+               fileSystem.mkdirs(new Path(basePath));
+               LOG.debug("Created highly available BLOB storage directory at 
{}", basePath);
        }
 
        // - Put 
------------------------------------------------------------------
@@ -81,9 +74,7 @@ class FileSystemBlobStore implements BlobStore {
        }
 
        private void put(File fromFile, String toBlobPath) throws Exception {
-               try (OutputStream os = FileSystem.get(new URI(toBlobPath))
-                               .create(new Path(toBlobPath), true)) {
-
+               try (OutputStream os = fileSystem.create(new Path(toBlobPath), 
true)) {
                        LOG.debug("Copying from {} to {}.", fromFile, 
toBlobPath);
                        Files.copy(fromFile, os);
                }
@@ -106,16 +97,15 @@ class FileSystemBlobStore implements BlobStore {
                checkNotNull(toFile, "File");
 
                if (!toFile.exists() && !toFile.createNewFile()) {
-                       throw new IllegalStateException("Failed to create 
target file to copy to");
+                       throw new IOException("Failed to create target file to 
copy to");
                }
 
-               final URI fromUri = new URI(fromBlobPath);
                final Path fromPath = new Path(fromBlobPath);
 
-               if (FileSystem.get(fromUri).exists(fromPath)) {
-                       try (InputStream is = 
FileSystem.get(fromUri).open(fromPath)) {
-                               FileOutputStream fos = new 
FileOutputStream(toFile);
-
+               if (fileSystem.exists(fromPath)) {
+                       try (InputStream is = fileSystem.open(fromPath);
+                               FileOutputStream fos = new 
FileOutputStream(toFile))
+                       {
                                LOG.debug("Copying from {} to {}.", 
fromBlobPath, toFile);
                                IOUtils.copyBytes(is, fos); // closes the 
streams
                        }
@@ -145,17 +135,16 @@ class FileSystemBlobStore implements BlobStore {
        private void delete(String blobPath) {
                try {
                        LOG.debug("Deleting {}.", blobPath);
-
-                       FileSystem fs = FileSystem.get(new URI(blobPath));
+                       
                        Path path = new Path(blobPath);
 
-                       fs.delete(path, true);
+                       fileSystem.delete(path, true);
 
                        // send a call to delete the directory containing the 
file. This will
                        // fail (and be ignored) when some files still exist.
                        try {
-                               fs.delete(path.getParent(), false);
-                               fs.delete(new Path(basePath), false);
+                               fileSystem.delete(path.getParent(), false);
+                               fileSystem.delete(new Path(basePath), false);
                        } catch (IOException ignored) {}
                }
                catch (Exception e) {
@@ -168,7 +157,7 @@ class FileSystemBlobStore implements BlobStore {
                try {
                        LOG.debug("Cleaning up {}.", basePath);
 
-                       FileSystem.get(new URI(basePath)).delete(new 
Path(basePath), true);
+                       fileSystem.delete(new Path(basePath), true);
                }
                catch (Exception e) {
                        LOG.error("Failed to clean up recovery directory.");

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
index 1b71add..ece2ac1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/VoidBlobStore.java
@@ -25,7 +25,7 @@ import java.io.File;
 /**
  * A blob store doing nothing.
  */
-class VoidBlobStore implements BlobStore {
+public class VoidBlobStore implements BlobStore {
 
        @Override
        public void put(File localFile, BlobKey blobKey) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index e448ebc..196ef5c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -23,21 +23,23 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 
+import java.util.UUID;
+
 public interface CheckpointCoordinatorGateway extends RpcGateway {
 
        void acknowledgeCheckpoint(
-               JobID jobID,
-               ExecutionAttemptID executionAttemptID,
-               long checkpointID,
-               CheckpointStateHandles checkpointStateHandles,
-               long synchronousDurationMillis,
-               long asynchronousDurationMillis,
-               long bytesBufferedInAlignment,
-               long alignmentDurationNanos);
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final long checkpointID,
+                       final CheckpointStateHandles checkpointStateHandles,
+                       final long synchronousDurationMillis,
+                       final long asynchronousDurationMillis,
+                       final long bytesBufferedInAlignment,
+                       final long alignmentDurationNanos);
 
        void declineCheckpoint(
-               JobID jobID,
-               ExecutionAttemptID executionAttemptID,
-               long checkpointID,
-               long checkpointTimestamp);
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final long checkpointID,
+                       final long checkpointTimestamp);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index a26886a..5d78ffc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,11 +19,14 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
+import java.io.IOException;
+
 /**
  * This class gives access to all services needed for
  *
@@ -72,4 +75,14 @@ public interface HighAvailabilityServices {
         * Gets the submitted job graph store for the job manager
         */
        SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
+
+       /**
+        * Gets the registry that holds information about whether jobs are 
currently running.
+        */
+       RunningJobsRegistry getRunningJobsRegistry() throws Exception;
+
+       /**
+        * Creates the BLOB store in which BLOBs are stored in a 
highly-available fashion.
+        */
+       BlobStore createBlobStore() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 2c6295c..d7fd2bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,8 +19,11 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.nonha.NonHaRegistry;
 import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -102,4 +105,14 @@ public class NonHaServices implements 
HighAvailabilityServices {
        public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
                return new StandaloneSubmittedJobGraphStore();
        }
+
+       @Override
+       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+               return new NonHaRegistry();
+       }
+
+       @Override
+       public BlobStore createBlobStore() {
+               return new VoidBlobStore();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
new file mode 100644
index 0000000..e7c131c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/RunningJobsRegistry.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.flink.api.common.JobID;
+
+import java.io.IOException;
+
+/**
+ * This registry tracks if a certain job is running.
+ * 
+ * <p>This registry is used in highly-available setups with multiple master 
nodes,
+ * to determine whether a new leader should attempt to recover a certain job 
(because the 
+ * job is still running), or whether the job has already finished successfully 
(in case of a
+ * finite job) and the leader has only been granted leadership because the 
previous leader
+ * quit cleanly after the job was finished.
+ */
+public interface RunningJobsRegistry {
+
+       /**
+        * Marks a job as running.
+        * 
+        * @param jobID The id of the job.
+        *
+        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
+        *                     failed and could not be retried.
+        */
+       void setJobRunning(JobID jobID) throws IOException;
+
+       /**
+        * Marks a job as finished.
+        *
+        * @param jobID The id of the job.
+        * 
+        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
+        *                     failed and could not be retried.
+        */
+       void setJobFinished(JobID jobID) throws IOException;
+
+       /**
+        * Checks whether a job is running.
+        *
+        * @param jobID The id of the job to check.
+        * @return True if the job is still running, false otherwise.
+        * 
+        * @throws IOException Thrown when the communication with the 
highly-available storage or registry
+        *                     failed and could not be retried.
+        */
+       boolean isJobRunning(JobID jobID) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index d26b668..3a7736b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -19,8 +19,15 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.curator.framework.CuratorFramework;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.FileSystemBlobStore;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -28,12 +35,56 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
+
 /**
- * An implementation of the {@link HighAvailabilityServices} with zookeeper.
+ * An implementation of the {@link HighAvailabilityServices} using Apache 
ZooKeeper.
+ * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
+ * 
+ * <pre>
+ * /flink
+ *      +/cluster_id_1/resource_manager_lock
+ *      |            |
+ *      |            +/job-id-1/job_manager_lock
+ *      |            |         /checkpoints/latest
+ *      |            |                     /latest-1
+ *      |            |                     /latest-2
+ *      |            |
+ *      |            +/job-id-2/job_manager_lock
+ *      |      
+ *      +/cluster_id_2/resource_manager_lock
+ *                   |
+ *                   +/job-id-1/job_manager_lock
+ *                            |/checkpoints/latest
+ *                            |            /latest-1
+ *                            |/persisted_job_graph
+ * </pre>
+ * 
+ * <p>The root path "/flink" is configurable via the option {@link 
HighAvailabilityOptions#HA_ZOOKEEPER_ROOT}.
+ * This makes sure Flink stores its data under specific subtrees in ZooKeeper, for example to
+ * accommodate specific permissions.
+ * 
+ * <p>The "cluster_id" part identifies the data stored for a specific Flink 
"cluster". 
+ * This "cluster" can be either a standalone or containerized Flink cluster, or it can be a job
+ * on a framework like YARN or Mesos (in a "per-job-cluster" mode).
+ * 
+ * <p>In case of a "per-job-cluster" on YARN or Mesos, the cluster-id is 
generated and configured
+ * automatically by the client or dispatcher that submits the Job to YARN or 
Mesos.
+ * 
+ * <p>In the case of a standalone cluster, that cluster-id needs to be 
configured via
+ * {@link HighAvailabilityOptions#HA_CLUSTER_ID}. All nodes with the same 
cluster id will join the same
+ * cluster and participate in the execution of the same set of jobs.
  */
 public class ZookeeperHaServices implements HighAvailabilityServices {
 
-       private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = 
"/resource-manager";
+       private static final String RESOURCE_MANAGER_LEADER_PATH = 
"/resource_manager_lock";
+
+       private static final String JOB_MANAGER_LEADER_PATH = 
"/job_manager_lock";
+
+       // 
------------------------------------------------------------------------
 
        /** The ZooKeeper client to use */
        private final CuratorFramework client;
@@ -41,29 +92,37 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        /** The runtime configuration */
        private final Configuration configuration;
 
+       /**
+        * Creates the services with a ZooKeeper client obtained from
+        * {@code ZooKeeperUtils.startCuratorFramework(configuration)}.
+        *
+        * <p>NOTE(review): this constructor creates the client itself; confirm which component
+        * is responsible for closing it.
+        */
+       public ZookeeperHaServices(Configuration configuration) {
+               this(ZooKeeperUtils.startCuratorFramework(configuration), 
configuration);
+       }
+
+       /**
+        * Creates the services on top of an existing ZooKeeper client.
+        *
+        * @param client        The ZooKeeper client to use; must not be null
+        * @param configuration The runtime configuration; must not be null
+        */
        public ZookeeperHaServices(final CuratorFramework client, final 
Configuration configuration) {
-               this.client = client;
-               this.configuration = configuration;
+               this.client = checkNotNull(client);
+               this.configuration = checkNotNull(configuration);
        }
 
+       // 
------------------------------------------------------------------------
+       //  Services
+       // 
------------------------------------------------------------------------
+
+       // ResourceManager leader retrieval, keyed by the path RESOURCE_MANAGER_LEADER_PATH
        @Override
        public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
 
+       // JobManager leader retrieval for one job, keyed by the per-job path from getPathForJobManager
        @Override
        public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathSuffixForJob(jobID));
+               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathForJobManager(jobID));
        }
 
+       // ResourceManager leader election, on the same path the retriever above listens to
        @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
-               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
        }
 
+       // JobManager leader election for one job, on the per-job path from getPathForJobManager
        @Override
        public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
-               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, getPathSuffixForJob(jobID));
+               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, getPathForJobManager(jobID));
        }
 
        @Override
@@ -76,7 +135,43 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
                return ZooKeeperUtils.createSubmittedJobGraphs(client, 
configuration);
        }
 
-       private static String getPathSuffixForJob(final JobID jobID) {
-               return String.format("/job-managers/%s", jobID);
+       // Not yet supported for ZooKeeper-based high availability: always throws
+       // UnsupportedOperationException.
+       // NOTE(review): JobManagerRunner calls haServices.getRunningJobsRegistry() while it is
+       // constructed, so runner setup with these services presumably fails until a ZooKeeper-backed
+       // registry exists - confirm.
+       @Override
+       public RunningJobsRegistry getRunningJobsRegistry() throws Exception {
+               throw new UnsupportedOperationException("not yet implemented");
+       }
+
+       /**
+        * Creates a {@link BlobStore} on the file system of the configured highly-available
+        * storage path ({@link HighAvailabilityOptions#HA_STORAGE_PATH}).
+        *
+        * @return a {@link FileSystemBlobStore} for the configured storage path
+        * @throws IllegalConfigurationException if no (non-blank) storage path is configured
+        * @throws IOException if the storage path cannot be parsed or its file system cannot be created
+        */
+       @Override
+       public BlobStore createBlobStore() throws IOException {
+               final String storagePath = configuration.getValue(HighAvailabilityOptions.HA_STORAGE_PATH);
+               if (isNullOrWhitespaceOnly(storagePath)) {
+                       // report the plain option key (not the ConfigOption's toString()),
+                       // consistent with the IOException messages below
+                       throw new IllegalConfigurationException("Configuration is missing the mandatory parameter: " +
+                                       HighAvailabilityOptions.HA_STORAGE_PATH.key());
+               }
+
+               final Path path;
+               try {
+                       path = new Path(storagePath);
+               } catch (Exception e) {
+                       throw new IOException("Invalid path for highly available storage (" +
+                                       HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+               }
+
+               final FileSystem fileSystem;
+               try {
+                       fileSystem = path.getFileSystem();
+               } catch (Exception e) {
+                       throw new IOException("Could not create FileSystem for highly available storage (" +
+                                       HighAvailabilityOptions.HA_STORAGE_PATH.key() + ')', e);
+               }
+
+               return new FileSystemBlobStore(fileSystem, storagePath);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Builds the leader node path of the JobManager for the given job, i.e.
+        * {@code "/" + jobID + JOB_MANAGER_LEADER_PATH}.
+        */
+       private static String getPathForJobManager(final JobID jobID) {
+               return String.format("/%s%s", jobID, JOB_MANAGER_LEADER_PATH);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
new file mode 100644
index 0000000..85dd711
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/NonHaRegistry.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability.nonha;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+
+import java.util.HashSet;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * In-memory, non-highly-available {@link RunningJobsRegistry}; access is serialized on the backing set.
+ */
+public class NonHaRegistry implements RunningJobsRegistry {
+
+       /** IDs of the jobs currently marked as running (guarded by its own monitor) */
+       private final HashSet<JobID> runningJobIds = new HashSet<>();
+
+       @Override
+       public void setJobRunning(JobID jobID) {
+               final JobID id = checkNotNull(jobID);
+
+               synchronized (runningJobIds) {
+                       runningJobIds.add(id);
+               }
+       }
+
+       @Override
+       public void setJobFinished(JobID jobID) {
+               final JobID id = checkNotNull(jobID);
+
+               synchronized (runningJobIds) {
+                       runningJobIds.remove(id);
+               }
+       }
+
+       @Override
+       public boolean isJobRunning(JobID jobID) {
+               final JobID id = checkNotNull(jobID);
+
+               synchronized (runningJobIds) {
+                       return runningJobIds.contains(id);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 6de4253..25a2a66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
 
-public interface OnCompletionActions extends FatalErrorHandler {
+public interface OnCompletionActions {
 
        void jobFinished(JobExecutionResult result);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index a096932..74c1050 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,26 +21,38 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The runner for the job manager. It deals with job level leader election and 
make underlying job manager
  * properly reacted.
  */
-public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+public class JobManagerRunner implements LeaderContender, OnCompletionActions, 
FatalErrorHandler {
 
-       private final Logger log = 
LoggerFactory.getLogger(JobManagerRunner.class);
+       private static final Logger log = 
LoggerFactory.getLogger(JobManagerRunner.class);
+
+       // 
------------------------------------------------------------------------
 
        /** Lock to ensure that this runner can deal with leader election event 
and job completion notifies simultaneously */
        private final Object lock = new Object();
@@ -48,52 +60,141 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        /** The job graph needs to run */
        private final JobGraph jobGraph;
 
-       private final OnCompletionActions toNotify;
+       /** The listener to notify once the job completes - either successfully 
or unsuccessfully */
+       private final OnCompletionActions toNotifyOnComplete;
+
+       /** The handler to call in case of fatal (unrecoverable) errors */ 
+       private final FatalErrorHandler errorHandler;
 
        /** Used to check whether a job needs to be run */
-       private final SubmittedJobGraphStore submittedJobGraphStore;
+       private final RunningJobsRegistry runningJobsRegistry;
 
        /** Leader election for this job */
        private final LeaderElectionService leaderElectionService;
 
+       private final JobManagerServices jobManagerServices;
+
        private final JobMaster jobManager;
 
+       private final JobManagerMetricGroup jobManagerMetricGroup;
+
        /** flag marking the runner as shut down */
        private volatile boolean shutdown;
 
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a runner that uses a new {@link MetricRegistry} built from the given configuration.
+        *
+        * <p>NOTE(review): the MetricRegistry created here is only passed on; it does not appear to
+        * be stored or shut down by this runner - confirm it is not leaked.
+        */
        public JobManagerRunner(
-               final JobGraph jobGraph,
-               final Configuration configuration,
-               final RpcService rpcService,
-               final HighAvailabilityServices haServices,
-               final OnCompletionActions toNotify) throws Exception
+                       final JobGraph jobGraph,
+                       final Configuration configuration,
+                       final RpcService rpcService,
+                       final HighAvailabilityServices haServices,
+                       final OnCompletionActions toNotifyOnComplete,
+                       final FatalErrorHandler errorHandler) throws Exception
        {
                this(jobGraph, configuration, rpcService, haServices,
-                       JobManagerServices.fromConfiguration(configuration), 
toNotify);
+                               new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)),
+                               toNotifyOnComplete, errorHandler);
        }
 
+       /**
+        * Creates a runner whose {@link JobManagerServices} are created from the given
+        * configuration and high-availability services.
+        */
        public JobManagerRunner(
-               final JobGraph jobGraph,
-               final Configuration configuration,
-               final RpcService rpcService,
-               final HighAvailabilityServices haServices,
-               final JobManagerServices jobManagerServices,
-               final OnCompletionActions toNotify) throws Exception
+                       final JobGraph jobGraph,
+                       final Configuration configuration,
+                       final RpcService rpcService,
+                       final HighAvailabilityServices haServices,
+                       final MetricRegistry metricRegistry,
+                       final OnCompletionActions toNotifyOnComplete,
+                       final FatalErrorHandler errorHandler) throws Exception
+       {
+               this(jobGraph, configuration, rpcService, haServices,
+                               
JobManagerServices.fromConfiguration(configuration, haServices),
+                               metricRegistry,
+                               toNotifyOnComplete, errorHandler);
+       }
+
+       /**
+        * Creates a runner for the given job: registers the job's user code libraries, obtains the
+        * job's high-availability services, and creates the {@link JobMaster}.
+        *
+        * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly
+        * thrown and not reported to the given {@code FatalErrorHandler}.
+        *
+        * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}.
+        * It will shut them down on error and on calls to {@link #shutdown()}.
+        *
+        * @throws Exception Thrown if the runner cannot be set up, because either one of the
+        *                   required services could not be started, or the Job could not be initialized.
+        *                   Setup failures are reported as a {@link JobExecutionException}.
+        */
+       public JobManagerRunner(
+                       final JobGraph jobGraph,
+                       final Configuration configuration,
+                       final RpcService rpcService,
+                       final HighAvailabilityServices haServices,
+                       final JobManagerServices jobManagerServices,
+                       final MetricRegistry metricRegistry,
+                       final OnCompletionActions toNotifyOnComplete,
+                       final FatalErrorHandler errorHandler) throws Exception
        {
-               this.jobGraph = jobGraph;
-               this.toNotify = toNotify;
-               this.submittedJobGraphStore = haServices.getSubmittedJobGraphStore();
-               this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
-
-               this.jobManager = new JobMaster(
-                       jobGraph, configuration, rpcService, haServices,
-                       jobManagerServices.libraryCacheManager,
-                       jobManagerServices.restartStrategyFactory,
-                       jobManagerServices.savepointStore,
-                       jobManagerServices.timeout,
-                       new Scheduler(jobManagerServices.executorService),
-                       jobManagerServices.jobManagerMetricGroup,
-                       this);
+
+               JobManagerMetricGroup jobManagerMetrics = null;
+
+               // make sure we cleanly shut down our JobManager services if initialization fails
+               try {
+                       this.jobGraph = checkNotNull(jobGraph);
+                       this.toNotifyOnComplete = checkNotNull(toNotifyOnComplete);
+                       this.errorHandler = checkNotNull(errorHandler);
+                       this.jobManagerServices = checkNotNull(jobManagerServices);
+
+                       checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");
+
+                       final String hostAddress = rpcService.getAddress().isEmpty() ? "localhost" : rpcService.getAddress();
+                       jobManagerMetrics = new JobManagerMetricGroup(metricRegistry, hostAddress);
+                       this.jobManagerMetricGroup = jobManagerMetrics;
+
+                       // libraries and class loader first
+                       final BlobLibraryCacheManager libraryCacheManager = jobManagerServices.libraryCacheManager;
+                       try {
+                               libraryCacheManager.registerJob(
+                                               jobGraph.getJobID(), jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths());
+                       } catch (IOException e) {
+                               throw new Exception("Cannot set up the user code libraries: " + e.getMessage(), e);
+                       }
+
+                       final ClassLoader userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID());
+                       if (userCodeLoader == null) {
+                               throw new Exception("The user code class loader could not be initialized.");
+                       }
+
+                       // high availability services next
+                       this.runningJobsRegistry = haServices.getRunningJobsRegistry();
+                       this.leaderElectionService = haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
+
+                       // now start the JobManager
+                       this.jobManager = new JobMaster(
+                                       jobGraph, configuration,
+                                       rpcService,
+                                       haServices,
+                                       jobManagerServices.executorService,
+                                       jobManagerServices.libraryCacheManager,
+                                       jobManagerServices.restartStrategyFactory,
+                                       jobManagerServices.savepointStore,
+                                       jobManagerServices.rpcAskTimeout,
+                                       jobManagerMetrics,
+                                       this,
+                                       this,
+                                       userCodeLoader);
+               }
+               catch (Throwable t) {
+                       // clean up everything; every step is guarded so that a failing
+                       // cleanup step cannot replace the original cause 't'
+                       if (jobManagerServices != null) {
+                               try {
+                                       jobManagerServices.shutdown();
+                               } catch (Throwable tt) {
+                                       log.error("Error while shutting down JobManager services", tt);
+                               }
+                       }
+
+                       if (jobManagerMetrics != null) {
+                               try {
+                                       jobManagerMetrics.close();
+                               } catch (Throwable tt) {
+                                       log.error("Error while unregistering metrics", tt);
+                               }
+                       }
+
+                       // 'jobGraph' may itself be the null argument that caused the failure;
+                       // dereferencing it unconditionally would throw an NPE and lose 't'
+                       throw new JobExecutionException(
+                                       jobGraph != null ? jobGraph.getJobID() : null,
+                                       "Could not set up JobManager", t);
+               }
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -101,9 +202,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        
//----------------------------------------------------------------------------------------------
 
        public void start() throws Exception {
-               jobManager.init();
-               jobManager.start();
-
                try {
                        leaderElectionService.start(this);
                }
@@ -114,11 +212,6 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        }
 
+       /**
+        * Shuts this runner down; delegates to {@code shutdownInternally()}.
+        */
        public void shutdown() {
-               shutdown(new Exception("The JobManager runner is shutting 
down"));
-       }
-
-       public void shutdown(Throwable cause) {
-               // TODO what is the cause used for ?
                shutdownInternally();
        }
 
@@ -129,12 +222,29 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                        if (leaderElectionService != null) {
                                try {
                                        leaderElectionService.stop();
-                               } catch (Exception e) {
-                                       log.error("Could not properly shutdown 
the leader election service.");
+                               } catch (Throwable t) {
+                                       log.error("Could not properly shutdown 
the leader election service", t);
                                }
                        }
 
-                       jobManager.shutDown();
+                       try {
+                               jobManager.shutDown();
+                       } catch (Throwable t) {
+                               log.error("Error shutting down JobManager", t);
+                       }
+
+                       try {
+                               jobManagerServices.shutdown();
+                       } catch (Throwable t) {
+                               log.error("Error shutting down JobManager 
services", t);
+                       }
+
+                       // make all registered metrics go away
+                       try {
+                               jobManagerMetricGroup.close();
+                       } catch (Throwable t) {
+                               log.error("Error while unregistering metrics", 
t);
+                       }
                }
        }
 
@@ -148,11 +258,12 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        @Override
        public void jobFinished(JobExecutionResult result) {
                try {
+                       // mark the job as not running first, so other JobManagers do not try to recover it
+                       unregisterJobFromHighAvailability();
                        shutdownInternally();
                }
                finally {
-                       if (toNotify != null) {
-                               toNotify.jobFinished(result);
+                       // the listener is notified even if shutting down throws
+                       if (toNotifyOnComplete != null) {
+                               toNotifyOnComplete.jobFinished(result);
                        }
                }
        }
@@ -163,11 +274,12 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        @Override
        public void jobFailed(Throwable cause) {
                try {
+                       // mark the job as not running first, so other JobManagers do not try to recover it
+                       unregisterJobFromHighAvailability();
                        shutdownInternally();
                }
                finally {
-                       if (toNotify != null) {
-                               toNotify.jobFailed(cause);
+                       // the listener is notified even if shutting down throws
+                       if (toNotifyOnComplete != null) {
+                               toNotifyOnComplete.jobFailed(cause);
                        }
                }
        }
@@ -178,11 +290,12 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        @Override
        public void jobFinishedByOther() {
                try {
+                       // mark the job as not running first, so other JobManagers do not try to recover it
+                       unregisterJobFromHighAvailability();
                        shutdownInternally();
                }
                finally {
-                       if (toNotify != null) {
-                               toNotify.jobFinishedByOther();
+                       // the listener is notified even if shutting down throws
+                       if (toNotifyOnComplete != null) {
+                               toNotifyOnComplete.jobFinishedByOther();
                        }
                }
        }
@@ -192,18 +305,43 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
         */
        @Override
        public void onFatalError(Throwable exception) {
-               // first and in any case, notify our handler, so it can react 
fast
+               // we log first to make sure an explaining message goes into the log
+               // we even guard the log statement here to increase chances that the error handler
+               // gets the notification on hard critical situations like out-of-memory errors
+               try {
+                       log.error("JobManager runner encountered a fatal 
error.", exception);
+               } catch (Throwable ignored) {}
+
+               // in any case, notify our handler, so it can react fast
                try {
-                       if (toNotify != null) {
-                               toNotify.onFatalError(exception);
+                       if (errorHandler != null) {
+                               errorHandler.onFatalError(exception);
                        }
                }
                finally {
-                       log.error("JobManager runner encountered a fatal 
error.", exception);
+                       // the shutdown may not even be needed any more, if the fatal error
+                       // handler kills the process. that is fine, a process kill cleans up better than anything.
                        shutdownInternally();
                }
        }
 
+       /**
+        * Marks this runner's job as not running. Other JobManagers will not recover the job
+        * after this call.
+        *
+        * <p>This method never throws an exception; failures are logged.
+        */
+       private void unregisterJobFromHighAvailability() {
+               try {
+                       runningJobsRegistry.setJobFinished(jobGraph.getJobID());
+               }
+               catch (Throwable t) {
+                       // the two literals are concatenated, so the first one must end with a space
+                       log.error("Could not un-register from high-availability services job {} ({}). " +
+                                       "Other JobManagers may attempt to recover it and re-execute it.",
+                                       jobGraph.getName(), jobGraph.getJobID(), t);
+               }
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Leadership methods
        
//----------------------------------------------------------------------------------------------
@@ -223,15 +361,25 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                        // it's okay that job manager wait for the operation 
complete
                        
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
 
+                       boolean jobRunning;
+                       try {
+                               jobRunning = 
runningJobsRegistry.isJobRunning(jobGraph.getJobID());
+                       } catch (Throwable t) {
+                               log.error("Could not access status 
(running/finished) of job {}. " +
+                                               "Falling back to assumption 
that job is running and attempting recovery...",
+                                               jobGraph.getJobID(), t);
+                               jobRunning = true;
+                       }
+
                        // Double check the leadership after we confirm that, 
there is a small chance that multiple
                        // job managers schedule the same job after if they try 
to recover at the same time.
                        // This will eventually be noticed, but can not be 
ruled out from the beginning.
                        if (leaderElectionService.hasLeadership()) {
-                               if (isJobFinishedByOthers()) {
+                               if (jobRunning) {
+                                       jobManager.start(leaderSessionID);
+                               } else {
                                        log.info("Job {} ({}) already finished 
by others.", jobGraph.getName(), jobGraph.getJobID());
                                        jobFinishedByOther();
-                               } else {
-                                       
jobManager.getSelf().startJob(leaderSessionID);
                                }
                        }
                }
@@ -248,7 +396,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                        log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
                                jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
 
-                       jobManager.getSelf().suspendJob(new 
Exception("JobManager is no longer the leader."));
+                       jobManager.getSelf().suspendExecution(new 
Exception("JobManager is no longer the leader."));
                }
        }
 
@@ -263,11 +411,9 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
                onFatalError(exception);
        }
 
-       @VisibleForTesting
-       boolean isJobFinishedByOthers() {
-               // TODO: Fix
-               return false;
-       }
+       
//----------------------------------------------------------------------------------------------
+       // Testing
+       
//----------------------------------------------------------------------------------------------
 
        @VisibleForTesting
        boolean isShutdown() {

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
index e6beba6..eebfbfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -19,13 +19,21 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.util.ExceptionUtils;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -42,32 +50,127 @@ public class JobManagerServices {
 
        public final SavepointStore savepointStore;
 
-       public final Time timeout;
-
-       public final JobManagerMetricGroup jobManagerMetricGroup;
+       /** The timeout used for RPC ask calls. */
+       public final Time rpcAskTimeout;
 
+       /**
+        * Creates a new container holding the given JobManager services.
+        *
+        * @param executorService The executor service, shut down by {@link #shutdown()}
+        * @param libraryCacheManager The library cache manager, shut down by {@link #shutdown()}
+        * @param restartStrategyFactory The factory for the restart strategies
+        * @param savepointStore The savepoint store, shut down by {@link #shutdown()}
+        * @param rpcAskTimeout The timeout used for RPC ask calls
+        * @throws NullPointerException If any of the arguments is null
+        */
        public JobManagerServices(
                        ExecutorService executorService,
                        BlobLibraryCacheManager libraryCacheManager,
                        RestartStrategyFactory restartStrategyFactory,
                        SavepointStore savepointStore,
-                       Time timeout,
-                       JobManagerMetricGroup jobManagerMetricGroup) {
+                       Time rpcAskTimeout) {
 
                this.executorService = checkNotNull(executorService);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
                this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
                this.savepointStore = checkNotNull(savepointStore);
-               this.timeout = checkNotNull(timeout);
-               this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+               this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
+       }
+
+       /**
+        * Shuts down all services held by this object: the executor service, the savepoint store,
+        * and the library cache manager.
+        *
+        * <p>This method makes sure all services are closed or shut down, even when an exception occurred
+        * in the shutdown of one component. The first encountered exception is thrown, with successive
+        * exceptions added as suppressed exceptions.
+        *
+        * @throws Exception The first Exception encountered during shutdown.
+        */
+       public void shutdown() throws Exception {
+               Throwable firstException = null;
+
+               try {
+                       executorService.shutdownNow();
+               } catch (Throwable t) {
+                       firstException = t;
+               }
+
+               try {
+                       savepointStore.shutdown();
+               }
+               catch (Throwable t) {
+                       if (firstException == null) {
+                               firstException = t;
+                       } else {
+                               firstException.addSuppressed(t);
+                       }
+               }
+
+               try {
+                       libraryCacheManager.shutdown();
+               }
+               catch (Throwable t) {
+                       if (firstException == null) {
+                               firstException = t;
+                       } else {
+                               firstException.addSuppressed(t);
+                       }
+               }
+
+               if (firstException != null) {
+                       ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services");
+               }
        }
 
        // ------------------------------------------------------------------------
        //  Creating the components from a configuration 
        // ------------------------------------------------------------------------
        
-       public static JobManagerServices fromConfiguration(Configuration config) throws Exception {
-               // TODO not yet implemented
-               return null;
+
+       /**
+        * Creates the JobManager services from the given configuration.
+        *
+        * <p>All configuration values are read before the {@link BlobServer} is created, so that an
+        * invalid configuration does not leave behind a BlobServer that nothing will shut down.
+        *
+        * @param config The configuration to read the service settings from
+        * @param haServices The high-availability services handed to the BlobServer
+        * @return The created JobManager services
+        * @throws IllegalConfigurationException If the Akka timeout in the configuration cannot be parsed
+        * @throws Exception If any of the services cannot be created
+        */
+       public static JobManagerServices fromConfiguration(
+                       Configuration config,
+                       HighAvailabilityServices haServices) throws Exception {
+
+               final FiniteDuration timeout;
+               try {
+                       timeout = AkkaUtils.getTimeout(config);
+               } catch (NumberFormatException e) {
+                       throw new IllegalConfigurationException(AkkaUtils.formatDurationParingErrorMessage(), e);
+               }
+
+               // the configured value is scaled by 1000 (presumably seconds -> milliseconds)
+               final long cleanupInterval = config.getLong(
+                       ConfigConstants.LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL,
+                       ConfigConstants.DEFAULT_LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL) * 1000;
+
+               final RestartStrategyFactory restartStrategyFactory =
+                       RestartStrategyFactory.createRestartStrategyFactory(config);
+               final SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(config);
+
+               // create the BlobServer only after all configuration-dependent steps above succeeded
+               // NOTE(review): confirm that libraryCacheManager.shutdown() also releases the BlobServer,
+               // since shutdown() holds no other reference to it
+               final BlobServer blobServer = new BlobServer(config, haServices);
+               final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(blobServer, cleanupInterval);
+
+               return new JobManagerServices(
+                       new ForkJoinPool(),
+                       libraryCacheManager,
+                       restartStrategyFactory,
+                       savepointStore,
+                       Time.of(timeout.length(), timeout.unit()));
        }
 }

Reply via email to