Repository: flink
Updated Branches:
  refs/heads/master bb972b85a -> 81114d5f7


[FLINK-6120] [heartbeat] Implement heartbeat logic between JobManager and ResourceManager

This closes #3645.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81114d5f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81114d5f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81114d5f

Branch: refs/heads/master
Commit: 81114d5f71ee05908a93ebd21475dcec1f5eed09
Parents: bb972b8
Author: Zhijiang <wangzhijiang...@aliyun.com>
Authored: Thu Mar 30 00:30:29 2017 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Apr 27 17:56:51 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      | 153 +++++++++++++++----
 .../runtime/jobmaster/JobMasterGateway.java     |   7 +
 .../jobmaster/JobMasterRegistrationSuccess.java |  14 +-
 .../resourcemanager/ResourceManager.java        | 133 +++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java |  18 ++-
 .../registration/JobManagerRegistration.java    |   9 ++
 .../clusterframework/ResourceManagerTest.java   | 116 +++++++++++++-
 .../flink/runtime/jobmaster/JobMasterTest.java  |  89 ++++++++++-
 .../ResourceManagerJobMasterTest.java           |  42 ++++-
 .../slotmanager/SlotProtocolTest.java           |   6 +-
 .../taskexecutor/TaskExecutorITCase.java        |   8 +-
 11 files changed, 521 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6fe8cb3..ab43577 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -155,7 +155,10 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        private final MetricGroup jobMetricGroup;
 
        /** The heartbeat manager with task managers */
-       private final HeartbeatManager<Void, Void> heartbeatManager;
+       private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
+
+       /** The heartbeat manager with resource manager */
+       private final HeartbeatManager<Void, Void> 
resourceManagerHeartbeatManager;
 
        /** The execution context which is used to execute futures */
        private final Executor executor;
@@ -218,12 +221,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                this.errorHandler = checkNotNull(errorHandler);
                this.userCodeLoader = checkNotNull(userCodeLoader);
 
-               this.heartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
+               this.taskManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
                        resourceId,
                        new TaskManagerHeartbeatListener(),
                        rpcService.getScheduledExecutor(),
                        log);
 
+               this.resourceManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManager(
+                               resourceId,
+                               new ResourceManagerHeartbeatListener(),
+                               rpcService.getScheduledExecutor(),
+                               log);
+
                final String jobName = jobGraph.getName();
                final JobID jid = jobGraph.getJobID();
 
