[FLINK-4535] rebase and refine
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/485ef003 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/485ef003 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/485ef003 Branch: refs/heads/flip-6 Commit: 485ef0035fe3f0d4335d880868ab9beb18731fdf Parents: c9764c8 Author: Maximilian Michels <m...@apache.org> Authored: Wed Sep 21 20:20:25 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:41 2016 +0200 ---------------------------------------------------------------------- .../resourcemanager/JobMasterRegistration.java | 64 ---- .../resourcemanager/ResourceManager.java | 322 ++++++++++++------- .../resourcemanager/ResourceManagerGateway.java | 36 +-- .../TaskExecutorRegistration.java | 2 +- .../slotmanager/SlotManager.java | 1 - .../ResourceManagerJobMasterTest.java | 174 ++++++++++ .../ResourceManagerTaskExecutorTest.java | 135 ++++++++ .../resourcemanager/ResourceManagerTest.java | 141 -------- .../slotmanager/SlotProtocolTest.java | 43 ++- 9 files changed, 574 insertions(+), 344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java deleted file mode 100644 index 981441f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; - -import java.util.UUID; - -/** - * This class is responsible for group the JobMasterGateway and the LeaderSessionID of a registered job master - */ -public class JobMasterRegistration implements LeaderRetrievalListener { - - private final JobMasterGateway gateway; - private final JobID jobID; - private final UUID leaderSessionID; - private LeaderRetrievalListener retriever; - - public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, UUID leaderSessionID) { - this.gateway = gateway; - this.jobID = jobID; - this.leaderSessionID = leaderSessionID; - } - - public JobMasterGateway getGateway() { - return gateway; - } - - public UUID getLeaderSessionID() { - return leaderSessionID; - } - - public JobID getJobID() { - return jobID; - } - - @Override - public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { - - } - - @Override - public void handleError(Exception exception) { - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 15692b6..88b8a11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,29 +18,41 @@ package org.apache.flink.runtime.resourcemanager; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; - import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.apache.flink.runtime.registration.RegistrationResponse; + import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; -import org.apache.flink.runtime.registration.RegistrationResponse; -import scala.concurrent.Future; +import org.apache.flink.runtime.util.LeaderConnectionInfo; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -50,25 +62,38 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It offers the following methods as part of its rpc interface to interact with the him remotely: * <ul> - * <li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> * </ul> */ -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { - private final Map<JobMasterGateway, InstanceID> jobMasterGateways; +public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); - /** ResourceID and TaskExecutorRegistration mapping relationship of registered taskExecutors */ - private final Map<ResourceID, TaskExecutorRegistration> startedTaskExecutorGateways; + private final Map<JobID, JobMasterGateway> jobMasterGateways; + + private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners; + + private final Map<ResourceID, TaskExecutorRegistration> taskExecutorGateways; private final HighAvailabilityServices highAvailabilityServices; - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID = null; - public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { + private LeaderElectionService leaderElectionService; + + private final SlotManager slotManager; + + private UUID leaderSessionID; + + public ResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); - this.jobMasterGateways = new HashMap<>(16); - this.startedTaskExecutorGateways = new HashMap<>(16); + this.jobMasterGateways = new HashMap<>(); + this.slotManager = slotManager; + this.jobMasterLeaderRetrievalListeners = new HashSet<>(); + this.taskExecutorGateways = new HashMap<>(); } @Override @@ -77,7 +102,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { try { super.start(); leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); - leaderElectionService.start(new ResourceManagerLeaderContender()); + leaderElectionService.start(this); } catch (Throwable e) { log.error("A fatal error happened when starting the ResourceManager", e); throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); @@ -88,8 +113,11 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { public void shutDown() { try { leaderElectionService.stop(); + for(JobID jobID : jobMasterGateways.keySet()) { + highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); + } super.shutDown(); - } catch(Throwable e) { + } catch (Throwable e) { log.error("A fatal error happened when shutdown the ResourceManager", e); throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); } @@ -102,48 +130,79 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { */ @VisibleForTesting UUID getLeaderSessionID() { - return leaderSessionID; + return this.leaderSessionID; } /** * Register a {@link JobMaster} at the resource manager. * - * @param jobMasterRegistration Job master registration information + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param jobMasterAddress The address of the JobMaster that registers + * @param jobID The Job ID of the JobMaster that registers * @return Future registration response */ @RpcMethod - public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { - Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); + public Future<RegistrationResponse> registerJobMaster( + final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, + final String jobMasterAddress, final JobID jobID) { - return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() { - @Override - public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { - InstanceID instanceID; + checkNotNull(jobMasterAddress); + checkNotNull(jobID); - if (jobMasterGateways.containsKey(jobMasterGateway)) { - instanceID = jobMasterGateways.get(jobMasterGateway); - } else { - instanceID = new InstanceID(); - jobMasterGateways.put(jobMasterGateway, instanceID); - } + return getRpcService() + .execute(new Callable<JobMasterGateway>() { + @Override + public JobMasterGateway call() throws Exception { - return new TaskExecutorRegistrationSuccess(instanceID, 5000); - } - }, getMainThreadExecutionContext()); - } + if (!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" + + " did not equal the received leader session ID {}", + jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId); + throw new Exception("Invalid leader session id"); + } - /** - * Requests a slot from the resource manager. - * - * @param slotRequest Slot request - * @return Slot assignment - */ - @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); - } + final LeaderConnectionInfo jobMasterLeaderInfo; + try { + jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( + highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); + } catch (Exception e) { + LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + throw new Exception("Failed to retrieve JobMasterLeaderRetriever"); + } + + if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) { + LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); + throw new Exception("JobManager is not leading"); + } + return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS); + } + }) + .handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() { + @Override + public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) { + + if (throwable != null) { + return new RegistrationResponse.Decline(throwable.getMessage()); + } else { + JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); + try { + LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); + jobMasterLeaderRetriever.start(jobMasterLeaderListener); + } catch (Exception e) { + LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); + } + jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener); + final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); + if (existingGateway != null) { + log.info("Replacing gateway for registered JobID {}.", jobID); + } + return new JobMasterRegistrationSuccess(5000); + } + } + }, getMainThreadExecutor()); + } /** * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager @@ -160,90 +219,129 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { final String taskExecutorAddress, final ResourceID resourceID) { - if(!leaderSessionID.equals(resourceManagerLeaderId)) { - log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}", - resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId); - return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId)); + return getRpcService().execute(new Callable<TaskExecutorGateway>() { + @Override + public TaskExecutorGateway call() throws Exception { + if (!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " + + "not equal the received leader session ID {}", + resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId); + throw new Exception("Invalid leader session id"); + } + + return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS); + } + }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { + @Override + public RegistrationResponse apply(TaskExecutorGateway taskExecutorGateway, Throwable throwable) { + if (throwable != null) { + return new RegistrationResponse.Decline(throwable.getMessage()); + } else { + InstanceID id = new InstanceID(); + TaskExecutorRegistration oldTaskExecutor = + taskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, id)); + if (oldTaskExecutor != null) { + log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); + } + return new TaskExecutorRegistrationSuccess(id, 5000); + } + } + }, getMainThreadExecutor()); + } + + /** + * Requests a slot from the resource manager. + * + * @param slotRequest Slot request + * @return Slot assignment + */ + @RpcMethod + public SlotRequestReply requestSlot(SlotRequest slotRequest) { + final JobID jobId = slotRequest.getJobId(); + final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + + if (jobMasterGateway != null) { + return slotManager.requestSlot(slotRequest); + } else { + LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); + return new SlotRequestRejected(slotRequest.getAllocationId()); } + } - Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); - return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() { + + // ------------------------------------------------------------------------ + // Leader Contender + // ------------------------------------------------------------------------ + + /** + * Callback method when current resourceManager is granted leadership + * + * @param leaderSessionID unique leadershipID + */ + @Override + public void grantLeadership(final UUID leaderSessionID) { + runAsync(new Runnable() { @Override - public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) { - InstanceID instanceID = null; - TaskExecutorRegistration taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID); - if(taskExecutorRegistration != null) { - log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); - instanceID = taskExecutorRegistration.getInstanceID(); - } else { - instanceID = new InstanceID(); - startedTaskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, instanceID)); - } + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + // notify SlotManager + slotManager.setLeaderUUID(leaderSessionID); + ResourceManager.this.leaderSessionID = leaderSessionID; + } + }); + } - return new TaskExecutorRegistrationSuccess(instanceID, 5000); + /** + * Callback method when current resourceManager lose leadership. + */ + @Override + public void revokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + taskExecutorGateways.clear(); + slotManager.clearState(); + leaderSessionID = null; } - }, getMainThreadExecutionContext()); + }); } + /** + * Handles error occurring in the leader election service + * + * @param exception Exception being thrown in the leader election service + */ + @Override + public void handleError(final Exception exception) { + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // terminate ResourceManager in case of an error + shutDown(); + } - private class ResourceManagerLeaderContender implements LeaderContender { + private static class JobMasterLeaderListener implements LeaderRetrievalListener { - /** - * Callback method when current resourceManager is granted leadership - * - * @param leaderSessionID unique leadershipID - */ - @Override - public void grantLeadership(final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); - ResourceManager.this.leaderSessionID = leaderSessionID; - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(leaderSessionID); - } - }); - } + private final JobID jobID; + private UUID leaderID; - /** - * Callback method when current resourceManager lose leadership. - */ - @Override - public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - startedTaskExecutorGateways.clear(); - leaderSessionID = null; - } - }); + private JobMasterLeaderListener(JobID jobID) { + this.jobID = jobID; } @Override - public String getAddress() { - return ResourceManager.this.getAddress(); + public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { + this.leaderID = leaderSessionID; } - /** - * Handles error occurring in the leader election service - * - * @param exception Exception being thrown in the leader election service - */ @Override public void handleError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("ResourceManager received an error from the LeaderElectionService.", exception); - // terminate ResourceManager in case of an error - shutDown(); - } - }); + // TODO } } } + http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 30a096f..d8b8ebe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -18,15 +18,16 @@ package org.apache.flink.runtime.resourcemanager; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; - import org.apache.flink.runtime.registration.RegistrationResponse; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import org.apache.flink.runtime.registration.RegistrationResponse; import java.util.UUID; /** @@ -37,21 +38,18 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Register a {@link JobMaster} at the resource manager. * - * @param jobMasterRegistration Job master registration information - * @param timeout Timeout for the future to complete + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param jobMasterAddress The address of the JobMaster that registers + * @param jobID The Job ID of the JobMaster that registers + * @param timeout Timeout for the future to complete * @return Future registration response */ Future<RegistrationResponse> registerJobMaster( - JobMasterRegistration jobMasterRegistration, - @RpcTimeout FiniteDuration timeout); + UUID resourceManagerLeaderId, + String jobMasterAddress, + JobID jobID, + @RpcTimeout Time timeout); - /** - * Register a {@link JobMaster} at the resource manager. - * - * @param jobMasterRegistration Job master registration information - * @return Future registration response - */ - Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration); /** * Requests a slot from the resource manager. @@ -59,15 +57,15 @@ public interface ResourceManagerGateway extends RpcGateway { * @param slotRequest Slot request * @return Future slot assignment */ - Future<SlotAssignment> requestSlot(SlotRequest slotRequest); + Future<SlotRequestReply> requestSlot(SlotRequest slotRequest); /** * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager. * * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * @param timeout The timeout for the response. + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * @param timeout The timeout for the response. * * @return The future to the response by the ResourceManager. */ @@ -75,5 +73,5 @@ public interface ResourceManagerGateway extends RpcGateway { UUID resourceManagerLeaderId, String taskExecutorAddress, ResourceID resourceID, - @RpcTimeout FiniteDuration timeout); + @RpcTimeout Time timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java index bd78a47..f8dfdc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java @@ -35,7 +35,7 @@ public class TaskExecutorRegistration implements Serializable { private InstanceID instanceID; public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway, - InstanceID instanceID) { + InstanceID instanceID) { this.taskExecutorGateway = taskExecutorGateway; this.instanceID = instanceID; } http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 5d0013c..a6d2196 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java new file mode 100644 index 0000000..332c093 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +public class ResourceManagerJobMasterTest { + + private TestingSerialRpcService rpcService; + + @Before + public void setup() throws Exception { + rpcService = new TestingSerialRpcService(); + } + + @After + public void teardown() throws Exception { + rpcService.stopService(); + } + + /** + * Test receive normal registration from job master and receive duplicate registration from job master + */ + @Test + public void testRegisterJobMaster() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + UUID jmLeaderID = UUID.randomUUID(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test response successful + Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID); + RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + assertTrue(response instanceof JobMasterRegistrationSuccess); + } + + /** + * Test receive registration with unmatched leadershipId from job master + */ + @Test + public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + UUID jmLeaderID = UUID.randomUUID(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test throw exception when receive a registration from job master which takes unmatched leaderSessionId + UUID differentLeaderSessionID = UUID.randomUUID(); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID); + assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + } + + /** + * Test receive registration with unmatched leadershipId from job master + */ + @Test + public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test throw exception when receive a registration from job master which takes unmatched leaderSessionId + UUID differentLeaderSessionID = UUID.randomUUID(); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID); + assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + } + + /** + * Test receive registration with invalid address from job master + */ + @Test + public void testRegisterJobMasterFromInvalidAddress() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + // test throw exception when receive a registration from job master which takes invalid address + String invalidAddress = "/jobMasterAddress2"; + Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID); + assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + } + + /** + * Check and verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener + */ + @Test + public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { + String jobMasterAddress = "/jobMasterAddress1"; + JobID jobID = mockJobMaster(jobMasterAddress); + TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); + final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); + final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + + JobID unknownJobIDToHAServices = new JobID(); + // verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener + Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices); + RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS); + assertTrue(response instanceof RegistrationResponse.Decline); + } + + private JobID mockJobMaster(String jobMasterAddress) { + JobID jobID = new JobID(); + JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + rpcService.registerGateway(jobMasterAddress, jobMasterGateway); + return jobID; + } + + private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) { + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); + highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); + ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + resourceManager.start(); + return resourceManager; + } + + private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) { + UUID leaderSessionId = UUID.randomUUID(); + resourceManagerLeaderElectionService.isLeader(leaderSessionId); + return leaderSessionId; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java new file mode 100644 index 0000000..ed7c7d7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager; +import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class ResourceManagerTaskExecutorTest { + + private TestingSerialRpcService rpcService; + + @Before + public void setup() throws Exception { + rpcService = new TestingSerialRpcService(); + } + + @After + public void teardown() throws Exception { + rpcService.stopService(); + } + + /** + * Test receive normal registration from task executor and receive duplicate registration from task executor + */ + @Test + public void testRegisterTaskExecutor() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService); + final UUID leaderSessionId = grantLeadership(rmLeaderElectionService); + + // test response successful + Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); + RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); + assertTrue(response instanceof TaskExecutorRegistrationSuccess); + + // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor + Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); + RegistrationResponse duplicateResponse = duplicateFuture.get(); + assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); + assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); + } + + /** + * Test receive registration with unmatched leadershipId from task executor + */ + @Test + public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService); + final UUID leaderSessionId = grantLeadership(rmLeaderElectionService); + + // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId + UUID differentLeaderSessionID = UUID.randomUUID(); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID); + assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + } + + /** + * Test receive registration with invalid address from task executor + */ + @Test + public void testRegisterTaskExecutorFromInvalidAddress() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); + final UUID leaderSessionId = grantLeadership(leaderElectionService); + + // test throw exception when receive a registration from taskExecutor which takes invalid address + String invalidAddress = "/taskExecutor2"; + Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID); + assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); + } + + private ResourceID mockTaskExecutor(String taskExecutorAddress) { + TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + ResourceID taskExecutorResourceID = ResourceID.generate(); + rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway); + return taskExecutorResourceID; + } + + private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) { + TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); + ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + resourceManager.start(); + return resourceManager; + } + + private UUID grantLeadership(TestingLeaderElectionService leaderElectionService) { + UUID leaderSessionId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderSessionId); + return leaderSessionId; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java deleted file mode 100644 index b75d9b8..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; -import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; -import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -public class ResourceManagerTest { - - private TestingSerialRpcService rpcService; - - @Before - public void setup() throws Exception { - rpcService = new TestingSerialRpcService(); - } - - @After - public void teardown() throws Exception { - rpcService.stopService(); - } - - /** - * Test receive normal registration from task executor and receive duplicate registration from task executor - * - * @throws Exception - */ - @Test - public void testRegisterTaskExecutor() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); - final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); - - // test response successful - Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); - RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS)); - assertTrue(response instanceof TaskExecutorRegistrationSuccess); - - // test response successful with previous instanceID when receive duplicate registration from taskExecutor - Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); - RegistrationResponse duplicateResponse = Await.result(duplicateFuture, new FiniteDuration(0, TimeUnit.SECONDS)); - assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); - assertEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); - } - - /** - * Test receive registration with unmatched leadershipId from task executor - * - * @throws Exception - */ - @Test(expected = LeaderSessionIDException.class) - public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); - final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); - - // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId - UUID differentLeaderSessionID = UUID.randomUUID(); - Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID); - Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS)); - } - - /** - * Test receive registration with invalid address from task executor - * - * @throws Exception - */ - @Test(expected = Exception.class) - public void testRegisterTaskExecutorFromInvalidAddress() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); - final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); - - // test throw exception when receive a registration from taskExecutor which takes invalid address - String invalidAddress = "/taskExecutor2"; - Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID); - Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS)); - } - - private ResourceID mockTaskExecutor(String taskExecutorAddress) { - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - ResourceID taskExecutorResourceID = ResourceID.generate(); - rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway); - return taskExecutorResourceID; - } - - private ResourceManager createAndStartResourceManager(TestingLeaderElectionService leaderElectionService) { - TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); - ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); - resourceManager.start(); - return resourceManager; - } - - private UUID grantResourceManagerLeadership(TestingLeaderElectionService leaderElectionService) { - UUID leaderSessionId = UUID.randomUUID(); - leaderElectionService.isLeader(leaderSessionId); - return leaderSessionId; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/485ef003/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 1f9e7e8..0232fab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -24,10 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; -import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; +import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; @@ -88,14 +92,20 @@ public class SlotProtocolTest extends TestLogger { testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices(); + final UUID rmLeaderID = UUID.randomUUID(); + final UUID jmLeaderID = UUID.randomUUID(); + TestingLeaderElectionService rmLeaderElectionService = + configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); ResourceManager resourceManager = - new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + new ResourceManager(testRpcService, testingHaServices, slotManager); resourceManager.start(); + rmLeaderElectionService.isLeader(rmLeaderID); Future<RegistrationResponse> registrationFuture = - resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID); try { registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { @@ -158,16 +168,23 @@ public class SlotProtocolTest extends TestLogger { testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices(); + final UUID rmLeaderID = UUID.randomUUID(); + final UUID jmLeaderID = UUID.randomUUID(); + TestingLeaderElectionService rmLeaderElectionService = + configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); + TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); testRpcService.registerGateway(tmAddress, taskExecutorGateway); TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); ResourceManager resourceManager = - new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + new ResourceManager(testRpcService, testingHaServices, slotManager); resourceManager.start(); + rmLeaderElectionService.isLeader(rmLeaderID); Future<RegistrationResponse> registrationFuture = - resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID); try { registrationFuture.get(5, TimeUnit.SECONDS); } catch (Exception e) { @@ -208,6 +225,20 @@ public class SlotProtocolTest extends TestLogger { verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); } + private static TestingLeaderElectionService configureHA( + TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) { + final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + testingHA.setResourceManagerLeaderElectionService(rmLeaderElectionService); + final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(rmAddress, rmID); + testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); + + final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService(); + testingHA.setJobMasterLeaderElectionService(jmLeaderElectionService); + final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID); + testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService); + + return rmLeaderElectionService; + } private static class TestingSlotManager extends SimpleSlotManager {