http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 3b8fc97..d9ff88f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,38 +18,31 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
-import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -59,18 +52,13 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
@@ -83,9 +71,12 @@ import 
org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -93,15 +84,13 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.duration.FiniteDuration;
-
+import javax.annotation.Nullable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -112,16 +101,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
  * remotely:
  * <ul>
- * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task 
execution state for
+ * <li>{@link #updateTaskExecutionState} updates the task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
+       private static final AtomicReferenceFieldUpdater<JobMaster, UUID> 
LEADER_ID_UPDATER =
+                       AtomicReferenceFieldUpdater.newUpdater(JobMaster.class, 
UUID.class, "leaderSessionID");
+
+       // 
------------------------------------------------------------------------
+
        /** Logical representation of the job */
        private final JobGraph jobGraph;
 
-       /** Configuration of the job */
+       /** Configuration of the JobManager */
        private final Configuration configuration;
 
        /** Service to contend for and retrieve the leadership of JM and RM */
@@ -130,37 +124,24 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        /** Blob cache manager used across jobs */
        private final BlobLibraryCacheManager libraryCacheManager;
 
-       /** Factory to create restart strategy for this job */
-       private final RestartStrategyFactory restartStrategyFactory;
-
-       /** Store for save points */
-       private final SavepointStore savepointStore;
-
-       /** The timeout for this job */
-       private final Time timeout;
-
-       /** The scheduler to use for scheduling new tasks as they are needed */
-       private final Scheduler scheduler;
+       /** The metrics for the JobManager itself */
+       private final MetricGroup jobManagerMetricGroup;
 
-       /** The metrics group used across jobs */
-       private final JobManagerMetricGroup jobManagerMetricGroup;
+       /** The metrics for the job */
+       private final MetricGroup jobMetricGroup;
 
        /** The execution context which is used to execute futures */
-       private final Executor executionContext;
+       private final ExecutorService executionContext;
 
        private final OnCompletionActions jobCompletionActions;
 
-       /** The execution graph of this job */
-       private volatile ExecutionGraph executionGraph;
-
-       /** The checkpoint recovery factory used by this job */
-       private CheckpointRecoveryFactory checkpointRecoveryFactory;
+       private final FatalErrorHandler errorHandler;
 
-       private ClassLoader userCodeLoader;
+       private final ClassLoader userCodeLoader;
 
-       private RestartStrategy restartStrategy;
+       /** The execution graph of this job */
+       private final ExecutionGraph executionGraph;
 
-       private MetricGroup jobMetrics;
 
        private volatile UUID leaderSessionID;
 
@@ -170,22 +151,27 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        private LeaderRetrievalService resourceManagerLeaderRetriever;
 
        /** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
-       private volatile ResourceManagerConnection resourceManagerConnection;
+       private ResourceManagerConnection resourceManagerConnection;
+
+       // TODO - we need to replace this with the slot pool
+       private final Scheduler scheduler;
 
        // 
------------------------------------------------------------------------
 
        public JobMaster(
-               JobGraph jobGraph,
-               Configuration configuration,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityService,
-               BlobLibraryCacheManager libraryCacheManager,
-               RestartStrategyFactory restartStrategyFactory,
-               SavepointStore savepointStore,
-               Time timeout,
-               Scheduler scheduler,
-               JobManagerMetricGroup jobManagerMetricGroup,
-               OnCompletionActions jobCompletionActions)
+                       JobGraph jobGraph,
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityService,
+                       ExecutorService executorService,
+                       BlobLibraryCacheManager libraryCacheManager,
+                       RestartStrategyFactory restartStrategyFactory,
+                       SavepointStore savepointStore,
+                       Time rpcAskTimeout,
+                       @Nullable JobManagerMetricGroup jobManagerMetricGroup,
+                       OnCompletionActions jobCompletionActions,
+                       FatalErrorHandler errorHandler,
+                       ClassLoader userCodeLoader) throws Exception
        {
                super(rpcService);
 
@@ -193,289 +179,150 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                this.configuration = checkNotNull(configuration);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityService);
                this.libraryCacheManager = checkNotNull(libraryCacheManager);
-               this.restartStrategyFactory = 
checkNotNull(restartStrategyFactory);
-               this.savepointStore = checkNotNull(savepointStore);
-               this.timeout = checkNotNull(timeout);
-               this.scheduler = checkNotNull(scheduler);
-               this.jobManagerMetricGroup = 
checkNotNull(jobManagerMetricGroup);
-               this.executionContext = checkNotNull(rpcService.getExecutor());
+               this.executionContext = checkNotNull(executorService);
                this.jobCompletionActions = checkNotNull(jobCompletionActions);
-       }
+               this.errorHandler = checkNotNull(errorHandler);
+               this.userCodeLoader = checkNotNull(userCodeLoader);
 
-       
//----------------------------------------------------------------------------------------------
-       // Lifecycle management
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Initializing the job execution environment, should be called before 
start. Any error occurred during
-        * initialization will be treated as job submission failure.
-        *
-        * @throws JobSubmissionException
-        */
-       public void init() throws JobSubmissionException {
-               log.info("Initializing job {} ({}).", jobGraph.getName(), 
jobGraph.getJobID());
-
-               try {
-                       // IMPORTANT: We need to make sure that the library 
registration is the first action,
-                       // because this makes sure that the uploaded jar files 
are removed in case of
-                       // unsuccessful
-                       try {
-                               
libraryCacheManager.registerJob(jobGraph.getJobID(), 
jobGraph.getUserJarBlobKeys(),
-                                       jobGraph.getClasspaths());
-                       } catch (Throwable t) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
-                                       "Cannot set up the user code libraries: 
" + t.getMessage(), t);
-                       }
+               final String jobName = jobGraph.getName();
+               final JobID jid = jobGraph.getJobID();
 
-                       userCodeLoader = 
libraryCacheManager.getClassLoader(jobGraph.getJobID());
-                       if (userCodeLoader == null) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
-                                       "The user code class loader could not 
be initialized.");
-                       }
+               if (jobManagerMetricGroup != null) {
+                       this.jobManagerMetricGroup = jobManagerMetricGroup;
+                       this.jobMetricGroup = 
jobManagerMetricGroup.addJob(jobGraph);
+               } else {
+                       this.jobManagerMetricGroup = new 
UnregisteredMetricsGroup();
+                       this.jobMetricGroup = new UnregisteredMetricsGroup();
+               }
 
-                       if (jobGraph.getNumberOfVertices() == 0) {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "The given job is empty");
-                       }
+               log.info("Initializing job {} ({}).", jobName, jid);
 
-                       final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
+               final RestartStrategies.RestartStrategyConfiguration 
restartStrategyConfiguration =
                                jobGraph.getSerializedExecutionConfig()
-                                       .deserializeValue(userCodeLoader)
-                                       .getRestartStrategy();
-                       if (restartStrategyConfiguration != null) {
-                               restartStrategy = 
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration);
-                       }
-                       else {
-                               restartStrategy = 
restartStrategyFactory.createRestartStrategy();
-                       }
+                                               
.deserializeValue(userCodeLoader)
+                                               .getRestartStrategy();
 
-                       log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobGraph.getName(), jobGraph.getJobID());
+               final RestartStrategy restartStrategy = 
(restartStrategyConfiguration != null) ?
+                               
RestartStrategyFactory.createRestartStrategy(restartStrategyConfiguration) :
+                               restartStrategyFactory.createRestartStrategy();
 
