[FLINK-4535] [cluster management] ResourceManager processes the registration from TaskExecutor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9764c8f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9764c8f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9764c8f Branch: refs/heads/flip-6 Commit: c9764c8f62e08a1fbf929ee2b718b52b8ae4fff2 Parents: 34a6854 Author: beyond1920 <beyond1...@126.com> Authored: Thu Sep 1 11:14:00 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:41 2016 +0200 ---------------------------------------------------------------------- .../resourcemanager/RegistrationResponse.java | 36 --- .../resourcemanager/ResourceManager.java | 288 ++++++++----------- .../resourcemanager/ResourceManagerGateway.java | 45 +-- .../TaskExecutorRegistration.java | 51 ++++ .../exceptions/LeaderSessionIDException.java | 1 + .../resourcemanager/ResourceManagerTest.java | 119 ++++---- 6 files changed, 241 insertions(+), 299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java deleted file mode 100644 index 796e634..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import java.io.Serializable; - -public class RegistrationResponse implements Serializable { - private static final long serialVersionUID = -2379003255993119993L; - - private final boolean isSuccess; - - public RegistrationResponse(boolean isSuccess) { - this.isSuccess = isSuccess; - } - - public boolean isSuccess() { - return isSuccess; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index aae4874..15692b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -18,41 +18,29 @@ package org.apache.flink.runtime.resourcemanager; +import akka.dispatch.Futures; +import akka.dispatch.Mapper; + import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.ApplyFunction; -import 
org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; -import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.registration.RegistrationResponse; - -import org.apache.flink.runtime.concurrent.Future; - -import org.apache.flink.runtime.util.LeaderConnectionInfo; -import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; +import scala.concurrent.Future; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; -import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -62,35 +50,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * It offers the following methods as part of its rpc interface to interact with the him remotely: * <ul> - * <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> + * 
<li>{@link #registerJobMaster(JobMasterRegistration)} registers a {@link JobMaster} at the resource manager</li> * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> * </ul> */ -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { - - private final Logger LOG = LoggerFactory.getLogger(getClass()); +public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { + private final Map<JobMasterGateway, InstanceID> jobMasterGateways; - private final Map<JobID, JobMasterGateway> jobMasterGateways; - - private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners; + /** ResourceID and TaskExecutorRegistration mapping relationship of registered taskExecutors */ + private final Map<ResourceID, TaskExecutorRegistration> startedTaskExecutorGateways; private final HighAvailabilityServices highAvailabilityServices; + private LeaderElectionService leaderElectionService = null; + private UUID leaderSessionID = null; - private LeaderElectionService leaderElectionService; - - private final SlotManager slotManager; - - private UUID leaderSessionID; - - public ResourceManager( - RpcService rpcService, - HighAvailabilityServices highAvailabilityServices, - SlotManager slotManager) { + public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); - this.jobMasterGateways = new HashMap<>(); - this.slotManager = slotManager; - this.jobMasterLeaderRetrievalListeners = new HashSet<>(); + this.jobMasterGateways = new HashMap<>(16); + this.startedTaskExecutorGateways = new HashMap<>(16); } @Override @@ -99,7 +77,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme try { super.start(); leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); - leaderElectionService.start(this); + 
leaderElectionService.start(new ResourceManagerLeaderContender()); } catch (Throwable e) { log.error("A fatal error happened when starting the ResourceManager", e); throw new RuntimeException("A fatal error happened when starting the ResourceManager", e); @@ -110,11 +88,8 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme public void shutDown() { try { leaderElectionService.stop(); - for(JobID jobID : jobMasterGateways.keySet()) { - highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); - } super.shutDown(); - } catch (Throwable e) { + } catch(Throwable e) { log.error("A fatal error happened when shutdown the ResourceManager", e); throw new RuntimeException("A fatal error happened when shutdown the ResourceManager", e); } @@ -127,78 +102,34 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> impleme */ @VisibleForTesting UUID getLeaderSessionID() { - return this.leaderSessionID; + return leaderSessionID; } /** * Register a {@link JobMaster} at the resource manager. 
* - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param jobMasterAddress The address of the JobMaster that registers - * @param jobID The Job ID of the JobMaster that registers + * @param jobMasterRegistration Job master registration information * @return Future registration response */ @RpcMethod - public Future<RegistrationResponse> registerJobMaster( - final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId, - final String jobMasterAddress, final JobID jobID) { - - checkNotNull(resourceManagerLeaderId); - checkNotNull(jobMasterAddress); - checkNotNull(jobID); + public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { + Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); - // TODO mxm The leader retrieval needs to be split up in an async part which runs outside the main execution thread - // The state updates should be performed inside the main thread - - final FlinkCompletableFuture<RegistrationResponse> future = new FlinkCompletableFuture<>(); - - if(!leaderSessionID.equals(resourceManagerLeaderId)) { - log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" + - " did not equal the received leader session ID {}", - jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId); - future.complete(new RegistrationResponse.Decline("Invalid leader session id")); - return future; - } - - final LeaderConnectionInfo jobMasterLeaderInfo; - try { - jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo( - highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS)); - } catch (Exception e) { - LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); - future.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever")); - return future; - } - - if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) { - LOG.info("Declining registration request from non-leading JobManager {}", jobMasterAddress); - future.complete(new RegistrationResponse.Decline("JobManager is not leading")); - return future; - } - - Future<JobMasterGateway> jobMasterGatewayFuture = - getRpcService().connect(jobMasterAddress, JobMasterGateway.class); - - return jobMasterGatewayFuture.thenApplyAsync(new ApplyFunction<JobMasterGateway, RegistrationResponse>() { + return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() { @Override - public RegistrationResponse apply(JobMasterGateway jobMasterGateway) { - - final JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); - try { - LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); - jobMasterLeaderRetriever.start(jobMasterLeaderListener); - } catch (Exception e) { - LOG.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); - return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); - } - jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener); - final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); - if (existingGateway != null) { - log.info("Replacing gateway for registered JobID {}.", jobID); + public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { + InstanceID instanceID; + + if (jobMasterGateways.containsKey(jobMasterGateway)) { + instanceID = jobMasterGateways.get(jobMasterGateway); + } else { + instanceID = new InstanceID(); + jobMasterGateways.put(jobMasterGateway, instanceID); } - return new JobMasterRegistrationSuccess(5000); + + return new TaskExecutorRegistrationSuccess(instanceID, 5000); } - }, getMainThreadExecutor()); + }, getMainThreadExecutionContext()); } /** @@ -208,104 +139,111 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme * @return Slot assignment */ @RpcMethod - public SlotRequestReply requestSlot(SlotRequest slotRequest) { - final JobID jobId = slotRequest.getJobId(); - final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); - - if (jobMasterGateway != null) { - return slotManager.requestSlot(slotRequest); - } else { - LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); - return new SlotRequestRejected(slotRequest.getAllocationId()); - } + public SlotAssignment requestSlot(SlotRequest slotRequest) { + System.out.println("SlotRequest: " + slotRequest); + return new SlotAssignment(); } /** - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers + * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager + * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * * @return The response by the ResourceManager. 
*/ @RpcMethod - public RegistrationResponse registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID) { + public Future<RegistrationResponse> registerTaskExecutor( + final UUID resourceManagerLeaderId, + final String taskExecutorAddress, + final ResourceID resourceID) { - return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); - } + if(!leaderSessionID.equals(resourceManagerLeaderId)) { + log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did not equal the received leader session ID {}", + resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId); + return Futures.failed(new LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId)); + } + Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); - // ------------------------------------------------------------------------ - // Leader Contender - // ------------------------------------------------------------------------ + return taskExecutorGatewayFuture.map(new Mapper<TaskExecutorGateway, RegistrationResponse>() { - /** - * Callback method when current resourceManager is granted leadership - * - * @param leaderSessionID unique leadershipID - */ - @Override - public void grantLeadership(final UUID leaderSessionID) { - runAsync(new Runnable() { @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(leaderSessionID); - // notify SlotManager - slotManager.setLeaderUUID(leaderSessionID); - ResourceManager.this.leaderSessionID = leaderSessionID; - } - }); - } + public RegistrationResponse apply(final TaskExecutorGateway taskExecutorGateway) { + InstanceID instanceID = null; + TaskExecutorRegistration 
taskExecutorRegistration = startedTaskExecutorGateways.get(resourceID); + if(taskExecutorRegistration != null) { + log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); + instanceID = taskExecutorRegistration.getInstanceID(); + } else { + instanceID = new InstanceID(); + startedTaskExecutorGateways.put(resourceID, new TaskExecutorRegistration(taskExecutorGateway, instanceID)); + } - /** - * Callback method when current resourceManager lose leadership. - */ - @Override - public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - slotManager.clearState(); - leaderSessionID = null; + return new TaskExecutorRegistrationSuccess(instanceID, 5000); } - }); + }, getMainThreadExecutionContext()); } - /** - * Handles error occurring in the leader election service - * - * @param exception Exception being thrown in the leader election service - */ - @Override - public void handleError(final Exception exception) { - log.error("ResourceManager received an error from the LeaderElectionService.", exception); - // terminate ResourceManager in case of an error - shutDown(); - } - private static class JobMasterLeaderListener implements LeaderRetrievalListener { + private class ResourceManagerLeaderContender implements LeaderContender { - private final JobID jobID; - private UUID leaderID; + /** + * Callback method when current resourceManager is granted leadership + * + * @param leaderSessionID unique leadershipID + */ + @Override + public void grantLeadership(final UUID leaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); + ResourceManager.this.leaderSessionID = leaderSessionID; + // confirming the leader session ID might be blocking, + 
leaderElectionService.confirmLeaderSessionID(leaderSessionID); + } + }); + } - private JobMasterLeaderListener(JobID jobID) { - this.jobID = jobID; + /** + * Callback method when current resourceManager lose leadership. + */ + @Override + public void revokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + startedTaskExecutorGateways.clear(); + leaderSessionID = null; + } + }); } @Override - public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - this.leaderID = leaderSessionID; + public String getAddress() { + return ResourceManager.this.getAddress(); } + /** + * Handles error occurring in the leader election service + * + * @param exception Exception being thrown in the leader election service + */ @Override public void handleError(final Exception exception) { - // TODO + runAsync(new Runnable() { + @Override + public void run() { + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // terminate ResourceManager in case of an error + shutDown(); + } + }); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 1ee11a1..30a096f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.JobID; import 
org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; + import org.apache.flink.runtime.registration.RegistrationResponse; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.util.UUID; @@ -36,18 +37,21 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Register a {@link JobMaster} at the resource manager. * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param jobMasterAddress The address of the JobMaster that registers - * @param jobID The Job ID of the JobMaster that registers - * @param timeout Timeout for the future to complete + * @param jobMasterRegistration Job master registration information + * @param timeout Timeout for the future to complete * @return Future registration response */ Future<RegistrationResponse> registerJobMaster( - UUID resourceManagerLeaderId, - String jobMasterAddress, - JobID jobID, - @RpcTimeout Time timeout); + JobMasterRegistration jobMasterRegistration, + @RpcTimeout FiniteDuration timeout); + /** + * Register a {@link JobMaster} at the resource manager. + * + * @param jobMasterRegistration Job master registration information + * @return Future registration response + */ + Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration); /** * Requests a slot from the resource manager. 
@@ -55,18 +59,21 @@ public interface ResourceManagerGateway extends RpcGateway { * @param slotRequest Slot request * @return Future slot assignment */ - Future<SlotRequestReply> requestSlot(SlotRequest slotRequest); + Future<SlotAssignment> requestSlot(SlotRequest slotRequest); /** - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * @param timeout The timeout for the response. + * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager. + * + * @param resourceManagerLeaderId The fencing token for the ResourceManager leader + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceID The resource ID of the TaskExecutor that registers + * @param timeout The timeout for the response. + * * @return The future to the response by the ResourceManager. 
*/ Future<RegistrationResponse> registerTaskExecutor( - UUID resourceManagerLeaderId, - String taskExecutorAddress, - ResourceID resourceID, - @RpcTimeout Time timeout); + UUID resourceManagerLeaderId, + String taskExecutorAddress, + ResourceID resourceID, + @RpcTimeout FiniteDuration timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java new file mode 100644 index 0000000..bd78a47 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.io.Serializable; + +/** + * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor. + */ +public class TaskExecutorRegistration implements Serializable { + + private static final long serialVersionUID = -2062957799469434614L; + + private TaskExecutorGateway taskExecutorGateway; + + private InstanceID instanceID; + + public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway, + InstanceID instanceID) { + this.taskExecutorGateway = taskExecutorGateway; + this.instanceID = instanceID; + } + + public InstanceID getInstanceID() { + return instanceID; + } + + public TaskExecutorGateway getTaskExecutorGateway() { + return taskExecutorGateway; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java index cd14a0d..d3ba9a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rpc.exceptions; import java.util.UUID; + import static org.apache.flink.util.Preconditions.checkNotNull; /** http://git-wip-us.apache.org/repos/asf/flink/blob/c9764c8f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 4d04001..b75d9b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -18,15 +18,14 @@ package org.apache.flink.runtime.resourcemanager; -import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -37,8 +36,9 @@ import scala.concurrent.duration.FiniteDuration; import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; public class ResourceManagerTest { @@ -55,105 +55,86 @@ public class ResourceManagerTest { } /** - * Test receive normal registration from job master and receive duplicate registration from job master + * Test receive normal registration from task executor and receive duplicate registration from task 
executor * * @throws Exception */ @Test - public void testRegisterJobMaster() throws Exception { - String jobMasterAddress = "/jobMasterAddress1"; - JobID jobID = mockJobMaster(jobMasterAddress); - TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); - final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); - final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); + public void testRegisterTaskExecutor() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); + final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); // test response successful - Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID); + Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); RegistrationResponse response = Await.result(successfulFuture, new FiniteDuration(0, TimeUnit.SECONDS)); - assertTrue(response instanceof JobMasterRegistrationSuccess); + assertTrue(response instanceof TaskExecutorRegistrationSuccess); + + // test response successful with previous instanceID when receive duplicate registration from taskExecutor + Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); + RegistrationResponse duplicateResponse = Await.result(duplicateFuture, new 
FiniteDuration(0, TimeUnit.SECONDS)); + assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); + assertEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); } /** - * Test receive registration with unmatched leadershipId from job master + * Test receive registration with unmatched leadershipId from task executor * * @throws Exception */ @Test(expected = LeaderSessionIDException.class) - public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws Exception { - String jobMasterAddress = "/jobMasterAddress1"; - JobID jobID = mockJobMaster(jobMasterAddress); - TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); - final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); - final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); - - // test throw exception when receive a registration from job master which takes unmatched leaderSessionId + public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); + final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); + + // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, jobID); + Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID); Await.result(unMatchedLeaderFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS)); } /** - * Test receive registration with invalid address from job master + * Test receive registration with invalid address from task executor * * @throws Exception */ @Test(expected = Exception.class) - public void testRegisterJobMasterFromInvalidAddress() throws Exception { - String jobMasterAddress = "/jobMasterAddress1"; - JobID jobID = mockJobMaster(jobMasterAddress); - TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); - final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); - final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); - - // test throw exception when receive a registration from job master which takes invalid address - String invalidAddress = "/jobMasterAddress2"; - Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID); + public void testRegisterTaskExecutorFromInvalidAddress() throws Exception { + String taskExecutorAddress = "/taskExecutor1"; + ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); + final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); + final UUID leaderSessionId = grantResourceManagerLeadership(leaderElectionService); + + // test throw exception when receive a registration from taskExecutor 
which takes invalid address + String invalidAddress = "/taskExecutor2"; + Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID); Await.result(invalidAddressFuture, new FiniteDuration(200, TimeUnit.MILLISECONDS)); } - /** - * Check and verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener - * - * @throws Exception - */ - @Test - public void testRegisterJobMasterWithFailureLeaderListener() throws Exception { - String jobMasterAddress = "/jobMasterAddress1"; - JobID jobID = mockJobMaster(jobMasterAddress); - TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService(); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(); - final ResourceManager resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService); - final UUID leaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService); - - JobID unknownJobIDToHAServices = new JobID(); - // verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener - Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, unknownJobIDToHAServices); - RegistrationResponse response = Await.result(declineFuture, new FiniteDuration(0, TimeUnit.SECONDS)); - assertTrue(response instanceof RegistrationResponse.Decline); - } - - private JobID mockJobMaster(String jobMasterAddress) { - JobID jobID = new JobID(); - JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); - rpcService.registerGateway(jobMasterAddress, jobMasterGateway); - return jobID; + private ResourceID mockTaskExecutor(String taskExecutorAddress) { + TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + ResourceID 
taskExecutorResourceID = ResourceID.generate(); + rpcService.registerGateway(taskExecutorAddress, taskExecutorGateway); + return taskExecutorResourceID; } - private ResourceManager createAndStartResourceManager(TestingLeaderElectionService resourceManagerLeaderElectionService, JobID jobID, TestingLeaderRetrievalService jobMasterLeaderRetrievalService) { + private ResourceManager createAndStartResourceManager(TestingLeaderElectionService leaderElectionService) { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); - highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); + highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); resourceManager.start(); return resourceManager; } - private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) { + private UUID grantResourceManagerLeadership(TestingLeaderElectionService leaderElectionService) { UUID leaderSessionId = UUID.randomUUID(); - resourceManagerLeaderElectionService.isLeader(leaderSessionId); + leaderElectionService.isLeader(leaderSessionId); return leaderSessionId; }