[FLINK-4408] [JobManager] Introduce JobMasterRunner and implement job 
submission & setting up the ExecutionGraph

This closes #2480


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/474ace75
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/474ace75
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/474ace75

Branch: refs/heads/flip-6
Commit: 474ace75f259ff4f949f9a6a9f2adca77cac53a4
Parents: 4d54a8c
Author: Kurt Young <ykt...@gmail.com>
Authored: Thu Sep 8 12:00:13 2016 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Oct 6 13:38:44 2016 +0200

----------------------------------------------------------------------
 .../flink/api/common/JobExecutionResult.java    |   2 +-
 .../flink/api/common/JobSubmissionResult.java   |   2 +-
 .../HighAvailabilityServices.java               |  12 +
 .../runtime/highavailability/NonHaServices.java |  16 +-
 .../runtime/jobmanager/OnCompletionActions.java |  31 ++
 .../runtime/jobmanager/scheduler/Scheduler.java |   9 +
 .../runtime/jobmaster/JobManagerRunner.java     | 288 +++++++++++
 .../runtime/jobmaster/JobManagerServices.java   |  73 +++
 .../flink/runtime/jobmaster/JobMaster.java      | 485 ++++++++++++++-----
 .../runtime/jobmaster/JobMasterGateway.java     |  13 +
 .../jobmaster/MiniClusterJobDispatcher.java     | 385 +++++++++++++++
 .../flink/runtime/rpc/FatalErrorHandler.java    |  24 +
 .../runtime/taskexecutor/TaskExecutor.java      |  12 +
 .../TestingHighAvailabilityServices.java        |  39 +-
 .../jobmaster/JobManagerRunnerMockTest.java     | 254 ++++++++++
 .../flink/runtime/rpc/RpcConnectionTest.java    |  17 +-
 16 files changed, 1533 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
