Repository: flink Updated Branches: refs/heads/master bb972b85a -> 81114d5f7
[FLINK-6120] [heartbeat] Implement heartbeat logic between JobManager and ResourceManager This closes #3645. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81114d5f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81114d5f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81114d5f Branch: refs/heads/master Commit: 81114d5f71ee05908a93ebd21475dcec1f5eed09 Parents: bb972b8 Author: Zhijiang <wangzhijiang...@aliyun.com> Authored: Thu Mar 30 00:30:29 2017 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Apr 27 17:56:51 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 153 +++++++++++++++---- .../runtime/jobmaster/JobMasterGateway.java | 7 + .../jobmaster/JobMasterRegistrationSuccess.java | 14 +- .../resourcemanager/ResourceManager.java | 133 +++++++++++++--- .../resourcemanager/ResourceManagerGateway.java | 18 ++- .../registration/JobManagerRegistration.java | 9 ++ .../clusterframework/ResourceManagerTest.java | 116 +++++++++++++- .../flink/runtime/jobmaster/JobMasterTest.java | 89 ++++++++++- .../ResourceManagerJobMasterTest.java | 42 ++++- .../slotmanager/SlotProtocolTest.java | 6 +- .../taskexecutor/TaskExecutorITCase.java | 8 +- 11 files changed, 521 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 6fe8cb3..ab43577 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -155,7 +155,10 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { private final MetricGroup jobMetricGroup; /** The heartbeat manager with task managers */ - private final HeartbeatManager<Void, Void> heartbeatManager; + private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; + + /** The heartbeat manager with resource manager */ + private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager; /** The execution context which is used to execute futures */ private final Executor executor; @@ -218,12 +221,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { this.errorHandler = checkNotNull(errorHandler); this.userCodeLoader = checkNotNull(userCodeLoader); - this.heartbeatManager = heartbeatServices.createHeartbeatManagerSender( + this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( resourceId, new TaskManagerHeartbeatListener(), rpcService.getScheduledExecutor(), log); + this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager( + resourceId, + new ResourceManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); + final String jobName = jobGraph.getName(); final JobID jid = jobGraph.getJobID(); @@ -309,7 +318,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { */ @Override public void shutDown() throws Exception { - heartbeatManager.stop(); + taskManagerHeartbeatManager.stop(); + resourceManagerHeartbeatManager.stop(); // make sure there is a graceful exit getSelf().suspendExecution(new Exception("JobManager is shutting down.")); @@ -407,7 +417,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { slotPoolGateway.suspend(); // disconnect from resource manager: - closeResourceManagerConnection(); + closeResourceManagerConnection(new Exception("Execution was suspended.", cause)); } //---------------------------------------------------------------------------------------------- @@ -534,7 +544,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @RpcMethod public void disconnectTaskManager(final ResourceID resourceID, final Exception cause) { - heartbeatManager.unmonitorTarget(resourceID); + taskManagerHeartbeatManager.unmonitorTarget(resourceID); slotPoolGateway.releaseTaskManager(resourceID); Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID); @@ -766,7 +776,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway)); // monitor the task manager as heartbeat target - heartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() { + taskManagerHeartbeatManager.monitorTarget(taskManagerId, new HeartbeatTarget<Void>() { @Override public void receiveHeartbeat(ResourceID resourceID, Void payload) { // the task manager will not request heartbeat, so this method will never be called currently @@ -788,13 +798,24 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { public void disconnectResourceManager( final UUID jobManagerLeaderId, final UUID resourceManagerLeaderId, - final Exception cause) { - // TODO: Implement disconnect behaviour + final Exception cause) throws Exception { + + validateLeaderSessionId(jobManagerLeaderId); + + if (resourceManagerConnection != null + && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) { + closeResourceManagerConnection(cause); + } } @RpcMethod public void heartbeatFromTaskManager(final ResourceID resourceID) { - heartbeatManager.receiveHeartbeat(resourceID, null); + taskManagerHeartbeatManager.receiveHeartbeat(resourceID, null); + } + + @RpcMethod + public void heartbeatFromResourceManager(final ResourceID resourceID) { + resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null); } //---------------------------------------------------------------------------------------------- @@ -872,56 +893,79 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } } - private void notifyOfNewResourceManagerLeader( - final String resourceManagerAddress, final UUID resourceManagerLeaderId) - { - validateRunsInMainThread(); - + private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final UUID resourceManagerLeaderId) { if (resourceManagerConnection != null) { if (resourceManagerAddress != null) { if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) - && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { + && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { // both address and leader id are not changed, we can keep the old connection return; } + + closeResourceManagerConnection(new Exception( + "ResourceManager leader changed to new address " + resourceManagerAddress)); + log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", - resourceManagerConnection.getTargetAddress(), resourceManagerAddress); + resourceManagerConnection.getTargetAddress(), resourceManagerAddress); } else { log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", - resourceManagerConnection.getTargetAddress()); + resourceManagerConnection.getTargetAddress()); } } - closeResourceManagerConnection(); - if (resourceManagerAddress != null) { log.info("Attempting to register at ResourceManager {}", resourceManagerAddress); + resourceManagerConnection = new ResourceManagerConnection( - log, jobGraph.getJobID(), getAddress(), leaderSessionID, - resourceManagerAddress, resourceManagerLeaderId, executor); + log, + jobGraph.getJobID(), + resourceId, + getAddress(), + leaderSessionID, + resourceManagerAddress, + resourceManagerLeaderId, + executor); + resourceManagerConnection.start(); } } - private void onResourceManagerRegistrationSuccess(final JobMasterRegistrationSuccess success) { - validateRunsInMainThread(); + private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) { + final UUID resourceManagerLeaderId = success.getResourceManagerLeaderId(); // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) - { - log.info("JobManager successfully registered at ResourceManager, leader id: {}.", - success.getResourceManagerLeaderId()); + && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) { - slotPoolGateway.connectToResourceManager( - success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway()); + log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerLeaderId); + + final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + + slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, resourceManagerGateway); + + resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() { + @Override + public void receiveHeartbeat(ResourceID resourceID, Void payload) { + resourceManagerGateway.heartbeatFromJobManager(resourceID); + } + + @Override + public void requestHeartbeat(ResourceID resourceID, Void payload) { + // request heartbeat will never be called on the job manager side + } + }); } } - private void closeResourceManagerConnection() { - validateRunsInMainThread(); - + private void closeResourceManagerConnection(Exception cause) { if (resourceManagerConnection != null) { + log.info("Close ResourceManager connection {}.", resourceManagerConnection.getResourceManagerResourceID(), cause); + + resourceManagerHeartbeatManager.unmonitorTarget(resourceManagerConnection.getResourceManagerResourceID()); + + ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); + resourceManagerGateway.disconnectJobManager(resourceManagerConnection.getJobID(), cause); + resourceManagerConnection.close(); resourceManagerConnection = null; } @@ -964,13 +1008,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { { private final JobID jobID; + private final ResourceID jobManagerResourceID; + private final String jobManagerRpcAddress; private final UUID jobManagerLeaderID; + private ResourceID resourceManagerResourceID; + ResourceManagerConnection( final Logger log, final JobID jobID, + final ResourceID jobManagerResourceID, final String jobManagerRpcAddress, final UUID jobManagerLeaderID, final String resourceManagerAddress, @@ -979,6 +1028,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { { super(log, resourceManagerAddress, resourceManagerLeaderID, executor); this.jobID = checkNotNull(jobID); + this.jobManagerResourceID = checkNotNull(jobManagerResourceID); this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress); this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID); } @@ -998,6 +1048,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return gateway.registerJobManager( leaderId, jobManagerLeaderID, + jobManagerResourceID, jobManagerRpcAddress, jobID, timeout); @@ -1010,7 +1061,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { runAsync(new Runnable() { @Override public void run() { - onResourceManagerRegistrationSuccess(success); + resourceManagerResourceID = success.getResourceManagerResourceId(); + establishResourceManagerConnection(success); } }); } @@ -1019,6 +1071,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { protected void onRegistrationFailure(final Throwable failure) { handleFatalError(failure); } + + public ResourceID getResourceManagerResourceID() { + return resourceManagerResourceID; + } + + public JobID getJobID() { + return jobID; + } } //---------------------------------------------------------------------------------------------- @@ -1063,4 +1123,31 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { return FlinkCompletableFuture.completed(null); } } + + private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + + @Override + public void notifyHeartbeatTimeout(final ResourceID resourceId) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("The heartbeat of ResourceManager with id {} timed out.", resourceId); + + closeResourceManagerConnection( + new TimeoutException( + "The heartbeat of ResourceManager with id " + resourceId + " timed out.")); + } + }); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since the payload is of type Void + } + + @Override + public Future<Void> retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index 13a7372..5a271f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -226,4 +226,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * @param resourceID unique id of the task manager */ void heartbeatFromTaskManager(final ResourceID resourceID); + + /** + * Heartbeat request from the resource manager + * + * @param resourceID unique id of the resource manager + */ + void heartbeatFromResourceManager(final ResourceID resourceID); } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java index 4058452..a7a6224 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.registration.RegistrationResponse; import java.util.UUID; @@ -35,9 +36,15 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { private final UUID resourceManagerLeaderId; - public JobMasterRegistrationSuccess(final long heartbeatInterval, final UUID resourceManagerLeaderId) { + private final ResourceID resourceManagerResourceId; + + public JobMasterRegistrationSuccess( + final long heartbeatInterval, + final UUID resourceManagerLeaderId, + final ResourceID resourceManagerResourceId) { this.heartbeatInterval = heartbeatInterval; this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId); } /** @@ -53,11 +60,16 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { return resourceManagerLeaderId; } + public ResourceID getResourceManagerResourceId() { + return resourceManagerResourceId; + } + @Override public String toString() { return "JobMasterRegistrationSuccess{" + "heartbeatInterval=" + heartbeatInterval + ", resourceManagerLeaderId=" + resourceManagerLeaderId + + ", resourceManagerResourceId=" + resourceManagerResourceId + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index c0ff412..f17dbe5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -79,7 +79,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It offers the following methods as part of its rpc interface to interact with him remotely: * <ul> - * <li>{@link #registerJobManager(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #registerJobManager(UUID, UUID, String, JobID, ResourceID)} registers a {@link JobMaster} at the resource manager</li> * <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li> * </ul> */ @@ -96,6 +96,9 @@ public abstract class ResourceManager<WorkerType extends Serializable> /** All currently registered JobMasterGateways scoped by JobID. */ private final Map<JobID, JobManagerRegistration> jobManagerRegistrations; + /** All currently registered JobMasterGateways scoped by ResourceID. */ + private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations; + /** Service to retrieve the job leader ids */ private final JobLeaderIdService jobLeaderIdService; @@ -108,6 +111,9 @@ public abstract class ResourceManager<WorkerType extends Serializable> /** The heartbeat manager with task managers. */ private final HeartbeatManager<Void, Void> taskManagerHeartbeatManager; + /** The heartbeat manager with job managers. */ + private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager; + /** The factory to construct the SlotManager. */ private final SlotManagerFactory slotManagerFactory; @@ -152,12 +158,19 @@ public abstract class ResourceManager<WorkerType extends Serializable> this.fatalErrorHandler = checkNotNull(fatalErrorHandler); this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( - resourceId, - new TaskManagerHeartbeatListener(), - rpcService.getScheduledExecutor(), - log); + resourceId, + new TaskManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); + + this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender( + resourceId, + new JobManagerHeartbeatListener(), + rpcService.getScheduledExecutor(), + log); this.jobManagerRegistrations = new HashMap<>(4); + this.jmResourceIdRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); this.leaderSessionId = null; infoMessageListeners = new ConcurrentHashMap<>(8); @@ -204,6 +217,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> taskManagerHeartbeatManager.stop(); + jobManagerHeartbeatManager.stop(); + try { super.shutDown(); } catch (Exception e) { @@ -231,11 +246,13 @@ public abstract class ResourceManager<WorkerType extends Serializable> public Future<RegistrationResponse> registerJobManager( final UUID resourceManagerLeaderId, final UUID jobManagerLeaderId, + final ResourceID jobManagerResourceId, final String jobManagerAddress, final JobID jobId) { checkNotNull(resourceManagerLeaderId); checkNotNull(jobManagerLeaderId); + checkNotNull(jobManagerResourceId); checkNotNull(jobManagerAddress); checkNotNull(jobId); @@ -276,7 +293,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(jobLeaderIdFuture, new BiFunction<JobMasterGateway, UUID, RegistrationResponse>() { @Override - public RegistrationResponse apply(JobMasterGateway jobMasterGateway, UUID jobLeaderId) { + public RegistrationResponse apply(final JobMasterGateway jobMasterGateway, UUID jobLeaderId) { if (isValid(resourceManagerLeaderId)) { if (jobLeaderId.equals(jobManagerLeaderId)) { if (jobManagerRegistrations.containsKey(jobId)) { @@ -291,21 +308,43 @@ public abstract class ResourceManager<WorkerType extends Serializable> oldJobManagerRegistration.getJobID(), new Exception("New job leader for job " + jobId + " found.")); - JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway); + JobManagerRegistration jobManagerRegistration = new JobManagerRegistration( + jobId, + jobManagerResourceId, + jobLeaderId, + jobMasterGateway); jobManagerRegistrations.put(jobId, jobManagerRegistration); + jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration); } } else { // new registration for the job - JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway); - + JobManagerRegistration jobManagerRegistration = new JobManagerRegistration( + jobId, + jobManagerResourceId, + jobLeaderId, + jobMasterGateway); jobManagerRegistrations.put(jobId, jobManagerRegistration); + jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration); } log.info("Registered job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() { + @Override + public void receiveHeartbeat(ResourceID resourceID, Void payload) { + // the ResourceManager will always send heartbeat requests to the JobManager + } + + @Override + public void requestHeartbeat(ResourceID resourceID, Void payload) { + jobMasterGateway.heartbeatFromResourceManager(resourceID); + } + }); + return new JobMasterRegistrationSuccess( resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(), - getLeaderSessionId()); + getLeaderSessionId(), + resourceId); } else { log.debug("The job manager leader id {} did not match the job " + @@ -423,10 +462,20 @@ public abstract class ResourceManager<WorkerType extends Serializable> } @RpcMethod + public void heartbeatFromJobManager(final ResourceID resourceID) { + jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null); + } + + @RpcMethod public void disconnectTaskManager(final ResourceID resourceId, final Exception cause) { closeTaskManagerConnection(resourceId, cause); } + @RpcMethod + public void disconnectJobManager(final JobID jobId, final Exception cause) { + closeJobManagerConnection(jobId, cause); + } + /** * Requests a slot from the resource manager. * @@ -577,6 +626,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> private void clearState() { jobManagerRegistrations.clear(); + jmResourceIdRegistrations.clear(); taskExecutors.clear(); slotManager.clearState(); @@ -590,23 +640,31 @@ public abstract class ResourceManager<WorkerType extends Serializable> } /** - * Disconnects the job manager which is connected for the given job from the resource manager. + * This method should be called by the framework once it detects that a currently registered + * job manager has failed. * - * @param jobId identifying the job whose leader shall be disconnected + * @param jobId identifying the job whose leader shall be disconnected. + * @param cause The exception which cause the JobManager failed. */ - protected void disconnectJobManager(JobID jobId, Exception cause) { + protected void closeJobManagerConnection(JobID jobId, Exception cause) { JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId); if (jobManagerRegistration != null) { + final ResourceID jobManagerResourceId = jobManagerRegistration.getJobManagerResourceID(); + final JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + final UUID jobManagerLeaderId = jobManagerRegistration.getLeaderID(); + log.info("Disconnect job manager {}@{} for job {} from the resource manager.", - jobManagerRegistration.getLeaderID(), - jobManagerRegistration.getJobManagerGateway().getAddress(), + jobManagerLeaderId, + jobMasterGateway.getAddress(), jobId); - JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway(); + jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceId); + + jmResourceIdRegistrations.remove(jobManagerResourceId); // tell the job manager about the disconnect - jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause); + jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, getLeaderSessionId(), cause); } else { log.debug("There was no registered job manager for job {}.", jobId); } @@ -882,14 +940,47 @@ public abstract class ResourceManager<WorkerType extends Serializable> @Override public void notifyHeartbeatTimeout(final ResourceID resourceID) { - log.info("The heartbeat of TaskManager with id {} timed out.", resourceID); - runAsync(new Runnable() { @Override public void run() { + log.info("The heartbeat of TaskManager with id {} timed out.", resourceID); + closeTaskManagerConnection( - resourceID, - new TimeoutException("Task manager with id " + resourceID + " heartbeat timed out.")); + resourceID, + new TimeoutException("The heartbeat of TaskManager with id " + resourceID + " timed out.")); + } + }); + } + + @Override + public void reportPayload(ResourceID resourceID, Void payload) { + // nothing to do since there is no payload + } + + @Override + public Future<Void> retrievePayload() { + return FlinkCompletableFuture.completed(null); + } + } + + private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> { + + @Override + public void notifyHeartbeatTimeout(final ResourceID resourceID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("The heartbeat of JobManager with id {} timed out.", resourceID); + + if (jmResourceIdRegistrations.containsKey(resourceID)) { + JobManagerRegistration jobManagerRegistration = jmResourceIdRegistrations.get(resourceID); + + if (jobManagerRegistration != null) { + closeJobManagerConnection( + jobManagerRegistration.getJobID(), + new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.")); + } + } } }); } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index cda4a7c..530113f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -44,6 +44,7 @@ public interface ResourceManagerGateway extends RpcGateway { * * @param resourceManagerLeaderId The fencing token for the ResourceManager leader * @param jobMasterLeaderId The fencing token for the JobMaster leader + * @param jobMasterResourceId The resource ID of the JobMaster that registers * @param jobMasterAddress The address of the JobMaster that registers * @param jobID The Job ID of the JobMaster that registers * @param timeout Timeout for the future to complete @@ -52,11 +53,11 @@ public interface ResourceManagerGateway extends RpcGateway { Future<RegistrationResponse> registerJobManager( UUID resourceManagerLeaderId, UUID jobMasterLeaderId, + ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobID, @RpcTimeout Time timeout); - /** * Requests a slot from the resource manager. * @@ -139,10 +140,25 @@ public interface ResourceManagerGateway extends RpcGateway { void heartbeatFromTaskManager(final ResourceID heartbeatOrigin); /** + * Sends the heartbeat to resource manager from job manager + * + * @param heartbeatOrigin unique id of the job manager + */ + void heartbeatFromJobManager(final ResourceID heartbeatOrigin); + + /** * Disconnects a TaskManager specified by the given resourceID from the {@link ResourceManager}. * * @param resourceID identifying the TaskManager to disconnect * @param cause for the disconnection of the TaskManager */ void disconnectTaskManager(ResourceID resourceID, Exception cause); + + /** + * Disconnects a JobManager specified by the given resourceID from the {@link ResourceManager}. + * + * @param jobId JobID for which the JobManager was the leader + * @param cause for the disconnection of the JobManager + */ + void disconnectJobManager(JobID jobId, Exception cause); } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java index a1deb65..df3a39f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager.registration; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.util.Preconditions; @@ -30,15 +31,19 @@ import java.util.UUID; public class JobManagerRegistration { private final JobID jobID; + private final ResourceID jobManagerResourceID; + private final UUID leaderID; private final JobMasterGateway jobManagerGateway; public JobManagerRegistration( JobID jobID, + ResourceID jobManagerResourceID, UUID leaderID, JobMasterGateway jobManagerGateway) { this.jobID = Preconditions.checkNotNull(jobID); + this.jobManagerResourceID = Preconditions.checkNotNull(jobManagerResourceID); this.leaderID = Preconditions.checkNotNull(leaderID); this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway); } @@ -47,6 +52,10 @@ public class JobManagerRegistration { return jobID; } + public ResourceID getJobManagerResourceID() { + return jobManagerResourceID; + } + public UUID getLeaderID() { return leaderID; } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 3464129..e4e20b9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -37,7 +38,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; @@ -412,27 +416,32 @@ public class ResourceManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport(); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time - Future<RegistrationResponse> successfulFuture = - resourceManager.registerTaskExecutor(rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport); + Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor( + rmLeaderSessionId, + taskManagerAddress, + taskManagerResourceID, + slotReport); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); - verify(scheduledExecutor, times(1)).scheduleAtFixedRate( + verify(scheduledExecutor, times(2)).scheduleAtFixedRate( heartbeatRunnableCaptor.capture(), eq(0L), eq(heartbeatInterval), eq(TimeUnit.MILLISECONDS)); - Runnable heartbeatRunnable = heartbeatRunnableCaptor.getValue(); + List<Runnable> heartbeatRunnable = heartbeatRunnableCaptor.getAllValues(); ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); Runnable timeoutRunnable = timeoutRunnableCaptor.getValue(); - // run the first heartbeat request - heartbeatRunnable.run(); + // run all the heartbeat requests + for (Runnable runnable : heartbeatRunnable) { + runnable.run(); + } verify(taskExecutorGateway, times(1)).heartbeatFromResourceManager(eq(resourceManagerResourceID)); @@ -445,4 +454,99 @@ public class ResourceManagerTest extends TestLogger { rpcService.stopService(); } } + + @Test + public void testHeartbeatTimeoutWithJobManager() throws Exception { + final String jobMasterAddress = "jm"; + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); + final ResourceID rmResourceId = ResourceID.generate(); + final UUID rmLeaderId = UUID.randomUUID(); + final UUID jmLeaderId = UUID.randomUUID(); + final JobID jobId = new JobID(); + + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + + final TestingSerialRpcService rpcService = new TestingSerialRpcService(); + rpcService.registerGateway(jobMasterAddress, jobMasterGateway); + + final ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( + Time.seconds(5L), + Time.seconds(5L)); + + final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderId); + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService); + + final long heartbeatInterval = 1L; + final long heartbeatTimeout = 5L; + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); + + final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + final MetricRegistry metricRegistry = mock(MetricRegistry.class); + final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( + highAvailabilityServices, + rpcService.getScheduledExecutor(), + Time.minutes(5L)); + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + try { + final StandaloneResourceManager resourceManager = new StandaloneResourceManager( + rpcService, + FlinkResourceManager.RESOURCE_MANAGER_NAME, + rmResourceId, + resourceManagerConfiguration, + highAvailabilityServices, + heartbeatServices, + slotManagerFactory, + metricRegistry, + jobLeaderIdService, + testingFatalErrorHandler); + + resourceManager.start(); + + rmLeaderElectionService.isLeader(rmLeaderId); + + // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time + Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager( + rmLeaderId, + jmLeaderId, + jmResourceId, + jobMasterAddress, + jobId); + RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + assertTrue(response instanceof JobMasterRegistrationSuccess); + + ArgumentCaptor<Runnable> heartbeatRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor, times(2)).scheduleAtFixedRate( + heartbeatRunnableCaptor.capture(), + eq(0L), + eq(heartbeatInterval), + eq(TimeUnit.MILLISECONDS)); + + List<Runnable> heartbeatRunnable = heartbeatRunnableCaptor.getAllValues(); + + ArgumentCaptor<Runnable> timeoutRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(timeoutRunnableCaptor.capture(), eq(heartbeatTimeout), eq(TimeUnit.MILLISECONDS)); + + Runnable timeoutRunnable = timeoutRunnableCaptor.getValue(); + + // run all the heartbeat requests + for (Runnable runnable : heartbeatRunnable) { + runnable.run(); + } + + verify(jobMasterGateway, times(1)).heartbeatFromResourceManager(eq(rmResourceId)); + + // run the timeout runnable to simulate a heartbeat timeout + timeoutRunnable.run(); + + verify(jobMasterGateway).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class)); + + } finally { + rpcService.stopService(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 8b9b800..0b25e6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -18,11 +18,13 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; @@ -32,6 +34,8 @@ import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -52,9 +56,8 @@ import java.util.concurrent.TimeoutException; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest(BlobLibraryCacheManager.class) @@ -144,5 +147,85 @@ public class JobMasterTest extends TestLogger { } } + @Test + public void testHeartbeatTimeoutWithResourceManager() throws Exception { + final String resourceManagerAddress = "rm"; + final String jobManagerAddress = "jm"; + final UUID rmLeaderId = UUID.randomUUID(); + final UUID jmLeaderId = UUID.randomUUID(); + final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); + final ResourceID jmResourceId = new ResourceID(jobManagerAddress); + final JobGraph jobGraph = new JobGraph(); + + final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService( + null, + null); + haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + haServices.setCheckpointRecoveryFactory(mock(CheckpointRecoveryFactory.class)); + + final long heartbeatInterval = 1L; + final long heartbeatTimeout = 5L; + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor); + + final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); + when(resourceManagerGateway.registerJobManager( + any(UUID.class), + any(UUID.class), + any(ResourceID.class), + anyString(), + any(JobID.class), + any(Time.class) + )).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JobMasterRegistrationSuccess( + heartbeatInterval, rmLeaderId, rmResourceId))); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); + + final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + + try { + final JobMaster jobMaster = new JobMaster( + rpc, + jmResourceId, + jobGraph, + new Configuration(), + haServices, + heartbeatServices, + Executors.newScheduledThreadPool(1), + mock(BlobLibraryCacheManager.class), + mock(RestartStrategyFactory.class), + Time.of(10, TimeUnit.SECONDS), + null, + mock(OnCompletionActions.class), + testingFatalErrorHandler, + new FlinkUserCodeClassLoader(new URL[0])); + + jobMaster.start(jmLeaderId); + + // define a leader and see that a registration happens + rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId); + + // register job manager success will trigger monitor heartbeat target between jm and rm + verify(resourceManagerGateway).registerJobManager( + eq(rmLeaderId), + eq(jmLeaderId), + eq(jmResourceId), + anyString(), + eq(jobGraph.getJobID()), + any(Time.class)); + + // heartbeat timeout should trigger disconnect JobManager from ResourceManager + verify(resourceManagerGateway, timeout(heartbeatTimeout * 50L)).disconnectJobManager(eq(jobGraph.getJobID()), any(TimeoutException.class)); + + // check if a concurrent error occurred + testingFatalErrorHandler.rethrowError(); + + } finally { + rpc.stopService(); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index b6b8614..6a151ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -70,13 +70,19 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); UUID jmLeaderID = UUID.randomUUID(); + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); // test response successful - Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID); + Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager( + rmLeaderSessionId, + jmLeaderID, + jmResourceId, + jobMasterAddress, + jobID); RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS); assertTrue(response instanceof JobMasterRegistrationSuccess); @@ -94,6 +100,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { JobID jobID = mockJobMaster(jobMasterAddress); TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); UUID jmLeaderID = UUID.randomUUID(); + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); @@ -101,7 +108,12 @@ public class ResourceManagerJobMasterTest extends TestLogger { // test throw exception when receive a registration from job master which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager( + differentLeaderSessionID, + jmLeaderID, + jmResourceId, + jobMasterAddress, + jobID); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -124,10 +136,16 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); // test throw exception when receive a registration from job master which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager( + rmLeaderSessionId, + differentLeaderSessionID, + jmResourceId, + jobMasterAddress, + jobID); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -150,10 +168,16 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); // test throw exception when receive a registration from job master which takes invalid address String invalidAddress = "/jobMasterAddress2"; - Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID); + Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager( + rmLeaderSessionId, + jmLeaderSessionId, + jmResourceId, + invalidAddress, + jobID); assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); if (testingFatalErrorHandler.hasExceptionOccurred()) { @@ -176,10 +200,16 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final ResourceID jmResourceId = new ResourceID(jobMasterAddress); JobID unknownJobIDToHAServices = new JobID(); // verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener - Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices); + Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager( + rmLeaderSessionId, + jmLeaderSessionId, + jmResourceId, + jobMasterAddress, + unknownJobIDToHAServices); RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof RegistrationResponse.Decline); @@ -205,7 +235,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); - HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); + HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 4d2309a..37690b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -101,6 +101,7 @@ public class SlotProtocolTest extends TestLogger { final String jmAddress = "/jm1"; final JobID jobID = new JobID(); final ResourceID rmResourceId = new ResourceID(rmAddress); + final ResourceID jmResourceId = new ResourceID(jmAddress); testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); @@ -138,7 +139,7 @@ public class SlotProtocolTest extends TestLogger { rmLeaderElectionService.isLeader(rmLeaderID); Future<RegistrationResponse> registrationFuture = - resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID); + resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID); try { registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { @@ -207,6 +208,7 @@ public class SlotProtocolTest extends TestLogger { final String tmAddress = "/tm1"; final JobID jobID = new JobID(); final ResourceID rmResourceId = new ResourceID(rmAddress); + final ResourceID jmResourceId = new ResourceID(jmAddress); testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); @@ -254,7 +256,7 @@ public class SlotProtocolTest extends TestLogger { Thread.sleep(1000); Future<RegistrationResponse> registrationFuture = - resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID); + resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID); try { registrationFuture.get(5L, TimeUnit.SECONDS); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/flink/blob/81114d5f/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index fa326b5..579ca3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -90,6 +90,7 @@ public class TaskExecutorITCase extends TestLogger { final String jmAddress = "jm"; final UUID jmLeaderId = UUID.randomUUID(); final ResourceID rmResourceId = new ResourceID(rmAddress); + final ResourceID jmResourceId = new ResourceID(jmAddress); final JobID jobId = new JobID(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); @@ -180,7 +181,12 @@ public class TaskExecutorITCase extends TestLogger { // notify the TM about the new RM leader rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId); - Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager(rmLeaderId, jmLeaderId, jmAddress, jobId); + Future<RegistrationResponse> registrationResponseFuture = resourceManager.registerJobManager( + rmLeaderId, + jmLeaderId, + jmResourceId, + jmAddress, + jobId); RegistrationResponse registrationResponse = registrationResponseFuture.get();