-                       if (jobManagerMetricGroup != null) {
-                               jobMetrics = 
jobManagerMetricGroup.addJob(jobGraph);
-                       }
-                       if (jobMetrics == null) {
-                               jobMetrics = new UnregisteredMetricsGroup();
-                       }
+               log.info("Using restart strategy {} for {} ({}).", 
restartStrategy, jobName, jid);
 
-                       try {
-                               checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
-                       } catch (Exception e) {
-                               log.error("Could not get the checkpoint 
recovery factory.", e);
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Could not get the checkpoint 
recovery factory.", e);
-                       }
+               CheckpointRecoveryFactory checkpointRecoveryFactory;
+               try {
+                       checkpointRecoveryFactory = 
highAvailabilityServices.getCheckpointRecoveryFactory();
+               } catch (Exception e) {
+                       log.error("Could not create the access to 
highly-available checkpoint storage.", e);
+                       throw new Exception("Could not create the access to 
highly-available checkpoint storage.", e);
+               }
 
-                       try {
-                               resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
-                       } catch (Exception e) {
-                               log.error("Could not get the resource manager 
leader retriever.", e);
-                               throw new 
JobSubmissionException(jobGraph.getJobID(),
+               try {
+                       resourceManagerLeaderRetriever = 
highAvailabilityServices.getResourceManagerLeaderRetriever();
+               } catch (Exception e) {
+                       log.error("Could not get the resource manager leader 
retriever.", e);
+                       throw new JobSubmissionException(jobGraph.getJobID(),
                                        "Could not get the resource manager 
leader retriever.", e);
-                       }
-               } catch (Throwable t) {
-                       log.error("Failed to initializing job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
+               }
 
-                       libraryCacheManager.unregisterJob(jobGraph.getJobID());
+               this.executionGraph = ExecutionGraphBuilder.buildGraph(
+                               null,
+                               jobGraph,
+                               configuration,
+                               executorService,
+                               userCodeLoader,
+                               checkpointRecoveryFactory,
+                               savepointStore,
+                               rpcAskTimeout,
+                               restartStrategy,
+                               jobMetricGroup,
+                               -1,
+                               log);
 
-                       if (t instanceof JobSubmissionException) {
-                               throw (JobSubmissionException) t;
-                       }
-                       else {
-                               throw new 
JobSubmissionException(jobGraph.getJobID(), "Failed to initialize job " +
-                                       jobGraph.getName() + " (" + 
jobGraph.getJobID() + ")", t);
-                       }
-               }
+               // TODO - temp fix
+               this.scheduler = new Scheduler(executorService);
        }
 
+       
//----------------------------------------------------------------------------------------------
+       // Lifecycle management
+       
//----------------------------------------------------------------------------------------------
+
+
        @Override
        public void start() {
-               super.start();
+               throw new UnsupportedOperationException("Should never call 
start() without leader ID");
        }
 
+       /**
+        * Start the rpc service and begin to run the job.
+        *
+        * @param leaderSessionID The necessary leader id for running the job.
+        */
+       public void start(final UUID leaderSessionID) {
+               if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
+                       super.start();
+
+                       log.info("Starting JobManager for job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
+                       getSelf().startJobExecution();
+               } else {
+                       log.warn("Job already started with leaderId {}, 
ignoring this start request.", leaderSessionID);
+               }
+       }
+
+       /**
+        * Suspend the job and shutdown all other services including rpc.
+        */
        @Override
        public void shutDown() {
+               // make sure there is a graceful exit
+               getSelf().suspendExecution(new Exception("JobManager is 
shutting down."));
                super.shutDown();
-
-               suspendJob(new Exception("JobManager is shutting down."));
-
-               disposeCommunicationWithResourceManager();
        }
 
-
-
        
//----------------------------------------------------------------------------------------------
        // RPC methods
        
//----------------------------------------------------------------------------------------------
 
-       /**
-        * Start to run the job, runtime data structures like ExecutionGraph 
will be constructed now and checkpoint
-        * being recovered. After this, we will begin to schedule the job.
-        */
+       //-- job starting and stopping  
-----------------------------------------------------------------
+
        @RpcMethod
-       public void startJob(final UUID leaderSessionID) {
-               log.info("Starting job {} ({}) with leaderId {}.", 
jobGraph.getName(), jobGraph.getJobID(), leaderSessionID);
-
-               this.leaderSessionID = leaderSessionID;
-
-               if (executionGraph != null) {
-                       executionGraph = new ExecutionGraph(
-                               
ExecutionContext$.MODULE$.fromExecutor(executionContext),
-                               jobGraph.getJobID(),
-                               jobGraph.getName(),
-                               jobGraph.getJobConfiguration(),
-                               jobGraph.getSerializedExecutionConfig(),
-                               new FiniteDuration(timeout.getSize(), 
timeout.getUnit()),
-                               restartStrategy,
-                               jobGraph.getUserJarBlobKeys(),
-                               jobGraph.getClasspaths(),
-                               userCodeLoader,
-                               jobMetrics);
-               }
-               else {
-                       // TODO: update last active time in JobInfo
-               }
+       public void startJobExecution() {
+               log.info("Starting execution of job {} ({}) with leaderId {}.",
+                               jobGraph.getName(), jobGraph.getJobID(), 
leaderSessionID);
 
                try {
-                       
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
-                       
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());
-
-                       try {
-                               
executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
-                       } catch (Exception e) {
-                               log.warn("Cannot create JSON plan for job {} 
({})", jobGraph.getJobID(), jobGraph.getName(), e);
-                               executionGraph.setJsonPlan("{}");
-                       }
-
-                       // initialize the vertices that have a master 
initialization hook
-                       // file output formats create directories here, input 
formats create splits
-                       if (log.isDebugEnabled()) {
-                               log.debug("Running initialization on master for 
job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
-                       }
-                       for (JobVertex vertex : jobGraph.getVertices()) {
-                               final String executableClass = 
vertex.getInvokableClassName();
-                               if (executableClass == null || 
executableClass.length() == 0) {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "The vertex " + vertex.getID() 
+ " (" + vertex.getName() + ") has no invokable class.");
-                               }
-                               if (vertex.getParallelism() == 
ExecutionConfig.PARALLELISM_AUTO_MAX) {
-                                       
vertex.setParallelism(scheduler.getTotalNumberOfSlots());
-                               }
-
-                               try {
-                                       
vertex.initializeOnMaster(userCodeLoader);
-                               } catch (Throwable t) {
-                                       throw new 
JobExecutionException(jobGraph.getJobID(),
-                                               "Cannot initialize task '" + 
vertex.getName() + "': " + t.getMessage(), t);
-                               }
-                       }
-
-                       // topologically sort the job vertices and attach the 
graph to the existing one
-                       final List<JobVertex> sortedTopology = 
jobGraph.getVerticesSortedTopologicallyFromSources();
-                       if (log.isDebugEnabled()) {
-                               log.debug("Adding {} vertices from job graph {} 
({}).", sortedTopology.size(),
-                                       jobGraph.getJobID(), 
jobGraph.getName());
-                       }
-                       executionGraph.attachJobGraph(sortedTopology);
-
-                       if (log.isDebugEnabled()) {
-                               log.debug("Successfully created execution graph 
from job graph {} ({}).",
-                                       jobGraph.getJobID(), 
jobGraph.getName());
-                       }
-
-                       final JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
-                       if (snapshotSettings != null) {
-                               List<ExecutionJobVertex> triggerVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToTrigger());
-
-                               List<ExecutionJobVertex> ackVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToAcknowledge());
-
-                               List<ExecutionJobVertex> confirmVertices = 
getExecutionJobVertexWithId(
-                                       executionGraph, 
snapshotSettings.getVerticesToConfirm());
-
-                               CompletedCheckpointStore completedCheckpoints = 
checkpointRecoveryFactory.createCheckpointStore(
-                                       jobGraph.getJobID(), userCodeLoader);
-
-                               CheckpointIDCounter checkpointIdCounter = 
checkpointRecoveryFactory.createCheckpointIDCounter(
-                                       jobGraph.getJobID());
-
-                               // Checkpoint stats tracker
-                               boolean isStatsDisabled = 
configuration.getBoolean(
-                                       
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_DISABLE,
-                                       
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_DISABLE);
-
-                               final CheckpointStatsTracker 
checkpointStatsTracker;
-                               if (isStatsDisabled) {
-                                       checkpointStatsTracker = new 
DisabledCheckpointStatsTracker();
-                               }
-                               else {
-                                       int historySize = 
configuration.getInteger(
-                                               
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
-                                               
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE);
-                                       checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, ackVertices, jobMetrics);
-                               }
-
-                               executionGraph.enableSnapshotCheckpointing(
-                                       
snapshotSettings.getCheckpointInterval(),
-                                       snapshotSettings.getCheckpointTimeout(),
-                                       
snapshotSettings.getMinPauseBetweenCheckpoints(),
-                                       
snapshotSettings.getMaxConcurrentCheckpoints(),
-                                       triggerVertices,
-                                       ackVertices,
-                                       confirmVertices,
-                                       checkpointIdCounter,
-                                       completedCheckpoints,
-                                       savepointStore,
-                                       checkpointStatsTracker);
-                       }
-
-                       // TODO: register this class to execution graph as job 
status change listeners
-
-                       // TODO: register client as job / execution status 
change listeners if they are interested
-
-                       /*
-                       TODO: decide whether we should take the savepoint 
before recovery
-
-                       if (isRecovery) {
-                               // this is a recovery of a master failure (this 
master takes over)
-                               executionGraph.restoreLatestCheckpointedState();
-                       } else {
-                               if (snapshotSettings != null) {
-                                       String savepointPath = 
snapshotSettings.getSavepointPath();
-                                       if (savepointPath != null) {
-                                               // got a savepoint
-                                               log.info("Starting job from 
savepoint {}.", savepointPath);
-
-                                               // load the savepoint as a 
checkpoint into the system
-                                               final CompletedCheckpoint 
savepoint = SavepointLoader.loadAndValidateSavepoint(
-                                                       jobGraph.getJobID(), 
executionGraph.getAllVertices(), savepointStore, savepointPath);
-                                               
executionGraph.getCheckpointCoordinator().getCheckpointStore().addCheckpoint(savepoint);
-
-                                               // Reset the checkpoint ID 
counter
-                                               long nextCheckpointId = 
savepoint.getCheckpointID() + 1;
-                                               log.info("Reset the checkpoint 
ID to " + nextCheckpointId);
-                                               
executionGraph.getCheckpointCoordinator().getCheckpointIdCounter().setCount(nextCheckpointId);
-
-                                               
executionGraph.restoreLatestCheckpointedState();
-                                       }
+                       // register self as job status change listener
+                       executionGraph.registerJobStatusListener(new 
JobStatusListener() {
+                               @Override
+                               public void jobStatusChanges(
+                                               final JobID jobId, final 
JobStatus newJobStatus, final long timestamp, final Throwable error)
+                               {
+                                       // run in rpc thread to avoid 
concurrency
+                                       runAsync(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       
jobStatusChanged(newJobStatus, timestamp, error);
+                                               }
+                                       });
                                }
-                       }
-                       */
+                       });
 
-                       // job is good to go, try to locate resource manager's 
address
+                       // job is ready to go, try to establish connection with 
resource manager
                        resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
                } catch (Throwable t) {
+
+                       // TODO - this should not result in a job failure, but 
another leader should take over
+                       // TODO - either this master should retry the 
execution, or it should relinquish leadership / terminate
+
                        log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
 
                        executionGraph.fail(t);
-                       executionGraph = null;
 
-                       final Throwable rt;
+                       final JobExecutionException rt;
                        if (t instanceof JobExecutionException) {
                                rt = (JobExecutionException) t;
-                       }
-                       else {
+                       } else {
                                rt = new 
JobExecutionException(jobGraph.getJobID(),
-                                       "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
+                                               "Failed to start job " + 
jobGraph.getJobID() + " (" + jobGraph.getName() + ")", t);
                        }
 
                        // TODO: notify client about this failure
@@ -488,34 +335,51 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                executionContext.execute(new Runnable() {
                        @Override
                        public void run() {
-                               if (executionGraph != null) {
-                                       try {
-                                               
executionGraph.scheduleForExecution(scheduler);
-                                       } catch (Throwable t) {
-                                               executionGraph.fail(t);
-                                       }
+                               try {
+                                       
executionGraph.scheduleForExecution(scheduler);
+                               } catch (Throwable t) {
+                                       executionGraph.fail(t);
                                }
                        }
                });
        }
 
        /**
-        * Suspending job, all the running tasks will be cancelled, and runtime 
data will be cleared.
+        * Suspends the job: all running tasks will be cancelled, and 
communication with other components
+        * will be disposed.
+        *
+        * <p>Mostly the job is suspended because the leadership has been 
revoked; one can restart this job by
+        * calling the {@link #start(UUID)} method once we take the leadership 
back again.
         *
         * @param cause The reason of why this job been suspended.
         */
        @RpcMethod
-       public void suspendJob(final Throwable cause) {
+       public void suspendExecution(final Throwable cause) {
+               if (leaderSessionID == null) {
+                       log.debug("Job has already been suspended or 
shutdown.");
+                       return;
+               }
+
+               // receive no more messages until started again, should be 
called before we clear self leader id
+               ((StartStoppable) getSelf()).stop();
+
                leaderSessionID = null;
+               executionGraph.suspend(cause);
 
-               if (executionGraph != null) {
-                       executionGraph.suspend(cause);
-                       executionGraph = null;
+               // disconnect from resource manager:
+               try {
+                       resourceManagerLeaderRetriever.stop();
+               } catch (Exception e) {
+                       log.warn("Failed to stop resource manager leader 
retriever when suspending.");
                }
+               closeResourceManagerConnection();
+
+               // TODO: disconnect from all registered task managers
 
-               disposeCommunicationWithResourceManager();
        }
 
+       
//----------------------------------------------------------------------------------------------
+
        /**
         * Updates the task execution state for a given task.
         *
@@ -523,24 +387,38 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * @return Acknowledge the task execution state update
         */
        @RpcMethod
-       public Acknowledge updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) throws ExecutionGraphException {
+       public Acknowledge updateTaskExecutionState(
+                       final UUID leaderSessionID,
+                       final TaskExecutionState taskExecutionState) throws 
Exception
+       {
                if (taskExecutionState == null) {
                        throw new NullPointerException("TaskExecutionState must 
not be null.");
                }
 
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                if (executionGraph.updateState(taskExecutionState)) {
                        return Acknowledge.get();
                } else {
                        throw new ExecutionGraphException("The execution 
attempt " +
-                               taskExecutionState.getID() + " was not found.");
+                                       taskExecutionState.getID() + " was not 
found.");
                }
        }
 
        @RpcMethod
        public SerializedInputSplit requestNextInputSplit(
-               final JobVertexID vertexID,
-               final ExecutionAttemptID executionAttempt) throws Exception
+                       final UUID leaderSessionID,
+                       final JobVertexID vertexID,
+                       final ExecutionAttemptID executionAttempt) throws 
Exception
        {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
                if (execution == null) {
                        // can happen when JobManager had already unregistered 
this execution upon on task failure,
@@ -579,7 +457,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                } catch (Exception ex) {
                        log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
                        IOException reason = new IOException("Could not 
serialize the next input split of class " +
-                               nextInputSplit.getClass() + ".", ex);
+                                       nextInputSplit.getClass() + ".", ex);
                        vertex.fail(reason);
                        throw reason;
                }
@@ -587,17 +465,31 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public PartitionState requestPartitionState(
-               final ResultPartitionID partitionId,
-               final ExecutionAttemptID taskExecutionId,
-               final IntermediateDataSetID taskResultId)
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionId,
+                       final ExecutionAttemptID taskExecutionId,
+                       final IntermediateDataSetID taskResultId) throws 
Exception
        {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
                final ExecutionState state = execution != null ? 
execution.getState() : null;
                return new PartitionState(taskResultId, 
partitionId.getPartitionId(), state);
        }
 
        @RpcMethod
-       public Acknowledge scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID) {
+       public Acknowledge scheduleOrUpdateConsumers(
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionID) throws Exception
+       {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
+
                executionGraph.scheduleOrUpdateConsumers(partitionID);
                return Acknowledge.get();
        }
@@ -609,223 +501,153 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public void acknowledgeCheckpoint(
-               JobID jobID,
-               ExecutionAttemptID executionAttemptID,
-               long checkpointID,
-               CheckpointStateHandles checkpointStateHandles,
-               long synchronousDurationMillis,
-               long asynchronousDurationMillis,
-               long bytesBufferedInAlignment,
-               long alignmentDurationNanos) {
-               throw new UnsupportedOperationException();
-       }
-
-       @RpcMethod
-       public void declineCheckpoint(
-               JobID jobID,
-               ExecutionAttemptID executionAttemptID,
-               long checkpointID,
-               long checkpointTimestamp) {
-               throw new UnsupportedOperationException();
-       }
-
-       @RpcMethod
-       public void resourceRemoved(final ResourceID resourceId, final String 
message) {
-               // TODO: remove resource from slot pool
-       }
-
-       @RpcMethod
-       public void acknowledgeCheckpoint(final AcknowledgeCheckpoint 
acknowledge) {
-               if (executionGraph != null) {
-                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-                       if (checkpointCoordinator != null) {
-                               getRpcService().execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       if 
(!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
-                                                               
log.info("Received message for non-existing checkpoint {}.",
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final long checkpointID,
+                       final CheckpointStateHandles checkpointStateHandles,
+                       final long synchronousDurationMillis,
+                       final long asynchronousDurationMillis,
+                       final long bytesBufferedInAlignment,
+                       final long alignmentDurationNanos)
+       {
+               final AcknowledgeCheckpoint acknowledge = new 
AcknowledgeCheckpoint(
+                               jobID,
+                               executionAttemptID,
+                               checkpointID,
+                               checkpointStateHandles,
+                               synchronousDurationMillis,
+                               asynchronousDurationMillis,
+                               bytesBufferedInAlignment,
+                               alignmentDurationNanos);
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       getRpcService().execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               if 
(!checkpointCoordinator.receiveAcknowledgeMessage(acknowledge)) {
+                                                       log.info("Received 
message for non-existing checkpoint {}.",
                                                                        
acknowledge.getCheckpointId());
-                                                       }
-                                               } catch (Exception e) {
-                                                       log.error("Error in 
CheckpointCoordinator while processing {}", acknowledge, e);
                                                }
+                                       } catch (Exception e) {
+                                               log.error("Error in 
CheckpointCoordinator while processing {}", acknowledge, e);
                                        }
-                               });
-                       }
-                       else {
-                               log.error("Received AcknowledgeCheckpoint 
message for job {} with no CheckpointCoordinator",
-                                       jobGraph.getJobID());
-                       }
+                               }
+                       });
                } else {
-                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
+                       log.error("Received AcknowledgeCheckpoint message for 
job {} with no CheckpointCoordinator",
+                                       jobGraph.getJobID());
                }
        }
 
        @RpcMethod
-       public void declineCheckpoint(final DeclineCheckpoint decline) {
-               if (executionGraph != null) {
-                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-                       if (checkpointCoordinator != null) {
-                               getRpcService().execute(new Runnable() {
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       if 
(!checkpointCoordinator.receiveDeclineMessage(decline)) {
-                                                               
log.info("Received message for non-existing checkpoint {}.", 
decline.getCheckpointId());
-                                                       }
-                                               } catch (Exception e) {
-                                                       log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
+       public void declineCheckpoint(
+                       final JobID jobID,
+                       final ExecutionAttemptID executionAttemptID,
+                       final long checkpointID,
+                       final long checkpointTimestamp)
+       {
+               final DeclineCheckpoint decline = new DeclineCheckpoint(
+                               jobID, executionAttemptID, checkpointID, 
checkpointTimestamp);
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       getRpcService().execute(new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               if 
(!checkpointCoordinator.receiveDeclineMessage(decline)) {
+                                                       log.info("Received 
message for non-existing checkpoint {}.", decline.getCheckpointId());
                                                }
+                                       } catch (Exception e) {
+                                               log.error("Error in 
CheckpointCoordinator while processing {}", decline, e);
                                        }
-                               });
-                       } else {
-                               log.error("Received DeclineCheckpoint message 
for job {} with no CheckpointCoordinator",
-                                       jobGraph.getJobID());
-                       }
+                               }
+                       });
                } else {
-                       log.error("Received AcknowledgeCheckpoint for 
unavailable job {}", jobGraph.getJobID());
+                       log.error("Received DeclineCheckpoint message for job 
{} with no CheckpointCoordinator",
+                                       jobGraph.getJobID());
                }
        }
 
        @RpcMethod
        public KvStateLocation lookupKvStateLocation(final String 
registrationName) throws Exception {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Lookup key-value state for job {} 
with registration " +
+               if (log.isDebugEnabled()) {
+                       log.debug("Lookup key-value state for job {} with 
registration " +
                                        "name {}.", jobGraph.getJobID(), 
registrationName);
-                       }
+               }
 
-                       final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
-                       final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
-                       if (location != null) {
-                               return location;
-                       } else {
-                               throw new 
UnknownKvStateLocation(registrationName);
-                       }
+               final KvStateLocationRegistry registry = 
executionGraph.getKvStateLocationRegistry();
+               final KvStateLocation location = 
registry.getKvStateLocation(registrationName);
+               if (location != null) {
+                       return location;
                } else {
-                       throw new IllegalStateException("Received lookup 
KvState location request for unavailable job " +
-                               jobGraph.getJobID());
+                       throw new UnknownKvStateLocation(registrationName);
                }
        }
 
        @RpcMethod
        public void notifyKvStateRegistered(
-               final JobVertexID jobVertexId,
-               final KeyGroupRange keyGroupRange,
-               final String registrationName,
-               final KvStateID kvStateId,
-               final KvStateServerAddress kvStateServerAddress)
+                       final JobVertexID jobVertexId,
+                       final KeyGroupRange keyGroupRange,
+                       final String registrationName,
+                       final KvStateID kvStateId,
+                       final KvStateServerAddress kvStateServerAddress)
        {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state registered for job 
{} under name {}.",
+               if (log.isDebugEnabled()) {
+                       log.debug("Key value state registered for job {} under 
name {}.",
                                        jobGraph.getJobID(), registrationName);
-                       }
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress
-                               );
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
-                       }
-               } else {
-                       log.error("Received notify KvState registered request 
for unavailable job " + jobGraph.getJobID());
+               }
+
+               try {
+                       
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
+                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress);
+               } catch (Exception e) {
+                       log.error("Failed to notify KvStateRegistry about 
registration {}.", registrationName);
                }
        }
 
        @RpcMethod
        public void notifyKvStateUnregistered(
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName)
+                       JobVertexID jobVertexId,
+                       KeyGroupRange keyGroupRange,
+                       String registrationName)
        {
-               if (executionGraph != null) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state unregistered for job 
{} under name {}.",
+               if (log.isDebugEnabled()) {
+                       log.debug("Key value state unregistered for job {} 
under name {}.",
                                        jobGraph.getJobID(), registrationName);
-                       }
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-                                       jobVertexId, keyGroupRange, 
registrationName
-                               );
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName);
-                       }
-               } else {
-                       log.error("Received notify KvState unregistered request 
for unavailable job " + jobGraph.getJobID());
                }
-       }
 
-       @RpcMethod
-       public Future<TriggerSavepointResponse> triggerSavepoint() throws 
Exception {
-               if (executionGraph != null) {
-                       final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
-                       if (checkpointCoordinator != null) {
-                               try {
-                                       Future<String> savepointFuture = new 
FlinkFuture<>(
-                                               
checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
-
-                                       return savepointFuture.handleAsync(new 
BiFunction<String, Throwable, TriggerSavepointResponse>() {
-                                               @Override
-                                               public TriggerSavepointResponse 
apply(String savepointPath, Throwable throwable) {
-                                                       if (throwable == null) {
-                                                               return new 
TriggerSavepointResponse.Success(jobGraph.getJobID(), savepointPath);
-                                                       }
-                                                       else {
-                                                               return new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-                                                                       new 
Exception("Failed to complete savepoint", throwable));
-                                                       }
-                                               }
-                                       }, getMainThreadExecutor());
-
-                               } catch (Exception e) {
-                                       
FlinkCompletableFuture<TriggerSavepointResponse> future = new 
FlinkCompletableFuture<>();
-                                       future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-                                               new Exception("Failed to 
trigger savepoint", e)));
-                                       return future;
-                               }
-                       } else {
-                               
FlinkCompletableFuture<TriggerSavepointResponse> future = new 
FlinkCompletableFuture<>();
-                               future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-                                       new 
IllegalStateException("Checkpointing disabled. You can enable it via the 
execution " +
-                                               "environment of your job.")));
-                               return future;
-                       }
-               } else {
-                       FlinkCompletableFuture<TriggerSavepointResponse> future 
= new FlinkCompletableFuture<>();
-                       future.complete(new 
TriggerSavepointResponse.Failure(jobGraph.getJobID(),
-                               new IllegalArgumentException("Received trigger 
savepoint request for unavailable job " +
-                                       jobGraph.getJobID())));
-                       return future;
+               try {
+                       
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
+                                       jobVertexId, keyGroupRange, 
registrationName);
+               } catch (Exception e) {
+                       log.error("Failed to notify KvStateRegistry about 
registration {}.", registrationName);
                }
        }
 
        @RpcMethod
-       public DisposeSavepointResponse disposeSavepoint(final String 
savepointPath) {
-               try {
-                       log.info("Disposing savepoint at {}.", savepointPath);
-
-                       // check whether the savepoint exists
-                       savepointStore.loadSavepoint(savepointPath);
+       public Future<String> triggerSavepoint(final UUID leaderSessionID) 
throws Exception {
+               if (!this.leaderSessionID.equals(leaderSessionID)) {
+                       throw new Exception("Leader id not match, expected: " + 
this.leaderSessionID
+                                       + ", actual: " + leaderSessionID);
+               }
 
-                       savepointStore.disposeSavepoint(savepointPath);
-                       return new DisposeSavepointResponse.Success();
-               } catch (Exception e) {
-                       log.error("Failed to dispose savepoint at {}.", 
savepointPath, e);
-                       return new DisposeSavepointResponse.Failure(e);
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+               if (checkpointCoordinator != null) {
+                       return new 
FlinkFuture<>(checkpointCoordinator.triggerSavepoint(System.currentTimeMillis()));
+               } else {
+                       throw new IllegalStateException("Checkpointing 
disabled. You can enable it via the execution " +
+                                       "environment of your job.");
                }
        }
 
        @RpcMethod
        public ClassloadingProps requestClassloadingProps() throws Exception {
-               if (executionGraph != null) {
-                       return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
+               return new 
ClassloadingProps(libraryCacheManager.getBlobServerPort(),
                                executionGraph.getRequiredJarFiles(),
                                executionGraph.getRequiredClasspaths());
-               } else {
-                       throw new Exception("Received classloading props 
request for unavailable job " + jobGraph.getJobID());
-               }
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -838,12 +660,11 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        public void run() {
                                log.error("Fatal error occurred on JobManager, 
cause: {}", cause.getMessage(), cause);
                                shutDown();
-                               jobCompletionActions.onFatalError(cause);
+                               errorHandler.onFatalError(cause);
                        }
                });
        }
 
-       // TODO - wrap this as StatusListenerMessenger's callback with rpc main 
thread
        private void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
                final JobID jobID = executionGraph.getJobID();
                final String jobName = executionGraph.getJobName();
@@ -871,36 +692,33 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        if (newJobStatus == JobStatus.FINISHED) {
                                try {
                                        final Map<String, 
SerializedValue<Object>> accumulatorResults =
-                                               
executionGraph.getAccumulatorsSerialized();
+                                                       
executionGraph.getAccumulatorsSerialized();
                                        final SerializedJobExecutionResult 
result = new SerializedJobExecutionResult(
-                                               jobID, 0, accumulatorResults // 
TODO get correct job duration
+                                                       jobID, 0, 
accumulatorResults // TODO get correct job duration
                                        );
                                        
jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
                                } catch (Exception e) {
                                        log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
                                        final JobExecutionException exception = 
new JobExecutionException(
-                                               jobID, "Failed to retrieve 
accumulator results.", e);
+                                                       jobID, "Failed to 
retrieve accumulator results.", e);
                                        // TODO should we also notify client?
                                        
jobCompletionActions.jobFailed(exception);
                                }
-                       }
-                       else if (newJobStatus == JobStatus.CANCELED) {
+                       } else if (newJobStatus == JobStatus.CANCELED) {
                                final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, "Job was cancelled.", 
unpackedError);
+                                               jobID, "Job was cancelled.", 
unpackedError);
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
-                       }
-                       else if (newJobStatus == JobStatus.FAILED) {
+                       } else if (newJobStatus == JobStatus.FAILED) {
                                final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, "Job execution failed.", 
unpackedError);
+                                               jobID, "Job execution failed.", 
unpackedError);
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
-                       }
-                       else {
+                       } else {
                                final JobExecutionException exception = new 
JobExecutionException(
-                                       jobID, newJobStatus + " is not a 
terminal state.");
+                                               jobID, newJobStatus + " is not 
a terminal state.");
                                // TODO should we also notify client?
                                jobCompletionActions.jobFailed(exception);
                                throw new RuntimeException(exception);
@@ -909,7 +727,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        private void notifyOfNewResourceManagerLeader(
-               final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
+                       final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
        {
                // IMPORTANT: executed by main thread to avoid concurrence
                runAsync(new Runnable() {
@@ -918,17 +736,15 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                                if (resourceManagerConnection != null) {
                                        if (resourceManagerAddress != null) {
                                                if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-                                                       && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId()))
-                                               {
+                                                               && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
                                                        // both address and 
leader id are not changed, we can keep the old connection
                                                        return;
                                                }
                                                log.info("ResourceManager 
leader changed from {} to {}. Registering at new leader.",
-                                                       
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
-                                       }
-                                       else {
+                                                               
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                                       } else {
                                                log.info("Current 
ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
-                                                       
resourceManagerConnection.getTargetAddress());
+                                                               
resourceManagerConnection.getTargetAddress());
                                        }
                                }
 
@@ -937,8 +753,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                                if (resourceManagerAddress != null) {
                                        log.info("Attempting to register at 
ResourceManager {}", resourceManagerAddress);
                                        resourceManagerConnection = new 
ResourceManagerConnection(
-                                               log, jobGraph.getJobID(), 
leaderSessionID,
-                                               resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
+                                                       log, 
jobGraph.getJobID(), leaderSessionID,
+                                                       resourceManagerAddress, 
resourceManagerLeaderId, executionContext);
                                        resourceManagerConnection.start();
                                }
                        }
@@ -952,26 +768,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                                // TODO - add tests for comment in 
https://github.com/apache/flink/pull/2565
                                // verify the response with current connection
                                if (resourceManagerConnection != null
-                                       && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
+                                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
                                        log.info("JobManager successfully 
registered at ResourceManager, leader id: {}.",
-                                               
success.getResourceManagerLeaderId());
+                                                       
success.getResourceManagerLeaderId());
                                }
                        }
                });
        }
 
