[FLINK-4657] Implement HighAvailabilityServices based on ZooKeeper

[FLINK-4657] Implement a few rpc calls for JobMaster

[FLINK-4657][cluster management] Address review comments

[FLINK-4657][cluster management] Throw an exception when an error occurs while requesting an input split


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/214113eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/214113eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/214113eb

Branch: refs/heads/flip-6
Commit: 214113eb494d83046e6b0b2dc4df49fc72d869f8
Parents: 291daf6
Author: Kurt Young <ykt...@gmail.com>
Authored: Mon Sep 26 10:59:16 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   7 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |  82 ++++++++++
 .../StandaloneSubmittedJobGraphStore.java       |   5 +
 .../jobmanager/SubmittedJobGraphStore.java      |   8 +
 .../ZooKeeperSubmittedJobGraphStore.java        |   7 +
 .../runtime/jobmaster/JobManagerRunner.java     |  18 +--
 .../flink/runtime/jobmaster/JobMaster.java      | 161 ++++++++++++++++++-
 .../runtime/jobmaster/JobMasterGateway.java     |  54 ++++++-
 .../jobmaster/message/NextInputSplit.java       |  39 +++++
 .../resourcemanager/ResourceManager.java        |   6 +-
 .../flink/runtime/util/ZooKeeperUtils.java      |  82 ++++++++--
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../TestingHighAvailabilityServices.java        |  20 +--
 .../jobmanager/JobManagerHARecoveryTest.java    |   3 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |  11 +-
 .../slotmanager/SlotProtocolTest.java           |   2 +-
 17 files changed, 455 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index d67e927..a26886a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
  *     <li>JobManager leader election and leader retrieval</li>
  *     <li>Persistence for checkpoint metadata</li>
  *     <li>Registering the latest completed checkpoint(s)</li>