index cb4ecc5..7286cc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobExecutionResult.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
 @Public
 public class JobExecutionResult extends JobSubmissionResult {
 
-       private long netRuntime;
+       private final long netRuntime;
 
        private final Map<String, Object> accumulatorResults;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java 
b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
index c5dc869..b0e7e24 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/JobSubmissionResult.java
@@ -26,7 +26,7 @@ import org.apache.flink.annotation.Public;
 @Public
 public class JobSubmissionResult {
 
-       private JobID jobID;
+       private final JobID jobID;
 
        public JobSubmissionResult(JobID jobID) {
                this.jobID = jobID;

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 7634176..d67e927 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -61,4 +63,14 @@ public interface HighAvailabilityServices {
         * @param jobID The identifier of the job running the election.
         */
        LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) 
throws Exception;
+
+       /**
+        * Gets the checkpoint recovery factory for the job manager
+        */
+       CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception;
+
+       /**
+        * Gets the submitted job graph store for the job manager
+        */
+       SubmittedJobGraphStore getSubmittedJobGraphStore() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 33dc2d7..a2c9cc4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,13 +19,17 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
-import 
org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -88,4 +92,14 @@ public class NonHaServices implements 
HighAvailabilityServices {
        public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
                return new StandaloneLeaderElectionService();
        }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+               return new StandaloneCheckpointRecoveryFactory();
+       }
+
+       @Override
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+               return new StandaloneSubmittedJobGraphStore();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
new file mode 100644
index 0000000..6de4253
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+
+public interface OnCompletionActions extends FatalErrorHandler {
+       /** Called when the job finished; {@code result} is the execution result of that job. */
+       void jobFinished(JobExecutionResult result);
+       /** Called when the job failed; {@code cause} is the failure cause. */
+       void jobFailed(Throwable cause);
+       /** Called when the job was already finished by someone else (presumably another job manager - TODO confirm). */
+       void jobFinishedByOther();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b839e0e..aa09314 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -31,6 +31,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import akka.dispatch.Futures;
@@ -57,6 +58,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among 
instances and slots.
@@ -110,6 +112,13 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
        /**
         * Creates a new scheduler.
         */
+       public Scheduler(ExecutorService executor) {
+               this(ExecutionContext$.MODULE$.fromExecutor(executor));
+       }
+       
+       /**
+        * Creates a new scheduler.
+        */
        public Scheduler(ExecutionContext executionContext) {
                this.executionContext = executionContext;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
new file mode 100644
index 0000000..bc2bf9a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+/**
+ * The runner for the job manager. It deals with job-level leader election and makes the underlying
+ * job manager react accordingly.
+ */
+public class JobManagerRunner implements LeaderContender, OnCompletionActions {
+
+       private final Logger log = 
LoggerFactory.getLogger(JobManagerRunner.class);
+
+       /** Lock to ensure that this runner can handle leader election events and job completion notifications concurrently */
+       private final Object lock = new Object();
+
+       /** The job graph that needs to run */
+       private final JobGraph jobGraph;
+
+       private final OnCompletionActions toNotify;
+
+       /** The execution context which is used to execute futures */
+       private final Executor executionContext;
+
+       // TODO: use this to decide whether the job is finished by other
+       private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+       /** Leader election for this job */
+       private final LeaderElectionService leaderElectionService;
+
+       private final JobMaster jobManager;
+
+       /** Leader session id when granted leadership */
+       private UUID leaderSessionID;
+
+       /** flag marking the runner as shut down */
+       private volatile boolean shutdown;
+
+       public JobManagerRunner(
+               final JobGraph jobGraph,
+               final Configuration configuration,
+               final RpcService rpcService,
+               final HighAvailabilityServices haServices,
+               final OnCompletionActions toNotify) throws Exception
+       {
+               this(jobGraph, configuration, rpcService, haServices,
+                       JobManagerServices.fromConfiguration(configuration), 
toNotify);
+       }
+
+       public JobManagerRunner(
+               final JobGraph jobGraph,
+               final Configuration configuration,
+               final RpcService rpcService,
+               final HighAvailabilityServices haServices,
+               final JobManagerServices jobManagerServices,
+               final OnCompletionActions toNotify) throws Exception
+       {
+               this.jobGraph = jobGraph;
+               this.toNotify = toNotify;
+               this.executionContext = rpcService.getExecutor();
+               this.checkpointRecoveryFactory = 
haServices.getCheckpointRecoveryFactory();
+               this.leaderElectionService = 
haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+               this.leaderSessionID = null;
+
+               this.jobManager = new JobMaster(
+                       jobGraph, configuration, rpcService, haServices,
+                       jobManagerServices.libraryCacheManager,
+                       jobManagerServices.restartStrategyFactory,
+                       jobManagerServices.savepointStore,
+                       jobManagerServices.timeout,
+                       new Scheduler(jobManagerServices.executorService),
+                       jobManagerServices.jobManagerMetricGroup,
+                       this);
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Lifecycle management
+       
//----------------------------------------------------------------------------------------------
+
+       public void start() throws Exception {
+               jobManager.init();
+               jobManager.start();
+
+               try {
+                       leaderElectionService.start(this);
+               }
+               catch (Exception e) {
+                       log.error("Could not start the JobManager because the 
leader election service did not start.", e);
+                       throw new Exception("Could not start the leader 
election service.", e);
+               }
+       }
+
+       public void shutdown() {
+               shutdown(new Exception("The JobManager runner is shutting 
down"));
+       }
+
+       public void shutdown(Throwable cause) {
+               // TODO the cause is currently ignored; decide what it should be used for
+               shutdownInternally();
+       }
+
+       /**
+        * Marks this runner as shut down, stops the leader election service and then shuts down
+        * the job manager. Runs under {@code lock}, which the leadership callbacks also acquire,
+        * so a shutdown cannot interleave with a grant or revoke of leadership.
+        */
+       private void shutdownInternally() {
+               synchronized (lock) {
+                       shutdown = true;
+
+                       if (leaderElectionService != null) {
+                               try {
+                                       leaderElectionService.stop();
+                               } catch (Exception e) {
+                                       // best effort: continue shutting down, but keep the failure cause in the log
+                                       log.error("Could not properly shutdown the leader election service.", e);
+                               }
+                       }
+
+                       jobManager.shutDown();
+               }
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Result and error handling methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Job completion notification triggered by JobManager
+        */
+       @Override
+       public void jobFinished(JobExecutionResult result) {
+               try {
+                       shutdownInternally();
+               }
+               finally {
+                       if (toNotify != null) {
+                               toNotify.jobFinished(result);
+                       }
+               }
+       }
+
+       /**
+        * Job completion notification triggered by JobManager
+        */
+       @Override
+       public void jobFailed(Throwable cause) {
+               try {
+                       shutdownInternally();
+               }
+               finally {
+                       if (toNotify != null) {
+                               toNotify.jobFailed(cause);
+                       }
+               }
+       }
+
+       /**
+        * Job completion notification triggered by self
+        */
+       @Override
+       public void jobFinishedByOther() {
+               try {
+                       shutdownInternally();
+               }
+               finally {
+                       if (toNotify != null) {
+                               toNotify.jobFinishedByOther();
+                       }
+               }
+       }
+
+       /**
+        * Fatal error notification triggered by the JobManager or by this runner itself
+        */
+       @Override
+       public void onFatalError(Throwable exception) {
+               // first and in any case, notify our handler, so it can react 
fast
+               try {
+                       if (toNotify != null) {
+                               toNotify.onFatalError(exception);
+                       }
+               }
+               finally {
+                       log.error("JobManager runner encountered a fatal 
error.", exception);
+                       shutdownInternally();
+               }
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Leadership methods
+       
//----------------------------------------------------------------------------------------------
+
+       @Override
+       public void grantLeadership(final UUID leaderSessionID) {
+               synchronized (lock) {
+                       if (shutdown) {
+                               log.info("JobManagerRunner already shutdown.");
+                               return;
+                       }
+
+                       log.info("JobManager runner for job {} ({}) was granted 
leadership with session id {} at {}.",
+                               jobGraph.getName(), jobGraph.getJobID(), 
leaderSessionID, getAddress());
+
+                       // The operation may block, but since this runner is idle until it has been granted
+                       // leadership, it is okay for the job manager to wait for the operation to complete.
+                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                       this.leaderSessionID = leaderSessionID;
+
+                       // Double-check the leadership after confirming it: there is a small chance that multiple
+                       // job managers schedule the same job if they try to recover at the same time.
+                       // This will eventually be noticed, but cannot be ruled out from the beginning.
+                       if (leaderElectionService.hasLeadership()) {
+                               if (isJobFinishedByOthers()) {
+                                       log.info("Job {} ({}) already finished 
by others.", jobGraph.getName(), jobGraph.getJobID());
+                                       jobFinishedByOther();
+                               } else {
+                                       jobManager.getSelf().startJob();
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void revokeLeadership() {
+               synchronized (lock) {
+                       if (shutdown) {
+                               log.info("JobManagerRunner already shutdown.");
+                               return;
+                       }
+
+                       log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
+                               jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
+
+                       leaderSessionID = null;
+                       jobManager.getSelf().suspendJob(new 
Exception("JobManager is no longer the leader."));
+               }
+       }
+
+       @Override
+       public String getAddress() {
+               return jobManager.getAddress();
+       }
+
+       @Override
+       public void handleError(Exception exception) {
+               log.error("Leader Election Service encountered a fatal error.", 
exception);
+               onFatalError(exception);
+       }
+
+       @VisibleForTesting
+       boolean isJobFinishedByOthers() {
+               // TODO not yet implemented; currently always reports that nobody else finished the job
+               return false;
+       }
+
+       @VisibleForTesting
+       boolean isShutdown() {
+               return shutdown;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
new file mode 100644
index 0000000..e6beba6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerServices.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class to hold all auxiliary services used by the {@link JobMaster}.
+ */
+public class JobManagerServices {
+
+       public final ExecutorService executorService;
+
+       public final BlobLibraryCacheManager libraryCacheManager;
+
+       public final RestartStrategyFactory restartStrategyFactory;
+
+       public final SavepointStore savepointStore;
+
+       public final Time timeout;
+
+       public final JobManagerMetricGroup jobManagerMetricGroup;
+
+       public JobManagerServices(
+                       ExecutorService executorService,
+                       BlobLibraryCacheManager libraryCacheManager,
+                       RestartStrategyFactory restartStrategyFactory,
+                       SavepointStore savepointStore,
+                       Time timeout,
+                       JobManagerMetricGroup jobManagerMetricGroup) {
+
+               this.executorService = checkNotNull(executorService);
+               this.libraryCacheManager = checkNotNull(libraryCacheManager);
+               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
+               this.savepointStore = checkNotNull(savepointStore);
+               this.timeout = checkNotNull(timeout);
+               this.jobManagerMetricGroup = 
checkNotNull(jobManagerMetricGroup);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Creating the components from a configuration 
+       // 
------------------------------------------------------------------------
+       
+       public static JobManagerServices fromConfiguration(Configuration 
config) throws Exception {
+               // TODO not yet implemented; callers currently receive null and must not dereference it
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1537396..b52a23c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,21 +18,50 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.leaderelection.LeaderContender;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.util.Preconditions;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.duration.FiniteDuration;
 
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution 
of a single
@@ -41,7 +70,7 @@ import java.util.UUID;
  * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
  * remotely:
  * <ul>
- *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the 
task execution state for
+ * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task 
execution state for
  * given task</li>
  * </ul>
  */
@@ -52,7 +81,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
        /** Logical representation of the job */
        private final JobGraph jobGraph;
-       private final JobID jobID;
 
        /** Configuration of the job */
        private final Configuration configuration;
@@ -60,32 +88,67 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        /** Service to contend for and retrieve the leadership of JM and RM */
        private final HighAvailabilityServices highAvailabilityServices;
 
-       /** Leader Management */
-       private LeaderElectionService leaderElectionService = null;
-       private UUID leaderSessionID;
+       /** Blob cache manager used across jobs */
+       private final BlobLibraryCacheManager libraryCacheManager;
+
+       /** Factory to create restart strategy for this job */
+       private final RestartStrategyFactory restartStrategyFactory;
+
+       /** Store for save points */
+       private final SavepointStore savepointStore;
+
+       /** The timeout for this job */
+       private final Time timeout;
+
+       /** The scheduler to use for scheduling new tasks as they are needed */
+       private final Scheduler scheduler;
+
+       /** The metrics group used across jobs */
+       private final JobManagerMetricGroup jobManagerMetricGroup;
+
+       /** The execution context which is used to execute futures */
+       private final Executor executionContext;
+
+       private final OnCompletionActions jobCompletionActions;
+
+       /** The execution graph of this job */
+       private volatile ExecutionGraph executionGraph;
+
+       /** The checkpoint recovery factory used by this job */
+       private CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+       private ClassLoader userCodeLoader;
+
+       private RestartStrategy restartStrategy;
+
+       private MetricGroup jobMetrics;
 
-       /**
-        * The JM's Constructor
-        *
-        * @param jobGraph The representation of the job's execution plan
-        * @param configuration The job's configuration
-        * @param rpcService The RPC service at which the JM serves
-        * @param highAvailabilityService The cluster's HA service from the JM 
can elect and retrieve leaders.
-        */
        public JobMaster(
                JobGraph jobGraph,
                Configuration configuration,
                RpcService rpcService,
-               HighAvailabilityServices highAvailabilityService) {
-
+               HighAvailabilityServices highAvailabilityService,
+               BlobLibraryCacheManager libraryCacheManager,
+               RestartStrategyFactory restartStrategyFactory,
+               SavepointStore savepointStore,
+               Time timeout,
+               Scheduler scheduler,
+               JobManagerMetricGroup jobManagerMetricGroup,
+               OnCompletionActions jobCompletionActions)
+       {
                super(rpcService);
 
-               this.jobGraph = Preconditions.checkNotNull(jobGraph);
-               this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
-
-               this.configuration = Preconditions.checkNotNull(configuration);
-
-               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityService);
+               this.jobGraph = checkNotNull(jobGraph);
+               this.configuration = checkNotNull(configuration);
+               this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
+               this.libraryCacheManager = checkNotNull(libraryCacheManager);
+               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
+               this.savepointStore = checkNotNull(savepointStore);
+               this.timeout = checkNotNull(timeout);
+               this.scheduler = checkNotNull(scheduler);
+               this.jobManagerMetricGroup = 
checkNotNull(jobManagerMetricGroup);
+               this.executionContext = checkNotNull(rpcService.getExecutor());
+               this.jobCompletionActions = checkNotNull(jobCompletionActions);
        }
 
        public ResourceManagerGateway getResourceManager() {
@@ -93,93 +156,294 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        
//----------------------------------------------------------------------------------------------
-       // Initialization methods
+       // Lifecycle management
        
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Initializes the job execution environment. Should be called before start.
+        * Any error that occurs during initialization is treated as a job submission failure.
+        *
+        * @throws JobSubmissionException if any step of the initialization fails
+        */
+       public void init() throws JobSubmissionException {
+               log.info("Initializing job {} ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+
+               try {
+                       // IMPORTANT: We need to make sure that the library 
registration is the first action,
+                       // because this makes sure that the uploaded jar files 
are removed in case of
+                       // an unsuccessful job submission
+                       try {
+                               
libraryCacheManager.registerJob(jobGraph.getJobID(), 
jobGraph.getUserJarBlobKeys(),
+                                       jobGraph.getClasspaths());
+                       } catch (Throwable t) {
+                               throw new 
JobSubmissionException(jobGraph.getJobID(),
+                                       "Cannot set up the user code libraries: 
" + t.getMessage(), t);
+                       }
+
+                       userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID());
+                       if (userCodeLoader == null) {
+                               throw new 
JobSubmissionException(jobGraph.getJobID(),
+                                       "The user code class loader could not 
be initialized.");
+                       }
+
+                       if (jobGraph.getNumberOfVertices() == 0) {
+                               throw new 
JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
+                       }
+
+                       final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+                               jobGraph.getSerializedExecutionConfig()
+                                       .deserializeValue(userCodeLoader)
+                                       .getRestartStrategy();
+                       if (restartStrategyConfiguration != null) {
+                               restartStrategy = 
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
+                       } else {
+                               restartStrategy = 
restartStrategyFactory.createRestartStrategy();
+                       }
+
+                       log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+
+                       if (jobManagerMetricGroup != null) {
+                               jobMetrics = 
jobManagerMetricGroup.addJob(jobGraph);
+                       }
+                       if (jobMetrics == null) {
+                               jobMetrics = new UnregisteredMetricsGroup();
+                       }
+
+                       try {
+                               checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
+                       } catch (Exception e) {
+                               log.error("Could not get the checkpoint 
recovery factory.", e);
+                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint 
recovery factory.", e);
+                       }
+
+               } catch (Throwable t) {
+                       log.error("Failed to initializing job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
+
+                       libraryCacheManager.unregisterJob(jobGraph.getJobID());
+
+                       if (t instanceof JobSubmissionException) {
+                               throw (JobSubmissionException) t;
+                       } else {
+                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
+                                       jobGraph.getName() + " (" + 
jobGraph.getJobID() + ")", t);
+                       }
+               }
+       }
+
+       @Override
        public void start() {
                super.start();
-
-               // register at the election once the JM starts
-               registerAtElectionService();
        }
 
+       @Override
+       public void shutDown() {
+               super.shutDown();
+
+               suspendJob(new Exception("JobManager is shutting down."));
+       }
 
        
//----------------------------------------------------------------------------------------------
-       // JobMaster Leadership methods
+       // RPC methods
        
//----------------------------------------------------------------------------------------------
 
        /**
-        * Retrieves the election service and contend for the leadership.
+        * Starts running the job: runtime data structures such as the ExecutionGraph are constructed
+        * and checkpoints are recovered. After this, the job is scheduled for execution.
         */
-       private void registerAtElectionService() {
-               try {
-                       leaderElectionService = 
highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
-                       leaderElectionService.start(new 
JobMasterLeaderContender());
-               } catch (Exception e) {
-                       throw new RuntimeException("Fail to register at the 
election of JobMaster", e);
+       @RpcMethod
+       public void startJob() {
+               log.info("Starting job {} ({}).", jobGraph.getName(), 
jobGraph.getJobID());
+
+               if (executionGraph != null) {
+                       executionGraph = new ExecutionGraph(
+                               
ExecutionContext$.MODULE$.fromExecutor(executionContext),
+                               jobGraph.getJobID(),
+                               jobGraph.getName(),
+                               jobGraph.getJobConfiguration(),
+                               jobGraph.getSerializedExecutionConfig(),
+                               new FiniteDuration(timeout.getSize(), 
timeout.getUnit()),
+                               restartStrategy,
+                               jobGraph.getUserJarBlobKeys(),
+                               jobGraph.getClasspaths(),
+                               userCodeLoader,
+                               jobMetrics);
+               } else {
+                       // TODO: update last active time in JobInfo
                }
-       }
 
-       /**
-        * Start the execution when the leadership is granted.
-        *
-        * @param newLeaderSessionID The identifier of the new leadership 
session
-        */
-       public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("JobManager {} grants leadership with 
session id {}.", getAddress(), newLeaderSessionID);
+               try {
+                       
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
+                       
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
+
+                       try {
+                               
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
+                       } catch (Exception e) {
+                               log.warn("Cannot create JSON plan for job {} 
({})", jobGraph.getJobID(), jobGraph.getName(), e);
+                               executionGraph.setJsonPlan("{}");
+                       }
 
-                               // The operation may be blocking, but since JM 
is idle before it grants the leadership, it's okay that
-                               // JM waits here for the operation's 
completeness.
-                               leaderSessionID = newLeaderSessionID;
-                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+                       // initialize the vertices that have a master 
initialization hook
+                       // file output formats create directories here, input 
formats create splits
+                       if (log.isDebugEnabled()) {
+                               log.debug("Running initialization on master for 
job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
+                       }
+                       for (JobVertex vertex : jobGraph.getVertices()) {
+                               final String executableClass = 
vertex.getInvokableClassName();
+                               if (executableClass == null || 
executableClass.length() == 0) {
+                                       throw new 
JobExecutionException(jobGraph.getJobID(),
+                                               "The vertex " + vertex.getID() 
+ " (" + vertex.getName() + ") has no invokable class.");
+                               }
+                               if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
+                                       
vertex.setParallelism(scheduler.getTotalNumberOfSlots());
+                               }
+
+                               try {
+                                       
vertex.initializeOnMaster(userCodeLoader);
+                               } catch (Throwable t) {
+                                       throw new 
JobExecutionException(jobGraph.getJobID(),
+                                               "Cannot initialize task '" + 
vertex.getName() + "': " + t.getMessage(), t);
+                               }
+                       }
 
-                               // TODO:: execute the job when the leadership 
is granted.
+                       // topologically sort the job vertices and attach the 
graph to the existing one
+                       final List<JobVertex> sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+                       if (log.isDebugEnabled()) {
+                               log.debug("Adding {} vertices from job graph {} 
({}).", sortedTopology.size(),
+                                       jobGraph.getJobID(), 
jobGraph.getName());
                        }
-               });
-       }
+                       executionGraph.attachJobGraph(sortedTopology);
 
-       /**
-        * Stop the execution when the leadership is revoked.
-        */
-       public void revokeJobMasterLeadership() {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("JobManager {} was revoked 
leadership.", getAddress());
+                       if (log.isDebugEnabled()) {
+                               log.debug("Successfully created execution graph 
from job graph {} ({}).",
+                                       jobGraph.getJobID(), 
jobGraph.getName());
+                       }
 
-                               // TODO:: cancel the job's execution and notify 
all listeners
-                               cancelAndClearEverything(new 
Exception("JobManager is no longer the leader."));
+                       final JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
+                       if (snapshotSettings != null) {
+                               List<ExecutionJobVertex> triggerVertices = 
getExecutionJobVertexWithId(
+                                       executionGraph, 
snapshotSettings.getVerticesToTrigger());
+
+                               List<ExecutionJobVertex> ackVertices = 
getExecutionJobVertexWithId(
+                                       executionGraph, 
snapshotSettings.getVerticesToAcknowledge());
+
+                               List<ExecutionJobVertex> confirmVertices = 
getExecutionJobVertexWithId(
+                                       executionGraph, 
snapshotSettings.getVerticesToConfirm());
+
+                               CompletedCheckpointStore completedCheckpoints = 
checkpointRecoveryFactory.createCheckpointStore(
+                                       jobGraph.getJobID(), userCodeLoader);
+
+                               CheckpointIDCounter checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(
+                                       jobGraph.getJobID());
+
+                               // Checkpoint stats tracker
+                               boolean isStatsDisabled = 
configuration.getBoolean(
+                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
+                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
+
+                               final CheckpointStatsTracker 
checkpointStatsTracker;
+                               if (isStatsDisabled) {
+                                       checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
+                               } else {
+                                       int historySize = 
configuration.getInteger(
+                                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
+                                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
+                                       checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
+                               }
+
+                               executionGraph.enableSnapshotCheckpointing(
+                                       
snapshotSettings.getCheckpointInterval(),
+                                       snapshotSettings.getCheckpointTimeout(),
+                                       
snapshotSettings.getMinPauseBetweenCheckpoints(),
+                                       
snapshotSettings.getMaxConcurrentCheckpoints(),
+                                       triggerVertices,
+                                       ackVertices,
+                                       confirmVertices,
+                                       checkpointIdCounter,
+                                       completedCheckpoints,
+                                       savepointStore,
+                                       checkpointStatsTracker);
+                       }
+
+                       // TODO: register this class to execution graph as job 
status change listeners
+
+                       // TODO: register client as job / execution status 
change listeners if they are interested
+
+                       /*
+                       TODO: decide whether we should take the savepoint 
before recovery
+
+                       if (isRecovery) {
+                               // this is a recovery of a master failure (this 
master takes over)
+                               executionGraph.restoreLatestCheckpointedState();
+                       } else {
+                               if (snapshotSettings != null) {
+                                       String savepointPath = 
snapshotSettings.getSavepointPath();
+                                       if (savepointPath != null) {
+                                               // got a savepoint
+                                               log.info("Starting job from 
savepoint {}.", savepointPath);
+
+                                               // load the savepoint as a 
checkpoint into the system
+                                               final CompletedCheckpoint 
savepoint = SavepointLoader.loadAndValidateSavepoint(
+                                                       jobGraph.getJobID(), 
executionGraph.getAllVertices(), savepointStore, savepointPath);
+                                               
executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
+
+                                               // Reset the checkpoint ID 
counter
+                                               long nextCheckpointId = 
savepoint.getCheckpointID() + 1;
+                                               log.info("Reset the checkpoint 
ID to " + nextCheckpointId);
+                                               
executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
 
-                               leaderSessionID = null;
+                                               
executionGraph.restoreLatestCheckpointedState();
+                                       }
+                               }
                        }
-               });
-       }
+                       */
 
-       /**
-        * Handles error occurring in the leader election service
-        *
-        * @param exception Exception thrown in the leader election service
-        */
-       public void onJobMasterElectionError(final Exception exception) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.error("Received an error from the 
LeaderElectionService.", exception);
+               } catch (Throwable t) {
+                       log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
-                               // TODO:: cancel the job's execution and 
shutdown the JM
-                               cancelAndClearEverything(exception);
+                       executionGraph.fail(t);
+                       executionGraph = null;
 
-                               leaderSessionID = null;
+                       final Throwable rt;
+                       if (t instanceof JobExecutionException) {
+                               rt = (JobExecutionException) t;
+                       } else {
+                               rt = new 
JobExecutionException(jobGraph.getJobID(),
+                                       "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
                        }
-               });
 
+                       // TODO: notify client about this failure
+
+                       jobCompletionActions.jobFailed(rt);
+                       return;
+               }
+
+               // start scheduling job in another thread
+               executionContext.execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               if (executionGraph != null) {
+                                       try {
+                                               
executionGraph.scheduleForExecution(scheduler);
+                                       } catch (Throwable t) {
+                                               executionGraph.fail(t);
+                                       }
+                               }
+                       }
+               });
        }
 
-       
//----------------------------------------------------------------------------------------------
-       // RPC methods
-       
//----------------------------------------------------------------------------------------------
+       /**
+        * Suspends the job: all running tasks will be cancelled and runtime data will be cleared.
+        *
+        * @param cause The reason why this job has been suspended.
+        */
+       @RpcMethod
+       public void suspendJob(final Throwable cause) {
+               if (executionGraph != null) {
+                       executionGraph.suspend(cause);
+                       executionGraph = null;
+               }
+       }
 
        /**
         * Updates the task execution state for a given task.
@@ -208,37 +472,26 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        
//----------------------------------------------------------------------------------------------
 
        /**
-        * Cancel the current job and notify all listeners the job's 
cancellation.
+        * Converts JobVertexIDs to corresponding ExecutionJobVertexes
         *
-        * @param cause Cause for the cancelling.
+        * @param executionGraph The execution graph that holds the relationship
+        * @param vertexIDs      The vertexIDs that need to be converted
+        * @return The corresponding ExecutionJobVertexes
+        * @throws JobExecutionException if a vertex ID does not exist in the execution graph
         */
-       private void cancelAndClearEverything(Throwable cause) {
-               // currently, nothing to do here
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Utility classes
-       // 
------------------------------------------------------------------------
-       private class JobMasterLeaderContender implements LeaderContender {
-
-               @Override
-               public void grantLeadership(UUID leaderSessionID) {
-                       
JobMaster.this.grantJobMasterLeadership(leaderSessionID);
-               }
-
-               @Override
-               public void revokeLeadership() {
-                       JobMaster.this.revokeJobMasterLeadership();
-               }
-
-               @Override
-               public String getAddress() {
-                       return JobMaster.this.getAddress();
-               }
-
-               @Override
-               public void handleError(Exception exception) {
-                       onJobMasterElectionError(exception);
+       private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
+               final ExecutionGraph executionGraph, final List<JobVertexID> 
vertexIDs)
+               throws JobExecutionException
+       {
+               final List<ExecutionJobVertex> ret = new 
ArrayList<>(vertexIDs.size());
+               for (JobVertexID vertexID : vertexIDs) {
+                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(vertexID);
+                       if (executionJobVertex == null) {
+                               throw new 
JobExecutionException(executionGraph.getJobID(),
+                                       "The snapshot checkpointing settings 
refer to non-existent vertex " + vertexID);
+                       }
+                       ret.add(executionJobVertex);
                }
+               return ret;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 86bf17c..b281ea8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -29,6 +29,19 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 public interface JobMasterGateway extends RpcGateway {
 
        /**
+        * Starts running this job.
+        */
+       void startJob();
+
+       /**
+        * Suspends the job: all running tasks will be cancelled and the runtime status will be cleared.
+        * The job should be re-submitted before restarting it.
+        *
+        * @param cause The reason why this job has been suspended.
+        */
+       void suspendJob(final Throwable cause);
+
+       /**
         * Updates the task execution state for a given task.
         *
         * @param taskExecutionState New task execution state for a given task

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
new file mode 100644
index 0000000..792bfd5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.rpc.RpcService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The dispatcher that runs in the mini cluster, waits for jobs, and starts 
job masters
+ * upon receiving jobs.
+ */
+public class MiniClusterJobDispatcher {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MiniClusterJobDispatcher.class);
+
+       // 
------------------------------------------------------------------------
+
+       /** lock to ensure that this dispatcher executes only one job at a time 
*/
+       private final Object lock = new Object();
+
+       /** the configuration with which the mini cluster was started */
+       private final Configuration configuration;
+
+       /** the RPC service to use by the job managers */
+       private final RpcService rpcService;
+
+       /** services for discovery, leader election, and recovery */
+       private final HighAvailabilityServices haServices;
+
+       /** all the services that the JobManager needs, such as BLOB service, factories, etc */
+       private final JobManagerServices jobManagerServices;
+
+       /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
+       private final int numJobManagers;
+
+       /** The runner for the job and master. non-null if a job is currently 
running */
+       private volatile JobManagerRunner[] runners;
+
+       /** flag marking the dispatcher as shut down */
+       private volatile boolean shutdown;
+
+
+       /**
+        * Starts a mini cluster job dispatcher.
+        * 
+        * <p>The dispatcher kicks off one JobManager per job, a behavior 
similar to a
+        * non-highly-available setup.
+        * 
+        * @param config The configuration of the mini cluster
+        * @param haServices Access to the discovery, leader election, and 
recovery services
+        * 
+        * @throws Exception Thrown, if the services for the JobMaster could 
not be started.
+        */
+       public MiniClusterJobDispatcher(
+                       Configuration config,
+                       RpcService rpcService,
+                       HighAvailabilityServices haServices) throws Exception {
+               this(config, rpcService, haServices, 1);
+       }
+
+       /**
+        * Starts a mini cluster job dispatcher.
+        *
+        * <p>The dispatcher may kick off more than one JobManager per job, 
thus simulating
+        * a highly-available setup.
+        * 
+        * @param config The configuration of the mini cluster
+        * @param haServices Access to the discovery, leader election, and 
recovery services
+        * @param numJobManagers The number of JobMasters to start for each job.
+        * 
+        * @throws Exception Thrown, if the services for the JobMaster could 
not be started.
+        */
+       public MiniClusterJobDispatcher(
+                       Configuration config,
+                       RpcService rpcService,
+                       HighAvailabilityServices haServices,
+                       int numJobManagers) throws Exception {
+
+               checkArgument(numJobManagers >= 1);
+               this.configuration = checkNotNull(config);
+               this.rpcService = checkNotNull(rpcService);
+               this.haServices = checkNotNull(haServices);
+               this.numJobManagers = numJobManagers;
+
+               LOG.info("Creating JobMaster services");
+               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  life cycle
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts down the mini cluster dispatcher. If a job is currently 
running, that job will be
+        * terminally failed.
+        */
+       public void shutdown() {
+               synchronized (lock) {
+                       if (!shutdown) {
+                               shutdown = true;
+
+                               LOG.info("Shutting down the dispatcher");
+
+                               // in this shutdown code we copy the references 
to the stack first,
+                               // to avoid concurrent modification
+
+                               JobManagerRunner[] runners = this.runners;
+                               if (runners != null) {
+                                       this.runners = null;
+
+                                       Exception shutdownException = new 
Exception("The MiniCluster is shutting down");
+                                       for (JobManagerRunner runner : runners) 
{
+                                               
runner.shutdown(shutdownException);
+                                       }
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  submitting jobs
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This method executes a job in detached mode. The method returns immediately after the
+        * JobManager runner(s) for the job have been started.
+        *
+        * @param job  The Flink job to execute
+        *
+        * @throws JobExecutionException Thrown if anything went amiss during 
initial job launch,
+        *         or if the job terminally failed.
+        */
+       public void runDetached(JobGraph job) throws JobExecutionException {
+               checkNotNull(job);
+
+               LOG.info("Received job for detached execution {} ({})", 
job.getName(), job.getJobID());
+
+               synchronized (lock) {
+                       checkState(!shutdown, "mini cluster is shut down");
+                       checkState(runners == null, "mini cluster can only 
execute one job at a time");
+
+                       OnCompletionActions onJobCompletion = new 
DetachedFinalizer(numJobManagers);
+
+                       this.runners = startJobRunners(job, onJobCompletion);
+               }
+       }
+
+       /**
+        * This method runs a job in blocking mode. The method returns only 
after the job
+        * completed successfully, or after it failed terminally.
+        *
+        * @param job  The Flink job to execute 
+        * @return The result of the job execution
+        *
+        * @throws JobExecutionException Thrown if anything went amiss during initial job launch,
+        *         or if the job terminally failed.
+        */
+       public JobExecutionResult runJobBlocking(JobGraph job) throws 
JobExecutionException, InterruptedException {
+               checkNotNull(job);
+               
+               LOG.info("Received job for blocking execution {} ({})", 
job.getName(), job.getJobID());
+               final BlockingJobSync onJobCompletion = new 
BlockingJobSync(job.getJobID(), numJobManagers);
+
+               synchronized (lock) {
+                       checkState(!shutdown, "mini cluster is shut down");
+                       checkState(runners == null, "mini cluster can only 
execute one job at a time");
+
+                       this.runners = startJobRunners(job, onJobCompletion);
+               }
+
+               try {
+                       return onJobCompletion.getResult();
+               }
+               finally {
+                       // always clear the status for the next job
+                       runners = null;
+               }
+       }
+
+       /**
+        * Creates and starts {@code numJobManagers} JobManagerRunners for the given job.
+        *
+        * <p>If creating or starting any runner fails, all runners created so far are shut down
+        * (best effort, shutdown failures are ignored) before the failure is reported.
+        *
+        * @param job The job to start the runners for
+        * @param onCompletion The completion actions handed to every runner
+        * @return The started runners, one per JobManager
+        *
+        * @throws JobExecutionException Thrown if any of the runners could not be created or started.
+        */
+       private JobManagerRunner[] startJobRunners(JobGraph job, OnCompletionActions onCompletion) throws JobExecutionException {
+               LOG.info("Starting {} JobMaster(s) for job {} ({})", numJobManagers, job.getName(), job.getJobID());
+
+               JobManagerRunner[] runners = new JobManagerRunner[numJobManagers];
+               for (int i = 0; i < numJobManagers; i++) {
+                       try {
+                               runners[i] = new JobManagerRunner(job, configuration,
+                                               rpcService, haServices, jobManagerServices, onCompletion);
+                               runners[i].start();
+                       }
+                       catch (Throwable t) {
+                               // shut down all the ones so far
+                               Exception shutdownCause = new Exception("Failed to properly start all mini cluster JobManagers", t);
+
+                               // the loop includes index i: that runner may be null (constructor failed)
+                               // or only partially started, hence the null guard
+                               for (int k = 0; k <= i; k++) {
+                                       try {
+                                               if (runners[k] != null) {
+                                                       runners[k].shutdown(shutdownCause);
+                                               }
+                                       } catch (Throwable ignored) {
+                                               // silent shutdown
+                                       }
+                               }
+
+                               throw new JobExecutionException(job.getJobID(), "Could not start the JobManager(s) for the job", t);
+                       }
+               }
+
+               return runners;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test methods to simulate job master failures
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Shuts down the runner of one JobManager of the currently running job, to simulate
+        * a JobManager failure.
+        *
+        * @param which The index of the runner to shut down, in {@code [0, numJobManagers)}
+        *
+        * @throws IllegalArgumentException Thrown, if the index is out of range.
+        * @throws IllegalStateException Thrown, if the dispatcher is shut down or no job is running.
+        */
+       public void killJobMaster(int which) {
+               checkArgument(which >= 0 && which < numJobManagers, "no such job master");
+               checkState(!shutdown, "mini cluster is shut down");
+
+               // copy the reference to the stack first, the field may be cleared concurrently
+               JobManagerRunner[] runners = this.runners;
+               checkState(runners != null, "mini cluster is not executing a job right now");
+
+               runners[which].shutdown(new Throwable("kill JobManager"));
+       }
+
+       // 
------------------------------------------------------------------------
+       //  utility classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Simple class that waits for all runners to have reported that they 
are done.
+        * In the case of a high-availability test setup, there may be multiple 
runners.
+        * After that, it marks the mini cluster as ready to receive new jobs.
+        */
+       private class DetachedFinalizer implements OnCompletionActions {
+
+               private final AtomicInteger numJobManagersToWaitFor;
+
+               private DetachedFinalizer(int numJobManagersToWaitFor) {
+                       this.numJobManagersToWaitFor = new 
AtomicInteger(numJobManagersToWaitFor);
+               }
+
+               @Override
+               public void jobFinished(JobExecutionResult result) {
+                       decrementCheckAndCleanup();
+               }
+
+               @Override
+               public void jobFailed(Throwable cause) {
+                       decrementCheckAndCleanup();
+               }
+
+               @Override
+               public void jobFinishedByOther() {
+                       decrementCheckAndCleanup();
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       decrementCheckAndCleanup();
+               }
+
+               private void decrementCheckAndCleanup() {
+                       if (numJobManagersToWaitFor.decrementAndGet() == 0) {
+                               MiniClusterJobDispatcher.this.runners = null;
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This class is used to sync on blocking jobs across multiple runners.
+        * Only after all runners reported back that they are finished, the
+        * result will be released.
+        * 
+        * That way it is guaranteed that after the blocking job submit call 
returns,
+        * the dispatcher is immediately free to accept another job.
+        */
+       private static class BlockingJobSync implements OnCompletionActions {
+
+               private final JobID jobId;
+
+               private final CountDownLatch jobMastersToWaitFor;
+
+               private volatile Throwable jobException;
+
+               private volatile Throwable runnerException;
+
+               private volatile JobExecutionResult result;
+               
+               BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
+                       this.jobId = jobId;
+                       this.jobMastersToWaitFor = new 
CountDownLatch(numJobMastersToWaitFor);
+               }
+
+               @Override
+               public void jobFinished(JobExecutionResult jobResult) {
+                       this.result = jobResult;
+                       jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void jobFailed(Throwable cause) {
+                       jobException = cause;
+                       jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void jobFinishedByOther() {
+                       this.jobMastersToWaitFor.countDown();
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       if (runnerException == null) {
+                               runnerException = exception;
+                       }
+               }
+
+               public JobExecutionResult getResult() throws 
JobExecutionException, InterruptedException {
+                       jobMastersToWaitFor.await();
+
+                       final Throwable jobFailureCause = this.jobException;
+                       final Throwable runnerException = this.runnerException;
+                       final JobExecutionResult result = this.result;
+
+                       // (1) we check if the job terminated with an exception
+                       // (2) we check whether the job completed successfully
+                       // (3) we check if we have exceptions from the JobManagers. the job may still have
+                       //     completed successfully in that case, if multiple JobMasters were running
+                       //     and another took over. only if all encounter a fatal error, the job cannot finish
+
+                       if (jobFailureCause != null) {
+                               if (jobFailureCause instanceof 
JobExecutionException) {
+                                       throw (JobExecutionException) 
jobFailureCause;
+                               }
+                               else {
+                                       throw new JobExecutionException(jobId, 
"The job execution failed", jobFailureCause);
+                               }
+                       }
+                       else if (result != null) {
+                               return result;
+                       }
+                       else if (runnerException != null) {
+                               throw new JobExecutionException(jobId,
+                                               "The job execution failed 
because all JobManagers encountered fatal errors", runnerException);
+                       }
+                       else {
+                               throw new IllegalStateException("Bug: Job 
finished with neither error nor result.");
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
new file mode 100644
index 0000000..7721117
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/FatalErrorHandler.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+public interface FatalErrorHandler {
+
+       void onFatalError(Throwable exception);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cf709c8..9e3c3b9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -28,6 +29,7 @@ import 
org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
@@ -340,6 +342,16 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        public LeaderElectionService 
getJobMasterLeaderElectionService(JobID jobID) throws Exception {
                                return null;
                        }
+
+                       @Override
+                       public CheckpointRecoveryFactory 
getCheckpointRecoveryFactory() throws Exception {
+                               return null;
+                       }
+
+                       @Override
+                       public SubmittedJobGraphStore 
getSubmittedJobGraphStore() throws Exception {
+                               return null;
+                       }
                };
 
                // start all the TaskManager services (network stack,  library 
cache, ...)

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 2ac43be..1a5450d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
-import 
org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
+import java.util.concurrent.ConcurrentHashMap;
+
 /**
  * A variant of the HighAvailabilityServices for testing. Each individual 
service can be set
  * to an arbitrary implementation, such as a mock or default service.
@@ -37,6 +40,9 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
 
+       private volatile CheckpointRecoveryFactory checkpointRecoveryFactory;
+
+       private volatile SubmittedJobGraphStore submittedJobGraphStore;
 
        // 
------------------------------------------------------------------------
        //  Setters for mock / testing implementations
@@ -58,6 +64,14 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                this.resourceManagerLeaderElectionService = 
leaderElectionService;
        }
 
+       public void setCheckpointRecoveryFactory(CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
+               this.checkpointRecoveryFactory = checkpointRecoveryFactory;
+       }
+
+       public void setSubmittedJobGraphStore(SubmittedJobGraphStore 
submittedJobGraphStore) {
+               this.submittedJobGraphStore = submittedJobGraphStore;
+       }
+
        // 
------------------------------------------------------------------------
        //  HA Services Methods
        // 
------------------------------------------------------------------------
@@ -103,4 +117,27 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                        throw new 
IllegalStateException("ResourceManagerLeaderElectionService has not been set");
                }
        }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+               CheckpointRecoveryFactory factory = checkpointRecoveryFactory;
+
+               if (factory != null) {
+                       return factory;
+               } else {
+                       throw new 
IllegalStateException("CheckpointRecoveryFactory has not been set");
+               }
+       }
+
+       @Override
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+               SubmittedJobGraphStore store = submittedJobGraphStore;
+
+               if (store != null) {
+                       return store;
+               } else {
+                       throw new IllegalStateException("SubmittedJobGraphStore 
has not been set");
+
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
new file mode 100644
index 0000000..dc3b5fd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobManagerRunner.class)
+public class JobManagerRunnerMockTest {
+
+       private JobManagerRunner runner;
+
+       private JobMaster jobManager;
+
+       private JobMasterGateway jobManagerGateway;
+
+       private LeaderElectionService leaderElectionService;
+
+       private TestingOnCompletionActions jobCompletion;
+
+       @Before
+       public void setUp() throws Exception {
+               jobManager = mock(JobMaster.class);
+               jobManagerGateway = mock(JobMasterGateway.class);
+               when(jobManager.getSelf()).thenReturn(jobManagerGateway);
+
+               
PowerMockito.whenNew(JobMaster.class).withAnyArguments().thenReturn(jobManager);
+
+               jobCompletion = new TestingOnCompletionActions();
+
+               leaderElectionService = mock(LeaderElectionService.class);
+               when(leaderElectionService.hasLeadership()).thenReturn(true);
+
+               HighAvailabilityServices haServices = 
mock(HighAvailabilityServices.class);
+               
when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+
+               runner = PowerMockito.spy(new JobManagerRunner(
+                       new JobGraph("test"),
+                       mock(Configuration.class),
+                       mock(RpcService.class),
+                       haServices,
+                       mock(JobManagerServices.class),
+                       jobCompletion));
+       }
+
+       @After
+       public void tearDown() throws Exception {
+       }
+
+       @Test
+       public void testStartAndShutdown() throws Exception {
+               runner.start();
+               verify(jobManager).init();
+               verify(jobManager).start();
+               verify(leaderElectionService).start(runner);
+
+               assertTrue(!jobCompletion.isJobFinished());
+               assertTrue(!jobCompletion.isJobFailed());
+
+               runner.shutdown();
+               verify(leaderElectionService).stop();
+               verify(jobManager).shutDown();
+       }
+
+       @Test
+       public void testShutdownBeforeGrantLeadership() throws Exception {
+               runner.start();
+               verify(jobManager).init();
+               verify(jobManager).start();
+               verify(leaderElectionService).start(runner);
+
+               runner.shutdown();
+               verify(leaderElectionService).stop();
+               verify(jobManager).shutDown();
+
+               assertTrue(!jobCompletion.isJobFinished());
+               assertTrue(!jobCompletion.isJobFailed());
+
+               runner.grantLeadership(UUID.randomUUID());
+               assertTrue(!jobCompletion.isJobFinished());
+               assertTrue(!jobCompletion.isJobFailed());
+
+       }
+
+       @Test
+       public void testJobFinishedByOtherBeforeGrantLeadership() throws 
Exception {
+               runner.start();
+
+               when(runner.isJobFinishedByOthers()).thenReturn(true);
+               runner.grantLeadership(UUID.randomUUID());
+
+               // runner should shut down automatically and notify the job completion actions
+               verify(leaderElectionService).stop();
+               verify(jobManager).shutDown();
+
+               assertTrue(jobCompletion.isJobFinished());
+               assertTrue(jobCompletion.isJobFinishedByOther());
+       }
+
+       @Test
+       public void testJobFinished() throws Exception {
+               runner.start();
+
+               runner.grantLeadership(UUID.randomUUID());
+               verify(jobManagerGateway).startJob();
+               assertTrue(!jobCompletion.isJobFinished());
+
+               // runner is told by the JobManager that the job is finished
+               runner.jobFinished(mock(JobExecutionResult.class));
+
+               assertTrue(jobCompletion.isJobFinished());
+               assertFalse(jobCompletion.isJobFinishedByOther());
+               verify(leaderElectionService).stop();
+               verify(jobManager).shutDown();
+               assertTrue(runner.isShutdown());
+       }
+
+       @Test
+       public void testJobFailed() throws Exception {
+               runner.start();
+
+               runner.grantLeadership(UUID.randomUUID());
+               verify(jobManagerGateway).startJob();
+               assertTrue(!jobCompletion.isJobFinished());
+
+               // runner is told by the JobManager that the job has failed
+               runner.jobFailed(new Exception("failed manually"));
+
+               assertTrue(jobCompletion.isJobFailed());
+               verify(leaderElectionService).stop();
+               verify(jobManager).shutDown();
+               assertTrue(runner.isShutdown());
+       }
+
+       @Test
+       public void testLeadershipRevoked() throws Exception {
+               runner.start();
+
+               runner.grantLeadership(UUID.randomUUID());
+               verify(jobManagerGateway).startJob();
+               assertTrue(!jobCompletion.isJobFinished());
+
+               runner.revokeLeadership();
+               verify(jobManagerGateway).suspendJob(any(Throwable.class));
+               assertFalse(runner.isShutdown());
+       }
+
+       @Test
+       public void testRegainLeadership() throws Exception {
+               runner.start();
+
+               runner.grantLeadership(UUID.randomUUID());
+               verify(jobManagerGateway).startJob();
+               assertTrue(!jobCompletion.isJobFinished());
+
+               runner.revokeLeadership();
+               verify(jobManagerGateway).suspendJob(any(Throwable.class));
+               assertFalse(runner.isShutdown());
+
+               runner.grantLeadership(UUID.randomUUID());
+               verify(jobManagerGateway, times(2)).startJob();
+       }
+
+       private static class TestingOnCompletionActions implements 
OnCompletionActions {
+
+               private volatile JobExecutionResult result;
+
+               private volatile Throwable failedCause;
+
+               private volatile boolean finishedByOther;
+
+               @Override
+               public void jobFinished(JobExecutionResult result) {
+                       checkArgument(!isJobFinished(), "job finished already");
+                       checkArgument(!isJobFailed(), "job failed already");
+
+                       this.result = result;
+               }
+
+               @Override
+               public void jobFailed(Throwable cause) {
+                       checkArgument(!isJobFinished(), "job finished already");
+                       checkArgument(!isJobFailed(), "job failed already");
+
+                       this.failedCause = cause;
+               }
+
+               @Override
+               public void jobFinishedByOther() {
+                       checkArgument(!isJobFinished(), "job finished already");
+                       checkArgument(!isJobFailed(), "job failed already");
+
+                       this.finishedByOther = true;
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       jobFailed(exception);
+               }
+
+               boolean isJobFinished() {
+                       return result != null || finishedByOther;
+               }
+
+               boolean isJobFinishedByOther() {
+                       return finishedByOther;
+               }
+
+               boolean isJobFailed() {
+                       return failedCause != null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/474ace75/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 6363662..e05c8d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -19,23 +19,21 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
-import org.junit.AfterClass;
 import org.junit.Test;
 
 import scala.Option;
 import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -57,19 +55,20 @@ public class RpcConnectionTest {
 
                        // we start the RPC service with a very long timeout to ensure that the test
                        // can only pass if the connection problem is not recognized merely via a timeout
-                       rpcService = new AkkaRpcService(actorSystem, new 
Timeout(10000000, TimeUnit.SECONDS));
+                       rpcService = new AkkaRpcService(actorSystem, 
Time.of(10000000, TimeUnit.SECONDS));
 
                        Future<TaskExecutorGateway> future = 
rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class);
 
-                       Await.result(future, new FiniteDuration(10000000, 
TimeUnit.SECONDS));
+                       future.get(10000000, TimeUnit.SECONDS);
                        fail("should never complete normally");
                }
                catch (TimeoutException e) {
                        fail("should not fail with a generic timeout 
exception");
                }
-               catch (RpcConnectionException e) {
+               catch (ExecutionException e) {
                        // that is what we want
-                       assertTrue("wrong error message", 
e.getMessage().contains("foo.bar.com.test.invalid"));
+                       assertTrue(e.getCause() instanceof 
RpcConnectionException);
+                       assertTrue("wrong error message", 
e.getCause().getMessage().contains("foo.bar.com.test.invalid"));
                }
                catch (Throwable t) {
                        fail("wrong exception: " + t);

Reply via email to