-       private void disposeCommunicationWithResourceManager() {
-               // 1. stop the leader retriever so we will not receiving 
updates anymore
-               try {
-                       resourceManagerLeaderRetriever.stop();
-               } catch (Exception e) {
-                       log.warn("Failed to stop resource manager leader 
retriever.");
-               }
-
-               // 2. close current connection with ResourceManager if exists
-               closeResourceManagerConnection();
-       }
-
        private void closeResourceManagerConnection() {
                if (resourceManagerConnection != null) {
                        resourceManagerConnection.close();
@@ -980,34 +784,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        }
 
        
//----------------------------------------------------------------------------------------------
-       // Helper methods
-       
//----------------------------------------------------------------------------------------------
-
-       /**
-        * Converts JobVertexIDs to corresponding ExecutionJobVertexes
-        *
-        * @param executionGraph The execution graph that holds the relationship
-        * @param vertexIDs      The vertexIDs need to be converted
-        * @return The corresponding ExecutionJobVertexes
-        * @throws JobExecutionException
-        */
-       private static List<ExecutionJobVertex> getExecutionJobVertexWithId(
-               final ExecutionGraph executionGraph, final List<JobVertexID> 
vertexIDs)
-               throws JobExecutionException
-       {
-               final List<ExecutionJobVertex> ret = new 
ArrayList<>(vertexIDs.size());
-               for (JobVertexID vertexID : vertexIDs) {
-                       final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(vertexID);
-                       if (executionJobVertex == null) {
-                               throw new 
JobExecutionException(executionGraph.getJobID(),
-                                       "The snapshot checkpointing settings 
refer to non-existent vertex " + vertexID);
-                       }
-                       ret.add(executionJobVertex);
-               }
-               return ret;
-       }
-
-       
//----------------------------------------------------------------------------------------------
        // Utility classes
        
//----------------------------------------------------------------------------------------------
 
@@ -1024,19 +800,19 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
        }
 
        private class ResourceManagerConnection
-               extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
+                       extends RegisteredRpcConnection<ResourceManagerGateway, 
JobMasterRegistrationSuccess>
        {
                private final JobID jobID;
 
                private final UUID jobManagerLeaderID;
 
                ResourceManagerConnection(
-                       final Logger log,
-                       final JobID jobID,
-                       final UUID jobManagerLeaderID,
-                       final String resourceManagerAddress,
-                       final UUID resourceManagerLeaderID,
-                       final Executor executor)
+                               final Logger log,
+                               final JobID jobID,
+                               final UUID jobManagerLeaderID,
+                               final String resourceManagerAddress,
+                               final UUID resourceManagerLeaderID,
+                               final Executor executor)
                {
                        super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
                        this.jobID = checkNotNull(jobID);
@@ -1046,12 +822,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
                @Override
                protected RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
                        return new RetryingRegistration<ResourceManagerGateway, 
JobMasterRegistrationSuccess>(
-                               log, getRpcService(), "ResourceManager", 
ResourceManagerGateway.class,
-                               getTargetAddress(), getTargetLeaderId())
+                                       log, getRpcService(), 
"ResourceManager", ResourceManagerGateway.class,
+                                       getTargetAddress(), getTargetLeaderId())
                        {
                                @Override
                                protected Future<RegistrationResponse> 
invokeRegistration(ResourceManagerGateway gateway, UUID leaderId,
-                                       long timeoutMillis) throws Exception
+                                               long timeoutMillis) throws 
Exception
                                {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
                                        return 
gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, 
timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 4b51258..fbe3f74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -29,15 +31,11 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
-import org.apache.flink.runtime.jobmaster.message.DisposeSavepointResponse;
-import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
-import org.apache.flink.runtime.jobmaster.message.TriggerSavepointResponse;
-import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -49,52 +47,56 @@ import java.util.UUID;
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
-       /**
-        * Starting the job under the given leader session ID.
-        */
-       void startJob(final UUID leaderSessionID);
+       // 
------------------------------------------------------------------------
+       //  Job start and stop methods
+       // 
------------------------------------------------------------------------
 
-       /**
-        * Suspending job, all the running tasks will be cancelled, and runtime 
status will be cleared.
-        * Should re-submit the job before restarting it.
-        *
-        * @param cause The reason of why this job been suspended.
-        */
-       void suspendJob(final Throwable cause);
+       void startJobExecution();
+
+       void suspendExecution(Throwable cause);
+
+       // 
------------------------------------------------------------------------
 
        /**
         * Updates the task execution state for a given task.
         *
+        * @param leaderSessionID    The leader id of JobManager
         * @param taskExecutionState New task execution state for a given task
         * @return Future flag of the task execution state update result
         */
-       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+       Future<Acknowledge> updateTaskExecutionState(
+                       final UUID leaderSessionID,
+                       final TaskExecutionState taskExecutionState);
 
        /**
         * Requesting next input split for the {@link ExecutionJobVertex}. The 
next input split is sent back to the sender
         * as a {@link SerializedInputSplit} message.
         *
+        * @param leaderSessionID  The leader id of JobManager
         * @param vertexID         The job vertex id
         * @param executionAttempt The execution attempt id
         * @return The future of the input split. If there is no further input 
split, will return an empty object.
         */
        Future<SerializedInputSplit> requestNextInputSplit(
-               final JobVertexID vertexID,
-               final ExecutionAttemptID executionAttempt);
+                       final UUID leaderSessionID,
+                       final JobVertexID vertexID,
+                       final ExecutionAttemptID executionAttempt);
 
        /**
         * Requests the current state of the partition.
         * The state of a partition is currently bound to the state of the 
producing execution.
         *
+        * @param leaderSessionID The leader id of JobManager
         * @param partitionId     The partition ID of the partition to request 
the state of.
         * @param taskExecutionId The execution attempt ID of the task 
requesting the partition state.
         * @param taskResultId    The input gate ID of the task requesting the 
partition state.
         * @return The future of the partition state
         */
        Future<PartitionState> requestPartitionState(
-               final ResultPartitionID partitionId,
-               final ExecutionAttemptID taskExecutionId,
-               final IntermediateDataSetID taskResultId);
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionId,
+                       final ExecutionAttemptID taskExecutionId,
+                       final IntermediateDataSetID taskResultId);
 
        /**
         * Notifies the JobManager about available data for a produced 
partition.
@@ -105,11 +107,15 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
         * <p>
         * The JobManager then can decide when to schedule the partition 
consumers of the given session.
         *
-        * @param partitionID The partition which has already produced data
-        * @param timeout before the rpc call fails
+        * @param leaderSessionID The leader id of JobManager
+        * @param partitionID     The partition which has already produced data
+        * @param timeout         before the rpc call fails
         * @return Future acknowledge of the schedule or update operation
         */
-       Future<Acknowledge> scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID, @RpcTimeout final Time timeout);
+       Future<Acknowledge> scheduleOrUpdateConsumers(
+                       final UUID leaderSessionID,
+                       final ResultPartitionID partitionID,
+                       @RpcTimeout final Time timeout);
 
        /**
         * Disconnects the given {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} from the
@@ -118,31 +124,6 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
         * @param resourceID identifying the TaskManager to disconnect
         */
        void disconnectTaskManager(ResourceID resourceID);