@@ -309,7 +318,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         */
        @Override
        public void shutDown() throws Exception {
-               heartbeatManager.stop();
+               taskManagerHeartbeatManager.stop();
+               resourceManagerHeartbeatManager.stop();
 
                // make sure there is a graceful exit
                getSelf().suspendExecution(new Exception("JobManager is 
shutting down."));
@@ -407,7 +417,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                slotPoolGateway.suspend();
 
                // disconnect from resource manager:
-               closeResourceManagerConnection();
+               closeResourceManagerConnection(new Exception("Execution was 
suspended.", cause));
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -534,7 +544,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public void disconnectTaskManager(final ResourceID resourceID, final 
Exception cause) {
-               heartbeatManager.unmonitorTarget(resourceID);
+               taskManagerHeartbeatManager.unmonitorTarget(resourceID);
                slotPoolGateway.releaseTaskManager(resourceID);
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> 
taskManagerConnection = registeredTaskManagers.remove(resourceID);
@@ -766,7 +776,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                        
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, 
taskExecutorGateway));
 
                                        // monitor the task manager as 
heartbeat target
-                                       
heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() {
+                                       
taskManagerHeartbeatManager.monitorTarget(taskManagerId, new 
HeartbeatTarget<Void>() {
                                                @Override
                                                public void 
receiveHeartbeat(ResourceID resourceID, Void payload) {
                                                        // the task manager 
will not request heartbeat, so this method will never be called currently
@@ -788,13 +798,24 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        public void disconnectResourceManager(
                        final UUID jobManagerLeaderId,
                        final UUID resourceManagerLeaderId,
-                       final Exception cause) {
-               // TODO: Implement disconnect behaviour
+                       final Exception cause) throws Exception {
+
+               validateLeaderSessionId(jobManagerLeaderId);
+
+               if (resourceManagerConnection != null
+                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
+                       closeResourceManagerConnection(cause);
+               }
        }
 
        @RpcMethod
        public void heartbeatFromTaskManager(final ResourceID resourceID) {
-               heartbeatManager.receiveHeartbeat(resourceID, null);
+               taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+       }
+
+       @RpcMethod
+       public void heartbeatFromResourceManager(final ResourceID resourceID) {
+               resourceManagerHeartbeatManager.requestHeartbeat(resourceID, 
null);
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -872,56 +893,79 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
-       private void notifyOfNewResourceManagerLeader(
-                       final String resourceManagerAddress, final UUID 
resourceManagerLeaderId)
-       {
-               validateRunsInMainThread();
-
+       private void notifyOfNewResourceManagerLeader(final String 
resourceManagerAddress, final UUID resourceManagerLeaderId) {
                if (resourceManagerConnection != null) {
                        if (resourceManagerAddress != null) {
                                if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-                                               && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+                                       && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
                                        // both address and leader id are not 
changed, we can keep the old connection
                                        return;
                                }
                                log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
-                                               
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+                                       
resourceManagerConnection.getTargetAddress(), resourceManagerAddress);
+
+                               // Log before closing: closeResourceManagerConnection() sets resourceManagerConnection to null.
+                               closeResourceManagerConnection(new Exception(
+                                       "ResourceManager leader changed to new 
address " + resourceManagerAddress));
                        } else {
                                log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
-                                               
resourceManagerConnection.getTargetAddress());
+                                       
resourceManagerConnection.getTargetAddress());
                        }
                }
 
-               closeResourceManagerConnection();
-
                if (resourceManagerAddress != null) {
                        log.info("Attempting to register at ResourceManager 
{}", resourceManagerAddress);
+
                        resourceManagerConnection = new 
ResourceManagerConnection(
-                                       log, jobGraph.getJobID(), getAddress(), 
leaderSessionID,
-                                       resourceManagerAddress, 
resourceManagerLeaderId, executor);
+                               log,
+                               jobGraph.getJobID(),
+                               resourceId,
+                               getAddress(),
+                               leaderSessionID,
+                               resourceManagerAddress,
+                               resourceManagerLeaderId,
+                               executor);
+
                        resourceManagerConnection.start();
                }
        }
 
-       private void onResourceManagerRegistrationSuccess(final 
JobMasterRegistrationSuccess success) {
-               validateRunsInMainThread();
+       private void establishResourceManagerConnection(final 
JobMasterRegistrationSuccess success) {
+               final UUID resourceManagerLeaderId = 
success.getResourceManagerLeaderId();
        
                // verify the response with current connection
                if (resourceManagerConnection != null
-                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
-               {
-                       log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.",
-                                       success.getResourceManagerLeaderId());
+                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
 
-                       slotPoolGateway.connectToResourceManager(
-                                       success.getResourceManagerLeaderId(), 
resourceManagerConnection.getTargetGateway());
+                       log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.", resourceManagerLeaderId);
+
+                       final ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+
+                       
slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, 
resourceManagerGateway);
+
+                       
resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(),
 new HeartbeatTarget<Void>() {
+                               @Override
+                               public void receiveHeartbeat(ResourceID 
resourceID, Void payload) {
+                                       
resourceManagerGateway.heartbeatFromJobManager(resourceID);
+                               }
+
+                               @Override
+                               public void requestHeartbeat(ResourceID 
resourceID, Void payload) {
+                                       // request heartbeat will never be 
called on the job manager side
+                               }
+                       });
                }
        }
 
-       private void closeResourceManagerConnection() {
-               validateRunsInMainThread();
-
+       private void closeResourceManagerConnection(Exception cause) {
                if (resourceManagerConnection != null) {
+                       log.info("Close ResourceManager connection {}.", 
resourceManagerConnection.getResourceManagerResourceID(), cause);
+
+                       
resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID());
+
+                       ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+                       
resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(),
 cause);
+
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
@@ -964,13 +1008,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        {
                private final JobID jobID;
 
+               private final ResourceID jobManagerResourceID;
+
                private final String jobManagerRpcAddress;
 
                private final UUID jobManagerLeaderID;
 
+               private ResourceID resourceManagerResourceID;
+
                ResourceManagerConnection(
                                final Logger log,
                                final JobID jobID,
+                               final ResourceID jobManagerResourceID,
                                final String jobManagerRpcAddress,
                                final UUID jobManagerLeaderID,
                                final String resourceManagerAddress,
@@ -979,6 +1028,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                {
                        super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
                        this.jobID = checkNotNull(jobID);
+                       this.jobManagerResourceID = 
checkNotNull(jobManagerResourceID);
                        this.jobManagerRpcAddress = 
checkNotNull(jobManagerRpcAddress);
                        this.jobManagerLeaderID = 
checkNotNull(jobManagerLeaderID);
                }
@@ -998,6 +1048,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                        return gateway.registerJobManager(
                                                leaderId,
                                                jobManagerLeaderID,
+                                               jobManagerResourceID,
                                                jobManagerRpcAddress,
                                                jobID,
                                                timeout);
@@ -1010,7 +1061,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       
onResourceManagerRegistrationSuccess(success);
+                                       resourceManagerResourceID = 
success.getResourceManagerResourceId();
+                                       
establishResourceManagerConnection(success);
                                }
                        });
                }
@@ -1019,6 +1071,14 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                protected void onRegistrationFailure(final Throwable failure) {
                        handleFatalError(failure);
                }
+
+               public ResourceID getResourceManagerResourceID() {
+                       return resourceManagerResourceID;
+               }
+
+               public JobID getJobID() {
+                       return jobID;
+               }
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -1063,4 +1123,31 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        return FlinkCompletableFuture.completed(null);
                }
        }
+
+       private class ResourceManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+
+               @Override
+               public void notifyHeartbeatTimeout(final ResourceID resourceId) 
{
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("The heartbeat of 
ResourceManager with id {} timed out.", resourceId);
+
+                                       closeResourceManagerConnection(
+                                               new TimeoutException(
+                                                       "The heartbeat of 
ResourceManager with id " + resourceId + " timed out."));
+                               }
+                       });
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Void payload) {
+                       // nothing to do since the payload is of type Void
+               }
+
+               @Override
+               public Future<Void> retrievePayload() {
+                       return FlinkCompletableFuture.completed(null);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 13a7372..5a271f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -226,4 +226,11 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * @param resourceID unique id of the task manager
         */
        void heartbeatFromTaskManager(final ResourceID resourceID);
+
+       /**
+        * Heartbeat request from the resource manager
+        *
+        * @param resourceID unique id of the resource manager
+        */
+       void heartbeatFromResourceManager(final ResourceID resourceID);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index 4058452..a7a6224 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import java.util.UUID;
@@ -35,9 +36,15 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
 
        private final UUID resourceManagerLeaderId;
 
-       public JobMasterRegistrationSuccess(final long heartbeatInterval, final 
UUID resourceManagerLeaderId) {
+       private final ResourceID resourceManagerResourceId;
+
+       public JobMasterRegistrationSuccess(
+                       final long heartbeatInterval,
+                       final UUID resourceManagerLeaderId,
+                       final ResourceID resourceManagerResourceId) {
                this.heartbeatInterval = heartbeatInterval;
                this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+               this.resourceManagerResourceId = 
checkNotNull(resourceManagerResourceId);
        }
 
        /**
@@ -53,11 +60,16 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
                return resourceManagerLeaderId;
        }
 
+       public ResourceID getResourceManagerResourceId() {
+               return resourceManagerResourceId;
+       }
+
        @Override
        public String toString() {
                return "JobMasterRegistrationSuccess{" +
                        "heartbeatInterval=" + heartbeatInterval +
                        ", resourceManagerLeaderId=" + resourceManagerLeaderId +
+                       ", resourceManagerResourceId=" + 
resourceManagerResourceId +
                        '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index c0ff412..f17dbe5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -79,7 +79,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact 
with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, UUID, String, JobID)} registers a 
{@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, UUID, String, JobID, ResourceID)} 
registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from 
the resource manager</li>
  * </ul>
  */
@@ -96,6 +96,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /** All currently registered JobMasterGateways scoped by JobID. */
        private final Map<JobID, JobManagerRegistration> 
jobManagerRegistrations;
 
+       /** All currently registered JobMasterGateways scoped by ResourceID. */
+       private final Map<ResourceID, JobManagerRegistration> 
jmResourceIdRegistrations;
+
        /** Service to retrieve the job leader ids */
        private final JobLeaderIdService jobLeaderIdService;
 
@@ -108,6 +111,9 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /** The heartbeat manager with task managers. */
        private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager;
 
+       /** The heartbeat manager with job managers. */
+       private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
+
        /** The factory to construct the SlotManager. */
        private final SlotManagerFactory slotManagerFactory;
 
@@ -152,12 +158,19 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
                this.taskManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
-                               resourceId,
-                               new TaskManagerHeartbeatListener(),
-                               rpcService.getScheduledExecutor(),
-                               log);
+                       resourceId,
+                       new TaskManagerHeartbeatListener(),
+                       rpcService.getScheduledExecutor(),
+                       log);
+
+               this.jobManagerHeartbeatManager = 
heartbeatServices.createHeartbeatManagerSender(
+                       resourceId,
+                       new JobManagerHeartbeatListener(),
+                       rpcService.getScheduledExecutor(),
+                       log);
 
                this.jobManagerRegistrations = new HashMap<>(4);
+               this.jmResourceIdRegistrations = new HashMap<>(4);
                this.taskExecutors = new HashMap<>(8);
                this.leaderSessionId = null;
                infoMessageListeners = new ConcurrentHashMap<>(8);
@@ -204,6 +217,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                taskManagerHeartbeatManager.stop();
 
+               jobManagerHeartbeatManager.stop();
+
                try {
                        super.shutDown();
                } catch (Exception e) {
@@ -231,11 +246,13 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        public Future<RegistrationResponse> registerJobManager(
                        final UUID resourceManagerLeaderId,
                        final UUID jobManagerLeaderId,
+                       final ResourceID jobManagerResourceId,
                        final String jobManagerAddress,
                        final JobID jobId) {
 
                checkNotNull(resourceManagerLeaderId);
                checkNotNull(jobManagerLeaderId);
+               checkNotNull(jobManagerResourceId);
                checkNotNull(jobManagerAddress);
                checkNotNull(jobId);
 
@@ -276,7 +293,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                        Future<RegistrationResponse> registrationResponseFuture 
= jobMasterGatewayFuture.thenCombineAsync(jobLeaderIdFuture, new 
BiFunction<JobMasterGateway, UUID, RegistrationResponse>() {
                                @Override
-                               public RegistrationResponse 
apply(JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
+                               public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
                                        if (isValid(resourceManagerLeaderId)) {
                                                if 
(jobLeaderId.equals(jobManagerLeaderId)) {
                                                        if 
(jobManagerRegistrations.containsKey(jobId)) {
@@ -291,21 +308,43 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                                                                
oldJobManagerRegistration.getJobID(),
                                                                                
new Exception("New job leader for job " + jobId + " found."));
 
-                                                                       
JobManagerRegistration jobManagerRegistration = new 
JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
+                                                                       
JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
+                                                                               
jobId,
+                                                                               
jobManagerResourceId,
+                                                                               
jobLeaderId,
+                                                                               
jobMasterGateway);
                                                                        
jobManagerRegistrations.put(jobId, jobManagerRegistration);
+                                                                       
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
                                                                }
                                                        } else {
                                                                // new 
registration for the job
-                                                               
JobManagerRegistration jobManagerRegistration = new 
JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
-
+                                                               
JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
+                                                                       jobId,
+                                                                       
jobManagerResourceId,
+                                                                       
jobLeaderId,
+                                                                       
jobMasterGateway);
                                                                
jobManagerRegistrations.put(jobId, jobManagerRegistration);
+                                                               
jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
                                                        }
 
                                                        log.info("Registered 
job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
 
+                                                       
jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new 
HeartbeatTarget<Void>() {
+                                                               @Override
+                                                               public void 
receiveHeartbeat(ResourceID resourceID, Void payload) {
+                                                                       // the 
ResourceManager will always send heartbeat requests to the JobManager
+                                                               }
+
+                                                               @Override
+                                                               public void 
requestHeartbeat(ResourceID resourceID, Void payload) {
+                                                                       
jobMasterGateway.heartbeatFromResourceManager(resourceID);
+                                                               }
+                                                       });
+
                                                        return new 
JobMasterRegistrationSuccess(
                                                                
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
-                                                               
getLeaderSessionId());
+                                                               
getLeaderSessionId(),
+                                                               resourceId);
 
                                                } else {
                                                        log.debug("The job 
manager leader id {} did not match the job " +
@@ -423,10 +462,20 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        }
 
        @RpcMethod
+       public void heartbeatFromJobManager(final ResourceID resourceID) {
+               jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
+       }
+
+       @RpcMethod
        public void disconnectTaskManager(final ResourceID resourceId, final 
Exception cause) {
                closeTaskManagerConnection(resourceId, cause);
        }
 
+       @RpcMethod
+       public void disconnectJobManager(final JobID jobId, final Exception 
cause) {
+               closeJobManagerConnection(jobId, cause);
+       }
+
        /**
         * Requests a slot from the resource manager.
         *
@@ -577,6 +626,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
        private void clearState() {
                jobManagerRegistrations.clear();
+               jmResourceIdRegistrations.clear();
                taskExecutors.clear();
                slotManager.clearState();
 
@@ -590,23 +640,31 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        }
 
        /**
-        * Disconnects the job manager which is connected for the given job 
from the resource manager.
+        * This method should be called by the framework once it detects that a 
currently registered
+        * job manager has failed.
         *
-        * @param jobId identifying the job whose leader shall be disconnected
+        * @param jobId identifying the job whose leader shall be disconnected.
+        * @param cause The exception which caused the JobManager to fail.
         */
-       protected void disconnectJobManager(JobID jobId, Exception cause) {
+       protected void closeJobManagerConnection(JobID jobId, Exception cause) {
                JobManagerRegistration jobManagerRegistration = 
jobManagerRegistrations.remove(jobId);
 
                if (jobManagerRegistration != null) {
+                       final ResourceID jobManagerResourceId = 
jobManagerRegistration.getJobManagerResourceID();
+                       final JobMasterGateway jobMasterGateway = 
jobManagerRegistration.getJobManagerGateway();
+                       final UUID jobManagerLeaderId = 
jobManagerRegistration.getLeaderID();
+
                        log.info("Disconnect job manager {}@{} for job {} from 
the resource manager.",
-                               jobManagerRegistration.getLeaderID(),
-                               
jobManagerRegistration.getJobManagerGateway().getAddress(),
+                               jobManagerLeaderId,
+                               jobMasterGateway.getAddress(),
                                jobId);
 
-                       JobMasterGateway jobMasterGateway = 
jobManagerRegistration.getJobManagerGateway();
+                       
jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceId);
+
+                       jmResourceIdRegistrations.remove(jobManagerResourceId);
 
                        // tell the job manager about the disconnect
-                       
jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(),
 getLeaderSessionId(), cause);
+                       
jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, 
getLeaderSessionId(), cause);
                } else {
                        log.debug("There was no registered job manager for job 
{}.", jobId);
                }
@@ -882,14 +940,47 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                @Override
                public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
-                       log.info("The heartbeat of TaskManager with id {} timed 
out.", resourceID);
-
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
+                                       log.info("The heartbeat of TaskManager 
with id {} timed out.", resourceID);
+
                                        closeTaskManagerConnection(
-                                               resourceID,
-                                               new TimeoutException("Task 
manager with id " + resourceID + " heartbeat timed out."));
+                                                       resourceID,
+                                                       new 
TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  
timed out."));
+                               }
+                       });
+               }
+
+               @Override
+               public void reportPayload(ResourceID resourceID, Void payload) {
+                       // nothing to do since there is no payload
+               }
+
+               @Override
+               public Future<Void> retrievePayload() {
+                       return FlinkCompletableFuture.completed(null);
+               }
+       }
+
+       private class JobManagerHeartbeatListener implements 
HeartbeatListener<Void, Void> {
+
+               @Override
+               public void notifyHeartbeatTimeout(final ResourceID resourceID) 
{
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("The heartbeat of JobManager 
with id {} timed out.", resourceID);
+
+                                       if 
(jmResourceIdRegistrations.containsKey(resourceID)) {
+                                               JobManagerRegistration 
jobManagerRegistration = jmResourceIdRegistrations.get(resourceID);
+
+                                               if (jobManagerRegistration != 
null) {
+                                                       
closeJobManagerConnection(
+                                                               
jobManagerRegistration.getJobID(),
+                                                               new 
TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed 
out."));
+                                               }
+                                       }
                                }
                        });
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index cda4a7c..530113f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -44,6 +44,7 @@ public interface ResourceManagerGateway extends RpcGateway {
         *
         * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
         * @param jobMasterLeaderId The fencing token for the JobMaster leader
+        * @param jobMasterResourceId   The resource ID of the JobMaster that 
registers
         * @param jobMasterAddress        The address of the JobMaster that 
registers
         * @param jobID                   The Job ID of the JobMaster that 
registers
         * @param timeout                 Timeout for the future to complete
@@ -52,11 +53,11 @@ public interface ResourceManagerGateway extends RpcGateway {
        Future<RegistrationResponse> registerJobManager(
                UUID resourceManagerLeaderId,
                UUID jobMasterLeaderId,
+               ResourceID jobMasterResourceId,
                String jobMasterAddress,
                JobID jobID,
                @RpcTimeout Time timeout);
 
-
        /**
         * Requests a slot from the resource manager.
         *
@@ -139,10 +140,25 @@ public interface ResourceManagerGateway extends 
RpcGateway {
        void heartbeatFromTaskManager(final ResourceID heartbeatOrigin);
 
        /**
+        * Sends a heartbeat from the job manager to the resource manager.
+        *
+        * @param heartbeatOrigin resource ID of the job manager sending the heartbeat
+        */
+       void heartbeatFromJobManager(final ResourceID heartbeatOrigin);
+
+       /**
         * Disconnects a TaskManager specified by the given resourceID from the 
{@link ResourceManager}.
         *
         * @param resourceID identifying the TaskManager to disconnect
         * @param cause for the disconnection of the TaskManager
         */
        void disconnectTaskManager(ResourceID resourceID, Exception cause);
+
+       /**
+        * Disconnects the JobManager leading the job identified by the given jobId from the 
{@link ResourceManager}.
+        *
+        * @param jobId JobID for which the JobManager was the leader
+        * @param cause for the disconnection of the JobManager
+        */
+       void disconnectJobManager(JobID jobId, Exception cause);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
index a1deb65..df3a39f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager.registration;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.util.Preconditions;
 
@@ -30,15 +31,19 @@ import java.util.UUID;
 public class JobManagerRegistration {
        private final JobID jobID;
 
+       private final ResourceID jobManagerResourceID;
+
        private final UUID leaderID;
 
        private final JobMasterGateway jobManagerGateway;
 
        public JobManagerRegistration(
                        JobID jobID,
+                       ResourceID jobManagerResourceID,
                        UUID leaderID,
                        JobMasterGateway jobManagerGateway) {
                this.jobID = Preconditions.checkNotNull(jobID);
+               this.jobManagerResourceID = 
Preconditions.checkNotNull(jobManagerResourceID);
                this.leaderID = Preconditions.checkNotNull(leaderID);
                this.jobManagerGateway = 
Preconditions.checkNotNull(jobManagerGateway);
        }
@@ -47,6 +52,10 @@ public class JobManagerRegistration {
                return jobID;
        }
 
+       public ResourceID getJobManagerResourceID() {
+               return jobManagerResourceID;
+       }
+
        public UUID getLeaderID() {
                return leaderID;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 3464129..e4e20b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -37,7 +38,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -412,27 +416,32 @@ public class ResourceManagerTest extends TestLogger {
 
                        final SlotReport slotReport = new SlotReport();
                        // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
-                       Future<RegistrationResponse> successfulFuture =
-                                       
resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, 
taskManagerResourceID, slotReport);
+                       Future<RegistrationResponse> successfulFuture = 
resourceManager.registerTaskExecutor(
+                               rmLeaderSessionId,
+                               taskManagerAddress,
+                               taskManagerResourceID,
+                               slotReport);
                        RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 
                        ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
-                       verify(scheduledExecutor, times(1)).scheduleAtFixedRate(
+                       verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
                                heartbeatRunnableCaptor.capture(),
                                eq(0L),
                                eq(heartbeatInterval),
                                eq(TimeUnit.MILLISECONDS));
 
-                       Runnable heartbeatRunnable = 
heartbeatRunnableCaptor.getValue();
+                       List<Runnable> heartbeatRunnable = 
heartbeatRunnableCaptor.getAllValues();
 
                        ArgumentCaptor<Runnable> timeoutRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
                        
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), 
eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
 
                        Runnable timeoutRunnable = 
timeoutRunnableCaptor.getValue();
 
-                       // run the first heartbeat request
-                       heartbeatRunnable.run();
+                       // run all the heartbeat requests
+                       for (Runnable runnable : heartbeatRunnable) {
+                               runnable.run();
+                       }
 
                        verify(taskExecutorGateway, 
times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID));
 
@@ -445,4 +454,99 @@ public class ResourceManagerTest extends TestLogger {
                        rpcService.stopService();
                }
        }
+
+       @Test
+       public void testHeartbeatTimeoutWithJobManager() throws Exception {
+               final String jobMasterAddress = "jm";
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
+               final ResourceID rmResourceId = ResourceID.generate();
+               final UUID rmLeaderId = UUID.randomUUID();
+               final UUID jmLeaderId = UUID.randomUUID();
+               final JobID jobId = new JobID();
+
+               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+
+               final TestingSerialRpcService rpcService = new 
TestingSerialRpcService();
+               rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+
+               final ResourceManagerConfiguration resourceManagerConfiguration 
= new ResourceManagerConfiguration(
+                       Time.seconds(5L),
+                       Time.seconds(5L));
+
+               final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
+               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderId);
+               final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);
+
+               final long heartbeatInterval = 1L;
+               final long heartbeatTimeout = 5L;
+               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
+               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
+
+               final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+               final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
+               final JobLeaderIdService jobLeaderIdService = new 
JobLeaderIdService(
+                       highAvailabilityServices,
+                       rpcService.getScheduledExecutor(),
+                       Time.minutes(5L));
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               try {
+                       final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
+                               rpcService,
+                               FlinkResourceManager.RESOURCE_MANAGER_NAME,
+                               rmResourceId,
+                               resourceManagerConfiguration,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               slotManagerFactory,
+                               metricRegistry,
+                               jobLeaderIdService,
+                               testingFatalErrorHandler);
+
+                       resourceManager.start();
+
+                       rmLeaderElectionService.isLeader(rmLeaderId);
+
+                       // a successful registration starts monitoring the heartbeat target and 
schedules heartbeat requests at the heartbeat interval
+                       Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobManager(
+                               rmLeaderId,
+                               jmLeaderId,
+                               jmResourceId,
+                               jobMasterAddress,
+                               jobId);
+                       RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
+                       assertTrue(response instanceof 
JobMasterRegistrationSuccess);
+
+                       ArgumentCaptor<Runnable> heartbeatRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       verify(scheduledExecutor, times(2)).scheduleAtFixedRate(
+                               heartbeatRunnableCaptor.capture(),
+                               eq(0L),
+                               eq(heartbeatInterval),
+                               eq(TimeUnit.MILLISECONDS));
+
+                       List<Runnable> heartbeatRunnable = 
heartbeatRunnableCaptor.getAllValues();
+
+                       ArgumentCaptor<Runnable> timeoutRunnableCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       
verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), 
eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS));
+
+                       Runnable timeoutRunnable = 
timeoutRunnableCaptor.getValue();
+
+                       // run all the heartbeat requests
+                       for (Runnable runnable : heartbeatRunnable) {
+                               runnable.run();
+                       }
+
+                       verify(jobMasterGateway, 
times(1)).heartbeatFromResourceManager(eq(rmResourceId));
+
+                       // run the timeout runnable to simulate a heartbeat 
timeout
+                       timeoutRunnable.run();
+
+                       
verify(jobMasterGateway).disconnectResourceManager(eq(jmLeaderId), 
eq(rmLeaderId), any(TimeoutException.class));
+
+               } finally {
+                       rpcService.stopService();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 8b9b800..0b25e6c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.jobmaster;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
@@ -32,6 +34,8 @@ import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -52,9 +56,8 @@ import java.util.concurrent.TimeoutException;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(BlobLibraryCacheManager.class)
@@ -144,5 +147,85 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testHeartbeatTimeoutWithResourceManager() throws Exception {
+               final String resourceManagerAddress = "rm";
+               final String jobManagerAddress = "jm";
+               final UUID rmLeaderId = UUID.randomUUID();
+               final UUID jmLeaderId = UUID.randomUUID();
+               final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
+               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
+               final JobGraph jobGraph = new JobGraph();
+
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(
+                       null,
+                       null);
+               
haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
+               
haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class));
+
+               final long heartbeatInterval = 1L;
+               final long heartbeatTimeout = 5L;
+               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
+               final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
+
+               final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+               when(resourceManagerGateway.registerJobManager(
+                       any(UUID.class),
+                       any(UUID.class),
+                       any(ResourceID.class),
+                       anyString(),
+                       any(JobID.class),
+                       any(Time.class)
+               
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JobMasterRegistrationSuccess(
+                       heartbeatInterval, rmLeaderId, rmResourceId)));
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               try {
+                       final JobMaster jobMaster = new JobMaster(
+                               rpc,
+                               jmResourceId,
+                               jobGraph,
+                               new Configuration(),
+                               haServices,
+                               heartbeatServices,
+                               Executors.newScheduledThreadPool(1),
+                               mock(BlobLibraryCacheManager.class),
+                               mock(RestartStrategyFactory.class),
+                               Time.of(10, TimeUnit.SECONDS),
+                               null,
+                               mock(OnCompletionActions.class),
+                               testingFatalErrorHandler,
+                               new FlinkUserCodeClassLoader(new URL[0]));
+
+                       jobMaster.start(jmLeaderId);
+
+                       // define a leader and see that a registration happens
+                       
rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId);
+
+                       // a successful job manager registration starts heartbeat 
monitoring between the JM and the RM
+                       verify(resourceManagerGateway).registerJobManager(
+                               eq(rmLeaderId),
+                               eq(jmLeaderId),
+                               eq(jmResourceId),
+                               anyString(),
+                               eq(jobGraph.getJobID()),
+                               any(Time.class));
+
+                       // a heartbeat timeout should disconnect the 
JobManager from the ResourceManager
+                       verify(resourceManagerGateway, timeout(heartbeatTimeout 
* 50L)).disconnectJobManager(eq(jobGraph.getJobID()), 
any(TimeoutException.class));
+
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowError();
+
+               } finally {
+                       rpc.stopService();
+               }
+       }
+
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index b6b8614..6a151ac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -70,13 +70,19 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                UUID jmLeaderID = UUID.randomUUID();
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
                // test response successful
