[FLINK-4537] [cluster management] ResourceManager registration with JobManager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/efc7de5b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/efc7de5b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/efc7de5b Branch: refs/heads/flip-6 Commit: efc7de5bd3bff0512c20485f94d563c9e9cea5ec Parents: f4dc474 Author: beyond1920 <beyond1...@126.com> Authored: Thu Sep 1 15:27:20 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:40 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServices.java | 9 ++ .../runtime/highavailability/NonHaServices.java | 19 +++ .../jobmaster/JobMasterRegistrationSuccess.java | 49 ++++++ .../resourcemanager/JobMasterRegistration.java | 39 ++++- .../resourcemanager/ResourceManager.java | 125 +++++++++++++-- .../resourcemanager/ResourceManagerGateway.java | 34 ++-- .../exceptions/LeaderSessionIDException.java | 60 +++++++ .../runtime/taskexecutor/TaskExecutor.java | 5 + .../TestingHighAvailabilityServices.java | 17 ++ .../resourcemanager/ResourceManagerTest.java | 160 +++++++++++++++++++ 10 files changed, 483 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 298147c..7634176 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -40,6 +40,15 @@ public interface 
HighAvailabilityServices { LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; /** + * Gets the leader retriever for the JobMaster which is responsible for the given job. + * + * @param jobID The identifier of the job. + * @return The leader retriever for the JobMaster of the given job + * @throws Exception + */ + LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception; + + /** + * Gets the leader election service for the cluster's resource manager. * @return * @throws Exception http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index 292a404..33dc2d7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -42,6 +43,8 @@ public class NonHaServices implements HighAvailabilityServices { /** The fix address of the ResourceManager */ private final String resourceManagerAddress; + private final ConcurrentHashMap<JobID, String> jobMastersAddress; + /** * Creates a new services class for the fix pre-defined leaders. 
* @@ -49,6 +52,17 @@ public class NonHaServices implements HighAvailabilityServices { */ public NonHaServices(String resourceManagerAddress) { this.resourceManagerAddress = checkNotNull(resourceManagerAddress); + this.jobMastersAddress = new ConcurrentHashMap<>(16); + } + + /** + * Binds address of a specified job master + * + * @param jobID JobID for the specified job master + * @param jobMasterAddress address for the specified job master + */ + public void bindJobMasterLeaderAddress(JobID jobID, String jobMasterAddress) { + jobMastersAddress.put(jobID, jobMasterAddress); } // ------------------------------------------------------------------------ @@ -61,6 +75,11 @@ public class NonHaServices implements HighAvailabilityServices { } @Override + public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + return new StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0)); + } + + @Override public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { return new StandaloneLeaderElectionService(); } http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java new file mode 100644 index 0000000..031c38e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.runtime.registration.RegistrationResponse; + +/** + * Success response from the ResourceManager to a registration attempt by a JobMaster, carrying the heartbeat interval. + */ +public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { + + private static final long serialVersionUID = 5577641250204140415L; + + private final long heartbeatInterval; + + public JobMasterRegistrationSuccess(long heartbeatInterval) { + this.heartbeatInterval = heartbeatInterval; + } + + /** + * Gets the interval in which the ResourceManager will heartbeat the JobMaster. 
+ * + * @return the interval in which the ResourceManager will heartbeat the JobMaster + */ + public long getHeartbeatInterval() { + return heartbeatInterval; + } + + @Override + public String toString() { + return "JobMasterRegistrationSuccess(" + heartbeatInterval + ')'; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java index 439e56b..7b8ec70 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java @@ -18,23 +18,56 @@ package org.apache.flink.runtime.resourcemanager; +<<<<<<< HEAD import org.apache.flink.api.common.JobID; +======= +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +>>>>>>> db98efb... 
rsourceManager registration with JobManager import java.io.Serializable; +import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class groups the JobMasterGateway and the LeaderSessionID of a registered job master + */ public class JobMasterRegistration implements Serializable { - private static final long serialVersionUID = 8411214999193765202L; +<<<<<<< HEAD private final String address; private final JobID jobID; public JobMasterRegistration(String address, JobID jobID) { this.address = address; this.jobID = jobID; +======= + private static final long serialVersionUID = -2316627821716999527L; + + private final JobMasterGateway jobMasterGateway; + + private UUID jobMasterLeaderSessionID; + + public JobMasterRegistration(JobMasterGateway jobMasterGateway) { + this.jobMasterGateway = checkNotNull(jobMasterGateway); + } + + public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID jobMasterLeaderSessionID) { + this.jobMasterGateway = checkNotNull(jobMasterGateway); + this.jobMasterLeaderSessionID = jobMasterLeaderSessionID; + } + + public JobMasterGateway getJobMasterGateway() { + return jobMasterGateway; + } + + public void setJobMasterLeaderSessionID(UUID leaderSessionID) { + this.jobMasterLeaderSessionID = leaderSessionID; +>>>>>>> db98efb... 
rsourceManager registration with JobManager } - public String getAddress() { - return address; + public UUID getJobMasterLeaderSessionID() { + return jobMasterLeaderSessionID; } public JobID getJobID() { http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 5370710..8be1455 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.resourcemanager; +import akka.dispatch.Futures; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -25,15 +26,22 @@ import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +<<<<<<< HEAD import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +======= +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +>>>>>>> db98efb... 
rsourceManager registration with JobManager import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,15 +58,21 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It offers the following methods as part of its rpc interface to interact with the him remotely: * <ul> - * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> * </ul> */ +<<<<<<< HEAD public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { private final Logger LOG = LoggerFactory.getLogger(getClass()); private final Map<JobID, JobMasterGateway> jobMasterGateways; +======= +public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { + /** the mapping relationship of JobID and JobMasterGateway */ + private final Map<JobID, JobMasterRegistration> jobMasters; +>>>>>>> db98efb... 
rsourceManager registration with JobManager private final HighAvailabilityServices highAvailabilityServices; @@ -74,8 +88,12 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); +<<<<<<< HEAD this.jobMasterGateways = new HashMap<>(); this.slotManager = slotManager; +======= + this.jobMasters = new HashMap<>(16); +>>>>>>> db98efb... rsourceManager registration with JobManager } @Override @@ -95,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme public void shutDown() { try { leaderElectionService.stop(); + for(JobID jobID : jobMasters.keySet()) { + highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); + } super.shutDown(); - } catch(Throwable e) { + } catch (Throwable e) { log.error("A fatal error happened when shutdown the ResourceManager", e); throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); } @@ -115,24 +136,58 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme /** * Register a {@link JobMaster} at the resource manager. 
* - * @param jobMasterRegistration Job master registration information + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param jobMasterAddress The address of the JobMaster that registers + * @param jobID The Job ID of the JobMaster that registers * @return Future registration response */ @RpcMethod +<<<<<<< HEAD public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { final Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); final JobID jobID = jobMasterRegistration.getJobID(); +======= + public Future<RegistrationResponse> registerJobMaster(UUID resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) { + + if(!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}", + jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId); + return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId)); + } + + Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterAddress, JobMasterGateway.class); +>>>>>>> db98efb... 
rsourceManager registration with JobManager return jobMasterFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() { @Override public RegistrationResponse apply(JobMasterGateway jobMasterGateway) { +<<<<<<< HEAD final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); if (existingGateway != null) { LOG.info("Replacing existing gateway {} for JobID {} with {}.", existingGateway, jobID, jobMasterGateway); } return new RegistrationResponse(true); +======= + if (jobMasters.containsKey(jobID)) { + JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, jobMasters.get(jobID).getJobMasterLeaderSessionID()); + jobMasters.put(jobID, jobMasterRegistration); + log.info("Replacing gateway for registered JobID {}.", jobID); + } else { + JobMasterRegistration jobMasterRegistration = new JobMasterRegistration(jobMasterGateway); + jobMasters.put(jobID, jobMasterRegistration); + try { + highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new JobMasterLeaderListener(jobID)); + } catch(Throwable e) { + log.warn("Decline registration from JobMaster {} at ({}) because fail to get the leader retriever for the given job JobMaster", + jobID, jobMasterAddress); + return new RegistrationResponse.Decline("Fail to get the leader retriever for the given JobMaster"); + } + } + + return new JobMasterRegistrationSuccess(5000); +>>>>>>> db98efb... 
rsourceManager registration with JobManager } }, getMainThreadExecutor()); } @@ -158,26 +213,41 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme /** - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers * @return The response by the ResourceManager. */ @RpcMethod - public org.apache.flink.runtime.registration.RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { + public RegistrationResponse registerTaskExecutor( + UUID resourceManagerLeaderId, + String taskExecutorAddress, + ResourceID resourceID) { return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } +<<<<<<< HEAD // ------------------------------------------------------------------------ // Leader Contender // ------------------------------------------------------------------------ +======= + /** + * Callback method when current resourceManager lose leadership. + */ + @Override + public void revokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasters.clear(); + leaderSessionID = null; + } + }); + } +>>>>>>> db98efb... 
rsourceManager registration with JobManager /** * Callback method when current resourceManager is granted leadership @@ -232,4 +302,35 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme } }); } + + private class JobMasterLeaderListener implements LeaderRetrievalListener { + private final JobID jobID; + + private JobMasterLeaderListener(JobID jobID) { + this.jobID = jobID; + } + + @Override + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("A new leader for JobMaster {} is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, leaderSessionID); + // update job master leader session id + JobMasterRegistration jobMasterRegistration = jobMasters.get(jobID); + jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID); + } + }); + } + + @Override + public void handleError(final Exception exception) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("JobMasterLeaderListener received an error from the LeaderRetrievalService", exception); + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 5c8786c..1ee11a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -18,12 +18,13 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.registration.RegistrationResponse; import java.util.UUID; @@ -35,21 +36,18 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Register a {@link JobMaster} at the resource manager. * - * @param jobMasterRegistration Job master registration information - * @param timeout Timeout for the future to complete + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param jobMasterAddress The address of the JobMaster that registers + * @param jobID The Job ID of the JobMaster that registers + * @param timeout Timeout for the future to complete * @return Future registration response */ Future<RegistrationResponse> registerJobMaster( - JobMasterRegistration jobMasterRegistration, - @RpcTimeout Time timeout); + UUID resourceManagerLeaderId, + String jobMasterAddress, + JobID jobID, + @RpcTimeout Time timeout); - /** - * Register a {@link JobMaster} at the resource manager. - * - * @param jobMasterRegistration Job master registration information - * @return Future registration response - */ - Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration); /** * Requests a slot from the resource manager. @@ -60,15 +58,13 @@ public interface ResourceManagerGateway extends RpcGateway { Future<SlotRequestReply> requestSlot(SlotRequest slotRequest); /** - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * @param timeout The timeout for the response. 
- * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * @param timeout The timeout for the response. * @return The future to the response by the ResourceManager. */ - Future<org.apache.flink.runtime.registration.RegistrationResponse> registerTaskExecutor( + Future<RegistrationResponse> registerTaskExecutor( UUID resourceManagerLeaderId, String taskExecutorAddress, ResourceID resourceID, http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java new file mode 100644 index 0000000..cd14a0d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.exceptions; + +import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * An exception specifying that the received leader session ID is not the same as expected. + */ +public class LeaderSessionIDException extends Exception { + + private static final long serialVersionUID = -3276145308053264636L; + + /** expected leader session id */ + private final UUID expectedLeaderSessionID; + + /** actual leader session id */ + private final UUID actualLeaderSessionID; + + public LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) { + super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID); + this.expectedLeaderSessionID = checkNotNull(expectedLeaderSessionID); + this.actualLeaderSessionID = checkNotNull(actualLeaderSessionID); + } + + /** + * Get expected leader session id + * + * @return expected leader session id + */ + public UUID getExpectedLeaderSessionID() { + return expectedLeaderSessionID; + } + + /** + * Get actual leader session id + * + * @return actual leader session id + */ + public UUID getActualLeaderSessionID() { + return actualLeaderSessionID; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d84a6a9..cf709c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -327,6 +327,11 @@ public class 
TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } @Override + public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + return null; + } + + @Override public LeaderElectionService getResourceManagerLeaderElectionService() throws Exception { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java index 3162f40..2ac43be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.highavailability; import org.apache.flink.api.common.JobID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -30,6 +31,8 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices private volatile LeaderRetrievalService resourceManagerLeaderRetriever; + private ConcurrentHashMap<JobID, LeaderRetrievalService> jobMasterLeaderRetrievers = new ConcurrentHashMap<>(); + private volatile LeaderElectionService jobMasterLeaderElectionService; private volatile LeaderElectionService resourceManagerLeaderElectionService; @@ -43,6 +46,10 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices this.resourceManagerLeaderRetriever = resourceManagerLeaderRetriever; } + public void 
setJobMasterLeaderRetriever(JobID jobID, LeaderRetrievalService jobMasterLeaderRetriever) { + this.jobMasterLeaderRetrievers.put(jobID, jobMasterLeaderRetriever); + } + public void setJobMasterLeaderElectionService(LeaderElectionService leaderElectionService) { this.jobMasterLeaderElectionService = leaderElectionService; } @@ -66,6 +73,16 @@ public class TestingHighAvailabilityServices implements HighAvailabilityServices } @Override + public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws Exception { + LeaderRetrievalService service = this.jobMasterLeaderRetrievers.get(jobID); + if (service != null) { + return service; + } else { + throw new IllegalStateException("JobMasterLeaderRetriever has not been set"); + } + } + + @Override public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { LeaderElectionService service = jobMasterLeaderElectionService; http://git-wip-us.apache.org/repos/asf/flink/blob/efc7de5b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java new file mode 100644 index 0000000..4d04001 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +public class ResourceManagerTest { + + private TestingSerialRpcService rpcService; + + @Before + public void setup() throws Exception { + rpcService = new TestingSerialRpcService(); + } + + @After + public void teardown() throws Exception { + rpcService.stopService(); + } + + /** + * Test receive normal registration from job master and receive duplicate registration from job master + * + * @throws Exception + */ + @Test + public void testRegisterJobMaster() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = 
mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test response successful + Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID); + RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS)); + assertTrue(response instanceof JobMasterRegistrationSuccess); + } + + /** + * Test receive registration with unmatched leadershipId from job master + * + * @throws Exception + */ + @Test(expected = LeaderSessionIDException.class) + public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test throw exception when receive a registration from job master which takes unmatched leaderSessionId + UUID differentLeaderSessionID = UUID.randomUUID(); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, jobID); + Await.result(unMatchedLeaderFuture, new 
FiniteDuration(200, TimeUnit.MILLISECONDS)); + } + + /** + * Test receive registration with invalid address from job master + * + * @throws Exception + */ + @Test(expected = Exception.class) + public void testRegisterJobMasterFromInvalidAddress() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test throw exception when receive a registration from job master which takes invalid address + String invalidAddress = "/jobMasterAddress2"; + Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID); + Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS)); + } + + /** + * Check and verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener + * + * @throws Exception + */ + @Test + public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); 
+ + JobID unknownJobIDToHAServices = new JobID(); + // verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener + Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, unknownJobIDToHAServices); + RegistrationResponse response = Await.result(declineFuture, new FiniteDuration(0, TimeUnit.SECONDS)); + assertTrue(response instanceof RegistrationResponse.Decline); + } + + private JobID mockJobMaster(String jobMasterAddress) { + JobID jobID = new JobID(); + JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + rpcService.registerGateway(jobMasterAddress, jobMasterGateway); + return jobID; + } + + private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) { + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); + highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); + ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); + resourceManager.start(); + return resourceManager; + } + + private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) { + UUID leaderSessionId = UUID.randomUUID(); + resourceManagerLeaderElectionService.isLeader(leaderSessionId); + return leaderSessionId; + } + +}