-       void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
-
-       /**
-        * Notifies the JobManager about the removal of a resource.
-        *
-        * @param resourceId The ID under which the resource is registered.
-        * @param message    Optional message with details, for logging and 
debugging.
-        */
-
-       void resourceRemoved(final ResourceID resourceId, final String message);
-
-       /**
-        * Notifies the JobManager that the checkpoint of an individual task is 
completed.
-        *
-        * @param acknowledge The acknowledge message of the checkpoint
-        */
-       void acknowledgeCheckpoint(final AcknowledgeCheckpoint acknowledge);
-
-       /**
-        * Notifies the JobManager that a checkpoint request could not be 
heeded.
-        * This can happen if a Task is already in RUNNING state but is 
internally not yet ready to perform checkpoints.
-        *
-        * @param decline The decline message of the checkpoint
-        */
-       void declineCheckpoint(final DeclineCheckpoint decline);
 
        /**
         * Requests a {@link KvStateLocation} for the specified {@link KvState} 
registration name.
@@ -150,7 +131,7 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
         * @param registrationName Name under which the KvState has been 
registered.
         * @return Future of the requested {@link KvState} location
         */
-       Future<KvStateLocation> lookupKvStateLocation(final String 
registrationName) throws Exception;
+       Future<KvStateLocation> lookupKvStateLocation(final String 
registrationName);
 
        /**
         * @param jobVertexId          JobVertexID the KvState instance belongs 
to.
@@ -160,11 +141,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
         * @param kvStateServerAddress Server address where to find the KvState 
instance.
         */
        void notifyKvStateRegistered(
-               final JobVertexID jobVertexId,
-               final KeyGroupRange keyGroupRange,
-               final String registrationName,
-               final KvStateID kvStateId,
-               final KvStateServerAddress kvStateServerAddress);
+                       final JobVertexID jobVertexId,
+                       final KeyGroupRange keyGroupRange,
+                       final String registrationName,
+                       final KvStateID kvStateId,
+                       final KvStateServerAddress kvStateServerAddress);
 
        /**
         * @param jobVertexId      JobVertexID the KvState instance belongs to.
@@ -172,24 +153,17 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
         * @param registrationName Name under which the KvState has been 
registered.
         */
        void notifyKvStateUnregistered(
-               JobVertexID jobVertexId,
-               KeyGroupRange keyGroupRange,
-               String registrationName);
+                       JobVertexID jobVertexId,
+                       KeyGroupRange keyGroupRange,
+                       String registrationName);
 
        /**
         * Notifies the JobManager to trigger a savepoint for this job.
         *
-        * @return Future of the savepoint trigger response.
-        */
-       Future<TriggerSavepointResponse> triggerSavepoint();
-
-       /**
-        * Notifies the Jobmanager to dispose specified savepoint.
-        *
-        * @param savepointPath The path of the savepoint.
-        * @return The future of the savepoint disponse response.
+        * @param leaderSessionID The leader id of JobManager
+        * @return The savepoint path
         */