-               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderID, 
jobMasterAddress, jobID);
+               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobManager(
+                       rmLeaderSessionId,
+                       jmLeaderID,
+                       jmResourceId,
+                       jobMasterAddress,
+                       jobID);
                RegistrationResponse response = successfulFuture.get(5L, 
TimeUnit.SECONDS);
                assertTrue(response instanceof JobMasterRegistrationSuccess);
 
@@ -94,6 +100,7 @@ public class ResourceManagerJobMasterTest extends TestLogger 
{
                JobID jobID = mockJobMaster(jobMasterAddress);
                TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
                UUID jmLeaderID = UUID.randomUUID();
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
                TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
@@ -101,7 +108,12 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
 
                // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
                UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobManager(differentLeaderSessionID, jmLeaderID, 
jobMasterAddress, jobID);
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobManager(
+                       differentLeaderSessionID,
+                       jmLeaderID,
+                       jmResourceId,
+                       jobMasterAddress,
+                       jobID);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -124,10 +136,16 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
                // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
                UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobManager(rmLeaderSessionId, differentLeaderSessionID, 
jobMasterAddress, jobID);
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobManager(
+                       rmLeaderSessionId,
+                       differentLeaderSessionID,
+                       jmResourceId,
+                       jobMasterAddress,
+                       jobID);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -150,10 +168,16 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
                // test throw exception when receive a registration from job 
master which takes invalid address
                String invalidAddress = "/jobMasterAddress2";
-               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, 
invalidAddress, jobID);
+               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerJobManager(
+                       rmLeaderSessionId,
+                       jmLeaderSessionId,
+                       jmResourceId,
+                       invalidAddress,
+                       jobID);
                assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
 
                if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -176,10 +200,16 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final UUID rmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
                final UUID jmLeaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+               final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
                JobID unknownJobIDToHAServices = new JobID();
                // verify return RegistrationResponse.Decline when failed to 