+ *     <li>Persistence for submitted job graphs</li>
  * </ul>
  */
 public interface HighAvailabilityServices {
@@ -48,12 +49,10 @@ public interface HighAvailabilityServices {
         * @return
         * @throws Exception
         */
-       LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws 
Exception;
+       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws 
Exception;
 
        /**
         * Gets the leader election service for the cluster's resource manager.
-        * @return
-        * @throws Exception
         */
        LeaderElectionService getResourceManagerLeaderElectionService() throws 
Exception;
 
@@ -62,7 +61,7 @@ public interface HighAvailabilityServices {
         *
         * @param jobID The identifier of the job running the election.
         */
-       LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) 
throws Exception;
+       LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) 
throws Exception;
 
        /**
         * Gets the checkpoint recovery factory for the job manager

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index a2c9cc4..2c6295c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -79,7 +79,7 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
                return new 
StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
        }
 
@@ -89,7 +89,7 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
                return new StandaloneLeaderElectionService();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
new file mode 100644
index 0000000..d26b668
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.highavailability;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointRecoveryFactory;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+
+/**
+ * An implementation of the {@link HighAvailabilityServices} based on ZooKeeper.
+ */
+public class ZookeeperHaServices implements HighAvailabilityServices {
+
+       private static final String DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX = 
"/resource-manager";
+
+       /** The ZooKeeper client to use */
+       private final CuratorFramework client;
+
+       /** The runtime configuration */
+       private final Configuration configuration;
+
+       public ZookeeperHaServices(final CuratorFramework client, final 
Configuration configuration) {
+               this.client = client;
+               this.configuration = configuration;
+       }
+
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
+               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+       }
+
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathSuffixForJob(jobID));
+       }
+
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, DEFAULT_RESOURCE_MANAGER_PATH_SUFFIX);
+       }
+
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
+               return ZooKeeperUtils.createLeaderElectionService(client, 
configuration, getPathSuffixForJob(jobID));
+       }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() throws 
Exception {
+               return new ZooKeeperCheckpointRecoveryFactory(client, 
configuration);
+       }
+
+       @Override
+       public SubmittedJobGraphStore getSubmittedJobGraphStore() throws 
Exception {
+               return ZooKeeperUtils.createSubmittedJobGraphs(client, 
configuration);
+       }
+
+       private static String getPathSuffixForJob(final JobID jobID) {
+               return String.format("/job-managers/%s", jobID);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
index 3041cde..00df935 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/StandaloneSubmittedJobGraphStore.java
@@ -62,4 +62,9 @@ public class StandaloneSubmittedJobGraphStore implements 
SubmittedJobGraphStore
        public List<SubmittedJobGraph> recoverJobGraphs() throws Exception {
                return Collections.emptyList();
        }
+
+       @Override
+       public boolean contains(JobID jobId) throws Exception {
+               return false;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
index bd628cd..4d544ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/SubmittedJobGraphStore.java
@@ -64,6 +64,14 @@ public interface SubmittedJobGraphStore {
        void removeJobGraph(JobID jobId) throws Exception;
 
        /**
+        * Checks whether a job graph for the given {@link JobID} exists in this store.
+        *
+        * <p>This also indicates whether the job must be recovered before anything else can be done, since all
+        * globally terminated jobs are removed from this store.
+        */
+       boolean contains(final JobID jobId) throws Exception;
+
+       /**
         * A listener for {@link SubmittedJobGraph} instances. This is used to 
react to races between
         * multiple running {@link SubmittedJobGraphStore} instances (on 
multiple job managers).
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index ec05f1e..92093c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -266,6 +266,13 @@ public class ZooKeeperSubmittedJobGraphStore implements 
SubmittedJobGraphStore {
                }
        }
 
+       @Override
+       public boolean contains(JobID jobId) throws Exception {
+               checkNotNull(jobId, "Job ID");
+               String path = getPathForJob(jobId);
+               return jobGraphsInZooKeeper.exists(path) != -1;
+       }
+
        /**
         * Monitors ZooKeeper for changes.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 6944d85..a096932 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -21,20 +21,18 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcService;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 /**
  * The runner for the job manager. It deals with job level leader election and 
make underlying job manager
@@ -52,11 +50,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
 
        private final OnCompletionActions toNotify;
 
-       /** The execution context which is used to execute futures */
-       private final Executor executionContext;
-
-       // TODO: use this to decide whether the job is finished by other
-       private final CheckpointRecoveryFactory checkpointRecoveryFactory;
+       /** Used to check whether a job needs to be run */
+       private final SubmittedJobGraphStore submittedJobGraphStore;
 
        /** Leader election for this job */
        private final LeaderElectionService leaderElectionService;
@@ -87,9 +82,8 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
        {
                this.jobGraph = jobGraph;
                this.toNotify = toNotify;
-               this.executionContext = rpcService.getExecutor();
-               this.checkpointRecoveryFactory = 
haServices.getCheckpointRecoveryFactory();
-               this.leaderElectionService = 
haServices.getJobMasterLeaderElectionService(jobGraph.getJobID());
+               this.submittedJobGraphStore = 
haServices.getSubmittedJobGraphStore();
+               this.leaderElectionService = 
haServices.getJobManagerLeaderElectionService(jobGraph.getJobID());
 
                this.jobManager = new JobMaster(
                        jobGraph, configuration, rpcService, haServices,
@@ -271,7 +265,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions {
 
        @VisibleForTesting
        boolean isJobFinishedByOthers() {
-               // TODO
+               // TODO: implement, presumably by checking whether submittedJobGraphStore still contains this job
                return false;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1e01c55..e67a167 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -24,6 +24,8 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -35,23 +37,32 @@ import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
@@ -61,13 +72,18 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.util.SerializedThrowable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
@@ -491,9 +507,12 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         * @return Acknowledge the task execution state update
         */
        @RpcMethod
-       public Acknowledge updateTaskExecutionState(TaskExecutionState 
taskExecutionState) {
-               System.out.println("TaskExecutionState: " + taskExecutionState);
-               return Acknowledge.get();
+       public boolean updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
+               if (taskExecutionState == null) {
+                       return false;
+               } else {
+                       return executionGraph.updateState(taskExecutionState);
+               }
        }
 
        
//----------------------------------------------------------------------------------------------

@@ -511,6 +530,140 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                });
        }
 
+       @RpcMethod
+       public NextInputSplit requestNextInputSplit(
+               final JobVertexID vertexID,
+               final ExecutionAttemptID executionAttempt) throws Exception
+       {
+               final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
+               if (execution == null) {
+                       // can happen when the JobManager has already unregistered this execution upon task failure,
+                       // but the TaskManager has not yet become aware of that situation
+                       if (log.isDebugEnabled()) {
+                               log.debug("Can not find Execution for attempt 
{}.", executionAttempt);
+                       }
+                       // the TaskManager should still be made aware of this, so fail the request
+                       throw new Exception("Can not find Execution for attempt 
" + executionAttempt);
+               }
+
+               final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
+               if (vertex == null) {
+                       log.error("Cannot find execution vertex for vertex ID 
{}.", vertexID);
+                       throw new Exception("Cannot find execution vertex for 
vertex ID " + vertexID);
+               }
+
+               final InputSplitAssigner splitAssigner = 
vertex.getSplitAssigner();
+               if (splitAssigner == null) {
+                       log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID);
+                       throw new Exception("No InputSplitAssigner for vertex 
ID " + vertexID);
+               }
+
+               final Slot slot = execution.getAssignedResource();
+               final int taskId = 
execution.getVertex().getParallelSubtaskIndex();
+               final String host = slot != null ? 
slot.getTaskManagerLocation().getHostname() : null;
+               final InputSplit nextInputSplit = 
splitAssigner.getNextInputSplit(host, taskId);
+
+               if (log.isDebugEnabled()) {
+                       log.debug("Send next input split {}.", nextInputSplit);
+               }
+
+               try {
+                       final byte[] serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
+                       return new NextInputSplit(serializedInputSplit);
+               } catch (Exception ex) {
+                       log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
+                       IOException reason = new IOException("Could not 
serialize the next input split of class " +
+                               nextInputSplit.getClass() + ".", ex);
+                       vertex.fail(reason);
+                       throw reason;
+               }
+       }
+
+       @RpcMethod
+       public PartitionState requestPartitionState(
+               final ResultPartitionID partitionId,
+               final ExecutionAttemptID taskExecutionId,
+               final IntermediateDataSetID taskResultId)
+       {
+               final Execution execution = 
executionGraph.getRegisteredExecutions().get(partitionId.getProducerId());
+               final ExecutionState state = execution != null ? 
execution.getState() : null;
+               return new PartitionState(taskResultId, 
partitionId.getPartitionId(), state);
+       }
+
+       @RpcMethod
+       public void scheduleOrUpdateConsumers(final ResultPartitionID 
partitionID) {
+               executionGraph.scheduleOrUpdateConsumers(partitionID);
+       }
+
+       
//----------------------------------------------------------------------------------------------
+       // Internal methods
+       
//----------------------------------------------------------------------------------------------
+
+       // TODO - wrap this as StatusListenerMessenger's callback with rpc main 
thread
+       private void jobStatusChanged(final JobStatus newJobStatus, long 
timestamp, final Throwable error) {
+               final JobID jobID = executionGraph.getJobID();
+               final String jobName = executionGraph.getJobName();
+               log.info("Status of job {} ({}) changed to {}.", jobID, 
jobName, newJobStatus, error);
+
+               if (newJobStatus.isGloballyTerminalState()) {
+                       // TODO set job end time in JobInfo
+
+                       /*
+                         TODO
+                         if (jobInfo.sessionAlive) {
+                jobInfo.setLastActive()
+                val lastActivity = jobInfo.lastActive
+                context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout 
seconds) {
+                  // remove only if no activity occurred in the meantime
+                  if (lastActivity == jobInfo.lastActive) {
+                    self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+                  }
+                }(context.dispatcher)
+              } else {
+                self ! decorateMessage(RemoveJob(jobID, 
removeJobFromStateBackend = true))
+              }
+                        */
+
+                       if (newJobStatus == JobStatus.FINISHED) {
+                               try {
+                                       final Map<String, 
SerializedValue<Object>> accumulatorResults =
+                                               
executionGraph.getAccumulatorsSerialized();
+                                       final SerializedJobExecutionResult 
result = new SerializedJobExecutionResult(
+                                               jobID, 0, accumulatorResults // 
TODO get correct job duration
+                                       );
+                                       
jobCompletionActions.jobFinished(result.toJobExecutionResult(userCodeLoader));
+                               } catch (Exception e) {
+                                       log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
+                                       final JobExecutionException exception = 
new JobExecutionException(
+                                               jobID, "Failed to retrieve 
accumulator results.", e);
+                                       // TODO should we also notify client?
+                                       
jobCompletionActions.jobFailed(exception);
+                               }
+                       }
+                       else if (newJobStatus == JobStatus.CANCELED) {
+                               final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
+                               final JobExecutionException exception = new 
JobExecutionException(
+                                       jobID, "Job was cancelled.", 
unpackedError);
+                               // TODO should we also notify client?
+                               jobCompletionActions.jobFailed(exception);
+                       }
+                       else if (newJobStatus == JobStatus.FAILED) {
+                               final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
+                               final JobExecutionException exception = new 
JobExecutionException(
+                                       jobID, "Job execution failed.", 
unpackedError);
+                               // TODO should we also notify client?
+                               jobCompletionActions.jobFailed(exception);
+                       }
+                       else {
+                               final JobExecutionException exception = new 
JobExecutionException(
+                                       jobID, newJobStatus + " is not a 
terminal state.");
+                               // TODO should we also notify client?
+                               jobCompletionActions.jobFailed(exception);
+                               throw new RuntimeException(exception);
+                       }
+               }
+       }
+
        private void notifyOfNewResourceManagerLeader(
                final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
        {

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 6587ccb..686a3f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -19,7 +19,15 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.io.network.PartitionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.message.NextInputSplit;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 
@@ -47,7 +55,47 @@ public interface JobMasterGateway extends RpcGateway {
         * Updates the task execution state for a given task.
         *
         * @param taskExecutionState New task execution state for a given task
-        * @return Future acknowledge of the task execution state update
+        * @return Future flag of the task execution state update result
         */
-       Future<Acknowledge> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+       Future<Boolean> updateTaskExecutionState(TaskExecutionState 
taskExecutionState);
+
+       /**
+        * Requesting next input split for the {@link ExecutionJobVertex}. The 
next input split is sent back to the sender
+        * as a {@link NextInputSplit} message.
+        *
+        * @param vertexID         The job vertex id
+        * @param executionAttempt The execution attempt id
+        * @return The future of the input split. If there is no further input 
split, will return an empty object.
+        * @throws Exception if some error occurred or information mismatch.
+        */
+       Future<NextInputSplit> requestNextInputSplit(
+               final JobVertexID vertexID,
+               final ExecutionAttemptID executionAttempt) throws Exception;
+
+       /**
+        * Requests the current state of the partition.
+        * The state of a partition is currently bound to the state of the 
producing execution.
+        *
+        * @param partitionId     The partition ID of the partition to request 
the state of.
+        * @param taskExecutionId The execution attempt ID of the task 
requesting the partition state.
+        * @param taskResultId    The input gate ID of the task requesting the 
partition state.
+        * @return The future of the partition state
+        */
+       Future<PartitionState> requestPartitionState(
+               final ResultPartitionID partitionId,
+               final ExecutionAttemptID taskExecutionId,
+               final IntermediateDataSetID taskResultId);
+
+       /**
+        * Notifies the JobManager about available data for a produced 
partition.
+        * <p>
+        * There is a call to this method for each {@link ExecutionVertex} 
instance once per produced
+        * {@link ResultPartition} instance, either when first producing data 
(for pipelined executions)
+        * or when all data has been produced (for staged executions).
+        * <p>
+        * The JobManager then can decide when to schedule the partition 
consumers of the given session.
+        *
+        * @param partitionID The partition which has already produced data
+        */
+       void scheduleOrUpdateConsumers(final ResultPartitionID partitionID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
new file mode 100644
index 0000000..fe511ed
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/message/NextInputSplit.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.message;
+
+import java.io.Serializable;
+
+/**
+ * Contains the next input split for a task.
+ *
+ * <p>This is the result type of {@code JobMasterGateway#requestNextInputSplit}. The split
+ * itself is carried as an opaque byte array (presumably the serialized input split —
+ * TODO confirm against the JobMaster implementation).
+ *
+ * <p>NOTE(review): the gateway javadoc says an "empty object" is returned when there is no
+ * further input split; whether {@link #getSplitData()} is then {@code null} or a zero-length
+ * array is not visible here — confirm before relying on either.
+ */
+public class NextInputSplit implements Serializable {
+
+       private static final long serialVersionUID = -1355784074565856240L;
+
+       /** The split bytes, held by reference exactly as passed to the constructor (no copy). */
+       private final byte[] splitData;
+
+       /**
+        * Creates a message wrapping the given split bytes.
+        *
+        * @param splitData the split bytes; stored by reference, not defensively copied
+        *                  (whether {@code null} is a legal value is not checked here — TODO confirm)
+        */
+       public NextInputSplit(final byte[] splitData) {
+               this.splitData = splitData;
+       }
+
+       /**
+        * Returns the split bytes.
+        *
+        * @return the same array instance that was passed to the constructor (not a copy)
+        */
+       public byte[] getSplitData() {
+               return splitData;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f695de4..f45afa3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -129,7 +129,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                try {
                        leaderElectionService.stop();
                        for (JobID jobID : jobMasterGateways.keySet()) {
-                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+                               
highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
                        }
                        super.shutDown();
                } catch (Throwable e) {
@@ -179,7 +179,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                                        final LeaderConnectionInfo 
jobMasterLeaderInfo;
                                        try {
                                                jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-                                                       
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
+                                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
                                        } catch (Exception e) {
                                                log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
                                                throw new Exception("Failed to 
retrieve JobMasterLeaderRetriever");
@@ -203,7 +203,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                                                if 
(!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
                                                        JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
                                                        try {
-                                                               
LeaderRetrievalService jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+                                                               
LeaderRetrievalService jobMasterLeaderRetriever = 
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
                                                                
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
                                                        } catch (Exception e) {
                                                                
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 67fc397..c5d44b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -132,13 +132,46 @@ public class ZooKeeperUtils {
         * @throws Exception
         */
        public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
-                       Configuration configuration) throws Exception {
-               CuratorFramework client = startCuratorFramework(configuration);
+               final Configuration configuration) throws Exception
+       {
+               final CuratorFramework client = 
startCuratorFramework(configuration);
+               return createLeaderRetrievalService(client, configuration);
+       }
+
+       /**
+        * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
+        * @param configuration {@link Configuration} object containing the 
configuration values
+        * @return {@link ZooKeeperLeaderRetrievalService} instance.
+        * @throws Exception
+        */
+       public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
+               final CuratorFramework client,
+               final Configuration configuration) throws Exception
+       {
+               return createLeaderRetrievalService(client, configuration, "");
+       }
+
+       /**
+        * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
+        * @param configuration {@link Configuration} object containing the 
configuration values
+        * @param pathSuffix    The path suffix which we want to append
+        * @return {@link ZooKeeperLeaderRetrievalService} instance.
+        * @throws Exception
+        */
+       public static ZooKeeperLeaderRetrievalService 
createLeaderRetrievalService(
+               final CuratorFramework client,
+               final Configuration configuration,
+               final String pathSuffix) throws Exception
+       {
                String leaderPath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.ZOOKEEPER_LEADER_PATH);
+                       configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+                       ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
                return new ZooKeeperLeaderRetrievalService(client, leaderPath);
        }
@@ -171,16 +204,33 @@ public class ZooKeeperUtils {
                        CuratorFramework client,
                        Configuration configuration) throws Exception {
 
-               String latchPath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
-                               ConfigConstants.ZOOKEEPER_LATCH_PATH);
-               String leaderPath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
-                               configuration,
-                               ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
-                               ConfigConstants.ZOOKEEPER_LEADER_PATH);
+               return createLeaderElectionService(client, configuration, "");
+       }
+
+       /**
+        * Creates a {@link ZooKeeperLeaderElectionService} instance.
+        *
+        * @param client        The {@link CuratorFramework} ZooKeeper client 
to use
+        * @param configuration {@link Configuration} object containing the 
configuration values
+        * @param pathSuffix    The path suffix which we want to append
+        * @return {@link ZooKeeperLeaderElectionService} instance.
+        * @throws Exception
+        */
+       public static ZooKeeperLeaderElectionService 
createLeaderElectionService(
+               final CuratorFramework client,
+               final Configuration configuration,
+               final String pathSuffix) throws Exception
+       {
+               final String latchPath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
+                       configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LATCH_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LATCH_PATH,
+                       ConfigConstants.ZOOKEEPER_LATCH_PATH) + pathSuffix;
+               final String leaderPath = 
ConfigurationUtil.getStringWithDeprecatedKeys(
+                       configuration,
+                       ConfigConstants.HA_ZOOKEEPER_LEADER_PATH,
+                       ConfigConstants.DEFAULT_ZOOKEEPER_LEADER_PATH,
+                       ConfigConstants.ZOOKEEPER_LEADER_PATH) + pathSuffix;
 
                return new ZooKeeperLeaderElectionService(client, latchPath, 
leaderPath);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index d16c1b0..7a764ca 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -52,6 +52,8 @@ import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, 
HardwareDescription, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -944,12 +946,12 @@ class TaskManager(
 
     val partitionStateChecker = new ActorGatewayPartitionStateChecker(
       jobManagerGateway,
-      config.timeout)
+      new FiniteDuration(config.getTimeout().toMilliseconds, 
TimeUnit.MILLISECONDS))
 
     val resultPartitionConsumableNotifier = new 
ActorGatewayResultPartitionConsumableNotifier(
       context.dispatcher,
       jobManagerGateway,
-      config.timeout)
+      new FiniteDuration(config.getTimeout().toMilliseconds, 
TimeUnit.MILLISECONDS))
 
     connectionUtils = Some(
       (checkpointResponder,

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 1a5450d..faf69cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -36,7 +36,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
 
-       private volatile LeaderElectionService jobMasterLeaderElectionService;
+       private ConcurrentHashMap<JobID, LeaderElectionService> 
jobManagerLeaderElectionServices = new ConcurrentHashMap<>();
 
        private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
 
@@ -56,8 +56,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                this.jobMasterLeaderRetrievers.put(jobID, 
jobMasterLeaderRetriever);
        }
 
-       public void setJobMasterLeaderElectionService(LeaderElectionService 
leaderElectionService) {
-               this.jobMasterLeaderElectionService = leaderElectionService;
+       public void setJobMasterLeaderElectionService(JobID jobID, 
LeaderElectionService leaderElectionService) {
+               this.jobManagerLeaderElectionServices.put(jobID, 
leaderElectionService);
        }
 
        public void 
setResourceManagerLeaderElectionService(LeaderElectionService 
leaderElectionService) {
@@ -87,7 +87,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
                LeaderRetrievalService service = 
this.jobMasterLeaderRetrievers.get(jobID);
                if (service != null) {
                        return service;
@@ -97,24 +97,24 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
-               LeaderElectionService service = jobMasterLeaderElectionService;
+       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+               LeaderElectionService service = 
resourceManagerLeaderElectionService;
 
                if (service != null) {
                        return service;
                } else {
-                       throw new 
IllegalStateException("JobMasterLeaderElectionService has not been set");
+                       throw new 
IllegalStateException("ResourceManagerLeaderElectionService has not been set");
                }
        }
 
        @Override
-       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
-               LeaderElectionService service = 
resourceManagerLeaderElectionService;
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
+               LeaderElectionService service = 
this.jobManagerLeaderElectionServices.get(jobID);
 
                if (service != null) {
                        return service;
                } else {
-                       throw new 
IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+                       throw new 
IllegalStateException("JobMasterLeaderElectionService has not been set");
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 5ec6991..8419abe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -406,7 +406,8 @@ public class JobManagerHARecoveryTest {
                        storedJobs.remove(jobId);
                }
 
-               boolean contains(JobID jobId) {
+               @Override
+               public boolean contains(JobID jobId) {
                        return storedJobs.containsKey(jobId);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index bfe5f55..3a769bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.junit.After;
@@ -57,6 +58,8 @@ public class JobManagerRunnerMockTest {
 
        private LeaderElectionService leaderElectionService;
 
+       private SubmittedJobGraphStore submittedJobGraphStore;
+
        private TestingOnCompletionActions jobCompletion;
 
        @Before
@@ -72,8 +75,12 @@ public class JobManagerRunnerMockTest {
                leaderElectionService = mock(LeaderElectionService.class);
                when(leaderElectionService.hasLeadership()).thenReturn(true);
 
+               submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
+               
when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(true);
+
                HighAvailabilityServices haServices = 
mock(HighAvailabilityServices.class);
-               
when(haServices.getJobMasterLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+               
when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
+               
when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore);
 
                runner = PowerMockito.spy(new JobManagerRunner(
                        new JobGraph("test"),
@@ -127,7 +134,7 @@ public class JobManagerRunnerMockTest {
        public void testJobFinishedByOtherBeforeGrantLeadership() throws 
Exception {
                runner.start();
 
-               when(runner.isJobFinishedByOthers()).thenReturn(true);
+               
when(submittedJobGraphStore.contains(any(JobID.class))).thenReturn(false);
                runner.grantLeadership(UUID.randomUUID());
 
                // runner should shutdown automatic and informed the job 
completion

http://git-wip-us.apache.org/repos/asf/flink/blob/214113eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index e3018c9..805ea71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -237,7 +237,7 @@ public class SlotProtocolTest extends TestLogger {
                
testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
 
                final TestingLeaderElectionService jmLeaderElectionService = 
new TestingLeaderElectionService();
-               
testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService);
+               testingHA.setJobMasterLeaderElectionService(jobID, 
jmLeaderElectionService);
                final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jmAddress, jmID);
                testingHA.setJobMasterLeaderRetriever(jobID, 
jmLeaderRetrievalService);
 

Reply via email to