-       Future<DisposeSavepointResponse> disposeSavepoint(final String 
savepointPath);
+       Future<String> triggerSavepoint(final UUID leaderSessionID);
 
        /**
         * Request the classloading props of this job.

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
index e8fb5bb..019ccfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/MiniClusterJobDispatcher.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import org.slf4j.Logger;
@@ -62,6 +64,9 @@ public class MiniClusterJobDispatcher {
        /** al the services that the JobManager needs, such as BLOB service, 
factories, etc */
        private final JobManagerServices jobManagerServices;
 
+       /** Registry for all metrics in the mini cluster */
+       private final MetricRegistry metricRegistry;
+
        /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
        private final int numJobManagers;
 
@@ -86,8 +91,9 @@ public class MiniClusterJobDispatcher {
        public MiniClusterJobDispatcher(
                        Configuration config,
                        RpcService rpcService,
-                       HighAvailabilityServices haServices) throws Exception {
-               this(config, rpcService, haServices, 1);
+                       HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry) throws Exception {
+               this(config, rpcService, haServices, metricRegistry, 1);
        }
 
        /**
@@ -106,16 +112,18 @@ public class MiniClusterJobDispatcher {
                        Configuration config,
                        RpcService rpcService,
                        HighAvailabilityServices haServices,
+                       MetricRegistry metricRegistry,
                        int numJobManagers) throws Exception {
 
                checkArgument(numJobManagers >= 1);
                this.configuration = checkNotNull(config);
                this.rpcService = checkNotNull(rpcService);
                this.haServices = checkNotNull(haServices);
+               this.metricRegistry = checkNotNull(metricRegistry);
                this.numJobManagers = numJobManagers;
 
                LOG.info("Creating JobMaster services");
-               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config);
+               this.jobManagerServices = 
JobManagerServices.fromConfiguration(config, haServices);
        }
 
        // 
------------------------------------------------------------------------
@@ -140,9 +148,8 @@ public class MiniClusterJobDispatcher {
                                if (runners != null) {
                                        this.runners = null;
 
-                                       Exception shutdownException = new 
Exception("The MiniCluster is shutting down");
                                        for (JobManagerRunner runner : runners) 
{
-                                               
runner.shutdown(shutdownException);
+                                               runner.shutdown();
                                        }
                                }
                        }
@@ -171,9 +178,9 @@ public class MiniClusterJobDispatcher {
                        checkState(!shutdown, "mini cluster is shut down");
                        checkState(runners == null, "mini cluster can only 
execute one job at a time");
 
-                       OnCompletionActions onJobCompletion = new 
DetachedFinalizer(numJobManagers);
+                       DetachedFinalizer finalizer = new 
DetachedFinalizer(numJobManagers);
 
-                       this.runners = startJobRunners(job, onJobCompletion);
+                       this.runners = startJobRunners(job, finalizer, 
finalizer);
                }
        }
 
@@ -191,17 +198,17 @@ public class MiniClusterJobDispatcher {
                checkNotNull(job);
                
                LOG.info("Received job for blocking execution {} ({})", 
job.getName(), job.getJobID());
-               final BlockingJobSync onJobCompletion = new 
BlockingJobSync(job.getJobID(), numJobManagers);
+               final BlockingJobSync sync = new 
BlockingJobSync(job.getJobID(), numJobManagers);
 
                synchronized (lock) {
                        checkState(!shutdown, "mini cluster is shut down");
                        checkState(runners == null, "mini cluster can only 
execute one job at a time");
 
-                       this.runners = startJobRunners(job, onJobCompletion);
+                       this.runners = startJobRunners(job, sync, sync);
                }
 
                try {
-                       return onJobCompletion.getResult();
+                       return sync.getResult();
                }
                finally {
                        // always clear the status for the next job
@@ -209,24 +216,26 @@ public class MiniClusterJobDispatcher {
                }
        }
 
-       private JobManagerRunner[] startJobRunners(JobGraph job, 
OnCompletionActions onCompletion) throws JobExecutionException {
+       private JobManagerRunner[] startJobRunners(
+                       JobGraph job,
+                       OnCompletionActions onCompletion,
+                       FatalErrorHandler errorHandler) throws 
JobExecutionException {
                LOG.info("Starting {} JobMaster(s) for job {} ({})", 
numJobManagers, job.getName(), job.getJobID());
 
                JobManagerRunner[] runners = new 
JobManagerRunner[numJobManagers];
                for (int i = 0; i < numJobManagers; i++) {
                        try {
                                runners[i] = new JobManagerRunner(job, 
configuration,
-                                               rpcService, haServices, 
jobManagerServices, onCompletion);
+                                               rpcService, haServices, 
jobManagerServices, metricRegistry, 
+                                               onCompletion, errorHandler);
                                runners[i].start();
                        }
                        catch (Throwable t) {
                                // shut down all the ones so far
-                               Exception shutdownCause = new Exception("Failed 
to properly start all mini cluster JobManagers", t);
-
                                for (int k = 0; k <= i; k++) {
                                        try {
                                                if (runners[i] != null) {
-                                                       
runners[i].shutdown(shutdownCause);
+                                                       runners[i].shutdown();
                                                }
                                        } catch (Throwable ignored) {
                                                // silent shutdown
@@ -244,15 +253,15 @@ public class MiniClusterJobDispatcher {
        //  test methods to simulate job master failures
        // 
------------------------------------------------------------------------
 
-       public void killJobMaster(int which) {
-               checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
-               checkState(!shutdown, "mini cluster is shut down");
-
-               JobManagerRunner[] runners = this.runners;
-               checkState(runners != null, "mini cluster it not executing a 
job right now");
-
-               runners[which].shutdown(new Throwable("kill JobManager"));
-       }
+//     public void killJobMaster(int which) {
+//             checkArgument(which >= 0 && which < numJobManagers, "no such 
job master");
+//             checkState(!shutdown, "mini cluster is shut down");
+//
+//             JobManagerRunner[] runners = this.runners;
+//             checkState(runners != null, "mini cluster it not executing a 
job right now");
+//
+//             runners[which].shutdown(new Throwable("kill JobManager"));
+//     }
 
        // 
------------------------------------------------------------------------
        //  utility classes
@@ -263,7 +272,7 @@ public class MiniClusterJobDispatcher {
         * In the case of a high-availability test setup, there may be multiple 
runners.
         * After that, it marks the mini cluster as ready to receive new jobs.
         */
-       private class DetachedFinalizer implements OnCompletionActions {
+       private class DetachedFinalizer implements OnCompletionActions, 
FatalErrorHandler {
 
                private final AtomicInteger numJobManagersToWaitFor;
 
@@ -308,7 +317,7 @@ public class MiniClusterJobDispatcher {
         * That way it is guaranteed that after the blocking job submit call 
returns,
         * the dispatcher is immediately free to accept another job.
         */
-       private static class BlockingJobSync implements OnCompletionActions {
+       private static class BlockingJobSync implements OnCompletionActions, 
FatalErrorHandler {
 
                private final JobID jobId;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
deleted file mode 100644
index 42bfc71..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/DisposeSavepointResponse.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import java.io.Serializable;
-
-/**
- * The response of the dispose savepoint request to JobManager.
- */
-public abstract class DisposeSavepointResponse implements Serializable {
-
-       private static final long serialVersionUID = 6008792963949369567L;
-
-       public static class Success extends DisposeSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
1572462960008711415L;
-       }
-
-       public static class Failure extends DisposeSavepointResponse implements 
Serializable {
-
-               private static final long serialVersionUID = 
-7505308325483022458L;
-
-               private final Throwable cause;
-
-               public Failure(final Throwable cause) {
-                       this.cause = cause;
-               }
-
-               public Throwable getCause() {
-                       return cause;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
deleted file mode 100644
index 0b0edc5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/TriggerSavepointResponse.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmaster.message;
-
-import org.apache.flink.api.common.JobID;
-
-import java.io.Serializable;
-
-/**
- * The response of the trigger savepoint request to JobManager.
- */
-public abstract class TriggerSavepointResponse implements Serializable {
-
-       private static final long serialVersionUID = 3139327824611807707L;
-
-       private final JobID jobID;
-
-       public JobID getJobID() {
-               return jobID;
-       }
-
-       public TriggerSavepointResponse(final JobID jobID) {
-               this.jobID = jobID;
-       }
-
-       public static class Success extends TriggerSavepointResponse implements Serializable {
-
-               private static final long serialVersionUID = -1100637460388881776L;
-
-               private final String savepointPath;
-
-               public Success(final JobID jobID, final String savepointPath) {
-                       super(jobID);
-                       this.savepointPath = savepointPath;
-               }
-
-               public String getSavepointPath() {
-                       return savepointPath;
-               }
-       }
-
-       public static class Failure extends TriggerSavepointResponse implements Serializable {
-
-               private static final long serialVersionUID = -1668479003490615139L;
-
-               private final Throwable cause;
-
-               public Failure(final JobID jobID, final Throwable cause) {
-                       super(jobID);
-                       this.cause = cause;
-               }
-
-               public Throwable getCause() {
-                       return cause;
-               }
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 2052f98..4b9100a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
        /**
-        * Return the address under which the rpc service can be reached. If the rpc service cannot be
-        * contacted remotely, then it will return an empty string.
+        * Return the hostname or host address under which the rpc service can be reached.
+        * If the rpc service cannot be contacted remotely, then it will return an empty string.
         *
         * @return Address of the rpc service or empty string if local rpc service
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/34fef475/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index ef62ef1..6fcd082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -26,11 +26,16 @@ import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.util.Preconditions;
 
+import java.util.UUID;
+
 /**
  * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
  */
 public class JobManagerConnection {
 
+       // Job master leader session id
+       private final UUID jobMasterLeaderId;
+
        // Gateway to the job master
        private final JobMasterGateway jobMasterGateway;
 
@@ -50,13 +55,15 @@ public class JobManagerConnection {
        private final PartitionStateChecker partitionStateChecker;
 
        public JobManagerConnection(
-               JobMasterGateway jobMasterGateway,
-               TaskManagerActions taskManagerActions,
-               CheckpointResponder checkpointResponder,
-               LibraryCacheManager libraryCacheManager,
-               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
-               PartitionStateChecker partitionStateChecker) {
-
+                       UUID jobMasterLeaderId,
+                       JobMasterGateway jobMasterGateway,
+                       TaskManagerActions taskManagerActions,
+                       CheckpointResponder checkpointResponder,
+                       LibraryCacheManager libraryCacheManager,
+                       ResultPartitionConsumableNotifier resultPartitionConsumableNotifier,
+                       PartitionStateChecker partitionStateChecker)
+       {
+               this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId);
                this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
                this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
                this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -65,6 +72,10 @@ public class JobManagerConnection {
                this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
        }
 
+       public UUID getJobMasterLeaderId() {
+               return jobMasterLeaderId;
+       }
+
        public JobMasterGateway getJobManagerGateway() {
                return jobMasterGateway;
        }

Reply via email to