start a job master Leader retrieval listener
-               Future<RegistrationResponse> declineFuture = 
resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, 
jobMasterAddress, unknownJobIDToHAServices);
+               Future<RegistrationResponse> declineFuture = 
resourceManager.registerJobManager(
+                       rmLeaderSessionId,
+                       jmLeaderSessionId,
+                       jmResourceId,
+                       jobMasterAddress,
+                       unknownJobIDToHAServices);
                RegistrationResponse response = declineFuture.get(5, 
TimeUnit.SECONDS);
                assertTrue(response instanceof RegistrationResponse.Decline);
 
@@ -205,7 +235,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
 
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
+               HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 
5L);
 
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 4d2309a..37690b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -101,6 +101,7 @@ public class SlotProtocolTest extends TestLogger {
                final String jmAddress = "/jm1";
                final JobID jobID = new JobID();
                final ResourceID rmResourceId = new ResourceID(rmAddress);
+               final ResourceID jmResourceId = new ResourceID(jmAddress);
 
                testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
 
@@ -138,7 +139,7 @@ public class SlotProtocolTest extends TestLogger {
                rmLeaderElectionService.isLeader(rmLeaderID);
 
                Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmAddress, jobID);
+                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmResourceId, jmAddress, jobID);
                try {
                        registrationFuture.get(5, TimeUnit.SECONDS);
                } catch (Exception e) {
@@ -207,6 +208,7 @@ public class SlotProtocolTest extends TestLogger {
                final String tmAddress = "/tm1";
                final JobID jobID = new JobID();
                final ResourceID rmResourceId = new ResourceID(rmAddress);
+               final ResourceID jmResourceId = new ResourceID(jmAddress);
 
                testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
 
@@ -254,7 +256,7 @@ public class SlotProtocolTest extends TestLogger {
                Thread.sleep(1000);
 
                Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmAddress, jobID);
+                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmResourceId, jmAddress, jobID);
                try {
                        registrationFuture.get(5L, TimeUnit.SECONDS);
                } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index fa326b5..579ca3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -90,6 +90,7 @@ public class TaskExecutorITCase extends TestLogger {
                final String jmAddress = "jm";
                final UUID jmLeaderId = UUID.randomUUID();
                final ResourceID rmResourceId = new ResourceID(rmAddress);
+               final ResourceID jmResourceId = new ResourceID(jmAddress);
                final JobID jobId = new JobID();
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
 
@@ -180,7 +181,12 @@ public class TaskExecutorITCase extends TestLogger {
                        // notify the TM about the new RM leader
                        rmLeaderRetrievalService.notifyListener(rmAddress, 
rmLeaderId);
 
-                       Future<RegistrationResponse> registrationResponseFuture 
= resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId);
+                       Future<RegistrationResponse> registrationResponseFuture 
= resourceManager.registerJobManager(
+                               rmLeaderId,
+                               jmLeaderId,
+                               jmResourceId,
+                               jmAddress,
+                               jobId);
 
                        RegistrationResponse registrationResponse = 
registrationResponseFuture.get();
 

Reply via email to