[FLINK-4606] integrate features of old ResourceManager This closes #2540
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3426fc0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3426fc0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3426fc0 Branch: refs/heads/flip-6 Commit: a3426fc033aac8b1d5922be7df184d18ed6d43c2 Parents: 1773988 Author: Maximilian Michels <[email protected]> Authored: Tue Sep 27 10:38:02 2016 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 20 19:46:26 2016 +0200 ---------------------------------------------------------------------- .../InfoMessageListenerRpcGateway.java | 1 - .../resourcemanager/ResourceManager.java | 146 ++++++++++++------- .../resourcemanager/ResourceManagerGateway.java | 6 +- .../ResourceManagerServices.java | 44 ++++++ .../StandaloneResourceManager.java | 19 ++- .../TaskExecutorRegistration.java | 51 ------- .../registration/TaskExecutorRegistration.java | 51 +++++++ .../slotmanager/SimpleSlotManager.java | 6 - .../slotmanager/SlotManager.java | 63 ++++++-- .../slotmanager/SlotManagerTest.java | 25 +++- .../slotmanager/SlotProtocolTest.java | 42 +++--- 11 files changed, 295 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java index c1eeefa..d1373ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java @@ -19,7 +19,6 @@ 
package org.apache.flink.runtime.resourcemanager; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; -import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.RpcGateway; /** http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 83dc4db..190a4de 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -20,14 +20,18 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.AcceptFunction; import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; @@ 
-48,11 +52,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils; import scala.concurrent.duration.FiniteDuration; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,36 +67,43 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * It offers the following methods as part of its rpc interface to interact with the him remotely: * <ul> * <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li> - * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> + * <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li> * </ul> */ -public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends TaskExecutorRegistration> extends RpcEndpoint implements LeaderContender { +public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> + extends RpcEndpoint<ResourceManagerGateway> + implements LeaderContender { /** The exit code with which the process is stopped in case of a fatal error */ protected static final int EXIT_CODE_FATAL_ERROR = -13; private final Map<JobID, JobMasterGateway> jobMasterGateways; - private final Set<LeaderRetrievalListener> jobMasterLeaderRetrievalListeners; + private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners; private final Map<ResourceID, WorkerType> taskExecutorGateways; private final HighAvailabilityServices highAvailabilityServices; - private LeaderElectionService leaderElectionService; - private final SlotManager slotManager; + private LeaderElectionService leaderElectionService; + private UUID leaderSessionID; private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners; 
- public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, SlotManager slotManager) { + private final Time timeout = Time.seconds(5); + + public ResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.jobMasterGateways = new HashMap<>(); this.slotManager = checkNotNull(slotManager); - this.jobMasterLeaderRetrievalListeners = new HashSet<>(); + this.jobMasterLeaderRetrievalListeners = new HashMap<>(); this.taskExecutorGateways = new HashMap<>(); infoMessageListeners = new HashMap<>(); } @@ -105,6 +115,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends super.start(); leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); leaderElectionService.start(this); + slotManager.setupResourceManagerServices(new DefaultResourceManagerServices()); // framework specific initialization initialize(); } catch (Throwable e) { @@ -117,7 +128,7 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends public void shutDown() { try { leaderElectionService.stop(); - for(JobID jobID : jobMasterGateways.keySet()) { + for (JobID jobID : jobMasterGateways.keySet()) { highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop(); } super.shutDown(); @@ -189,15 +200,17 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends if (throwable != null) { return new RegistrationResponse.Decline(throwable.getMessage()); } else { - JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); - try { - LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); - jobMasterLeaderRetriever.start(jobMasterLeaderListener); - } catch (Exception e) { - log.warn("Failed to start JobMasterLeaderRetriever for 
JobID {}", jobID); - return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); + if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) { + JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID); + try { + LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobMasterLeaderRetriever(jobID); + jobMasterLeaderRetriever.start(jobMasterLeaderListener); + } catch (Exception e) { + log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID); + return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"); + } + jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener); } - jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener); final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); if (existingGateway != null) { log.info("Replacing gateway for registered JobID {}.", jobID); @@ -232,7 +245,6 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId); throw new Exception("Invalid leader session id"); } - return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS); } }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() { @@ -241,24 +253,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends if (throwable != null) { return new RegistrationResponse.Decline(throwable.getMessage()); } else { - WorkerType startedWorker = taskExecutorGateways.get(resourceID); - if(startedWorker != null) { - String oldWorkerAddress = startedWorker.getTaskExecutorGateway().getAddress(); - if (taskExecutorAddress.equals(oldWorkerAddress)) { - log.warn("Receive a duplicate registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress); - } else { - log.warn("Receive a duplicate registration from 
TaskExecutor {} at different address, previous ({}), new ({})", - resourceID, oldWorkerAddress, taskExecutorAddress); - // TODO :: suggest old taskExecutor to stop itself - slotManager.notifyTaskManagerFailure(resourceID); - startedWorker = workerStarted(resourceID, taskExecutorGateway); - taskExecutorGateways.put(resourceID, startedWorker); - } - } else { - startedWorker = workerStarted(resourceID, taskExecutorGateway); - taskExecutorGateways.put(resourceID, startedWorker); + WorkerType oldWorker = taskExecutorGateways.remove(resourceID); + if (oldWorker != null) { + // TODO :: suggest old taskExecutor to stop itself + slotManager.notifyTaskManagerFailure(resourceID); } - return new TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000); + WorkerType newWorker = workerStarted(resourceID); + taskExecutorGateways.put(resourceID, newWorker); + return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } } }, getMainThreadExecutor()); @@ -271,11 +273,20 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends * @return Slot assignment */ @RpcMethod - public SlotRequestReply requestSlot(SlotRequest slotRequest) { - final JobID jobId = slotRequest.getJobId(); - final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + public SlotRequestReply requestSlot( + UUID jobMasterLeaderID, + UUID resourceManagerLeaderID, + SlotRequest slotRequest) { + + JobID jobId = slotRequest.getJobId(); + JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId); + + UUID leaderID = jobMasterLeaderListener.getLeaderID(); - if (jobMasterGateway != null) { + if (jobMasterGateway != null + && jobMasterLeaderID.equals(leaderID) + && resourceManagerLeaderID.equals(leaderSessionID)) { return slotManager.requestSlot(slotRequest); } else { log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); @@ -379,7 +390,7 
@@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends } /** - * Shutdowns cluster + * Cleanup application and shut down cluster * * @param finalStatus * @param optionalDiagnostics @@ -446,17 +457,11 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends protected abstract void initialize() throws Exception; /** - * Callback when a task executor register. + * Notifies the resource master of a fatal error. * - * @param resourceID The worker resource id - * @param taskExecutorGateway the task executor gateway - */ - protected abstract WorkerType workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway); - - /** - * Callback when a resource manager faced a fatal error - * @param message - * @param error + * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in + * such a way that a high-availability setting would restart this or fail over + * to another master. */ protected abstract void fatalError(String message, Throwable error); @@ -472,6 +477,19 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends */ protected abstract void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics); + /** + * Allocates a resource using the resource profile. + * @param resourceProfile The resource description + */ + @VisibleForTesting + public abstract void startNewWorker(ResourceProfile resourceProfile); + + /** + * Callback when a worker was started. 
+ * @param resourceID The worker resource id + */ + protected abstract WorkerType workerStarted(ResourceID resourceID); + // ------------------------------------------------------------------------ // Info messaging // ------------------------------------------------------------------------ @@ -489,6 +507,24 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends }); } + private class DefaultResourceManagerServices implements ResourceManagerServices { + + @Override + public void allocateResource(ResourceProfile resourceProfile) { + ResourceManager.this.startNewWorker(resourceProfile); + } + + @Override + public Executor getAsyncExecutor() { + return ResourceManager.this.getRpcService().getExecutor(); + } + + @Override + public Executor getExecutor() { + return ResourceManager.this.getMainThreadExecutor(); + } + } + private static class JobMasterLeaderListener implements LeaderRetrievalListener { private final JobID jobID; @@ -498,6 +534,14 @@ public abstract class ResourceManager<ResourceManagerGateway, WorkerType extends this.jobID = jobID; } + public JobID getJobID() { + return jobID; + } + + public UUID getLeaderID() { + return leaderID; + } + @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { this.leaderID = leaderSessionID; http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 7c44006..87303a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java 
@@ -59,7 +59,11 @@ public interface ResourceManagerGateway extends RpcGateway { * @param slotRequest Slot request * @return Future slot assignment */ - Future<SlotRequestReply> requestSlot(SlotRequest slotRequest); + Future<SlotRequestReply> requestSlot( + UUID jobMasterLeaderID, + UUID resourceManagerLeaderID, + SlotRequest slotRequest, + @RpcTimeout Time timeout); /** * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager. http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java new file mode 100644 index 0000000..30994dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + +import java.util.concurrent.Executor; + +/** + * Interface which provides access to services of the ResourceManager. + */ +public interface ResourceManagerServices { + + /** + * Allocates a resource according to the resource profile. + */ + void allocateResource(ResourceProfile resourceProfile); + + /** + * Gets the async executor which executes outside of the main thread of the ResourceManager + */ + Executor getAsyncExecutor(); + + /** + * Gets the executor which executes in the main thread of the ResourceManager + */ + Executor getExecutor(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java index 84db1ee..deca8d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java @@ -20,17 +20,18 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; /** * A standalone
implementation of the resource manager. Used when the system is started in * standalone mode (via scripts), rather than via a resource framework like YARN or Mesos. + * + * This ResourceManager doesn't acquire new resources. */ -public class StandaloneResourceManager extends ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> { +public class StandaloneResourceManager extends ResourceManager<ResourceID> { public StandaloneResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices, @@ -51,14 +52,16 @@ public class StandaloneResourceManager extends ResourceManager<ResourceManagerGa } @Override - protected TaskExecutorRegistration workerStarted(ResourceID resourceID, TaskExecutorGateway taskExecutorGateway) { - InstanceID instanceID = new InstanceID(); - TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorGateway, instanceID); - return taskExecutorRegistration; + protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { } @Override - protected void shutDownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) { + public void startNewWorker(ResourceProfile resourceProfile) { + } + @Override + protected ResourceID workerStarted(ResourceID resourceID) { + return resourceID; } + } http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java deleted file mode 100644 index f8dfdc7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.instance.InstanceID; -import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; - -import java.io.Serializable; - -/** - * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor. 
- */ -public class TaskExecutorRegistration implements Serializable { - - private static final long serialVersionUID = -2062957799469434614L; - - private TaskExecutorGateway taskExecutorGateway; - - private InstanceID instanceID; - - public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway, - InstanceID instanceID) { - this.taskExecutorGateway = taskExecutorGateway; - this.instanceID = instanceID; - } - - public InstanceID getInstanceID() { - return instanceID; - } - - public TaskExecutorGateway getTaskExecutorGateway() { - return taskExecutorGateway; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java new file mode 100644 index 0000000..6b21f5c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.registration; + +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.io.Serializable; + +/** + * This class is responsible for grouping the TaskExecutorGateway and the InstanceID of a registered task executor. + */ +public class TaskExecutorRegistration implements Serializable { + + private static final long serialVersionUID = -2062957799469434614L; + + private TaskExecutorGateway taskExecutorGateway; + + private InstanceID instanceID; + + public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway, + InstanceID instanceID) { + this.taskExecutorGateway = taskExecutorGateway; + this.instanceID = instanceID; + } + + public InstanceID getInstanceID() { + return instanceID; + } + + public TaskExecutorGateway getTaskExecutorGateway() { + return taskExecutorGateway; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java index ef5ce31..ae1de5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import 
org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.resourcemanager.SlotRequest; @@ -51,9 +50,4 @@ public class SimpleSlotManager extends SlotManager { } } - @Override - protected void allocateContainer(ResourceProfile resourceProfile) { - // TODO - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index a6d2196..a56b2f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -22,16 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +84,9 @@ public abstract class SlotManager { /** The current leader id set by the ResourceManager */ private UUID leaderID; + /** The Resource allocation provider */ + private ResourceManagerServices resourceManagerServices; + public SlotManager() { this.registeredSlots = new HashMap<>(16); this.pendingSlotRequests = new LinkedHashMap<>(16); @@ -91,6 +96,16 @@ public abstract class SlotManager { this.timeout = Time.seconds(10); } + /** + * Initializes the resource supplier which is needed to request new resources. + */ + public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) { + if (this.resourceManagerServices != null) { + throw new IllegalStateException("ResourceManagerServices may only be set once."); + } + this.resourceManagerServices = resourceManagerServices; + } + // ------------------------------------------------------------------------ // slot managements @@ -120,17 +135,32 @@ public abstract class SlotManager { // record this allocation in bookkeeping allocationMap.addAllocation(slot.getSlotId(), allocationId); - // remove selected slot from free pool - freeSlots.remove(slot.getSlotId()); + final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId()); final Future<SlotRequestReply> slotRequestReplyFuture = slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout); - // TODO handle timeouts and response + + slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() { + @Override + public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) { + if (throwable != null) { + // we failed, put the slot and the request back again + if (allocationMap.isAllocated(slot.getSlotId())) { + // only re-add if the slot hasn't been removed in the meantime + freeSlots.put(slot.getSlotId(), removedSlot); 
+ } + pendingSlotRequests.put(allocationId, request); + } + return null; + } + }, resourceManagerServices.getExecutor()); } else { LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + "AllocationID:{}, JobID:{}", allocationId, request.getJobId()); - allocateContainer(request.getResourceProfile()); + Preconditions.checkState(resourceManagerServices != null, + "Attempted to allocate resources but no ResourceManagerServices set."); + resourceManagerServices.allocateResource(request.getResourceProfile()); pendingSlotRequests.put(allocationId, request); } @@ -343,7 +373,7 @@ public abstract class SlotManager { if (chosenRequest != null) { final AllocationID allocationId = chosenRequest.getAllocationId(); - pendingSlotRequests.remove(allocationId); + final SlotRequest removedSlotRequest = pendingSlotRequests.remove(allocationId); LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), allocationId, chosenRequest.getJobId()); @@ -351,7 +381,19 @@ public abstract class SlotManager { final Future<SlotRequestReply> slotRequestReplyFuture = freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout); - // TODO handle timeouts and response + + slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() { + @Override + public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) { + if (throwable != null) { + // we failed, add the request back again + if (allocationMap.isAllocated(freeSlot.getSlotId())) { + pendingSlotRequests.put(allocationId, removedSlotRequest); + } + } + return null; + } + }, resourceManagerServices.getExecutor()); } else { freeSlots.put(freeSlot.getSlotId(), freeSlot); } @@ -417,13 +459,6 @@ public abstract class SlotManager { protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot, final Map<AllocationID, SlotRequest> pendingRequests); - /** - * The framework specific code for allocating a container for 
specified resource profile. - * - * @param resourceProfile The resource profile - */ - protected abstract void allocateContainer(final ResourceProfile resourceProfile); - // ------------------------------------------------------------------------ // Helper classes // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 9ee9690..0fed79e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -19,12 +19,16 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.junit.BeforeClass; @@ -34,10 +38,13 @@ 
import org.mockito.Mockito; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; public class SlotManagerTest { @@ -57,6 +64,8 @@ public class SlotManagerTest { @BeforeClass public static void setUp() { taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class); + Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new FlinkCompletableFuture<SlotRequestReply>()); } /** @@ -498,12 +507,13 @@ public class SlotManagerTest { // testing classes // ------------------------------------------------------------------------ - private static class TestingSlotManager extends SlotManager { + private static class TestingSlotManager extends SlotManager implements ResourceManagerServices { private final List<ResourceProfile> allocatedContainers; TestingSlotManager() { this.allocatedContainers = new LinkedList<>(); + setupResourceManagerServices(this); } /** @@ -543,12 +553,23 @@ public class SlotManagerTest { } @Override - protected void allocateContainer(ResourceProfile resourceProfile) { + public void allocateResource(ResourceProfile resourceProfile) { allocatedContainers.add(resourceProfile); } + @Override + public Executor getAsyncExecutor() { + return Mockito.mock(Executor.class); + } + + @Override + public Executor getExecutor() { + return Mockito.mock(Executor.class); + } + List<ResourceProfile> getAllocatedContainers() { return allocatedContainers; } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a3426fc0/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index ff25897..e3018c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -24,18 +24,14 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.SlotRequestReply; -import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; +import org.apache.flink.runtime.resourcemanager.*; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; @@ -47,9 +43,12 @@ import org.junit.Before; import 
org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; @@ -99,9 +98,9 @@ public class SlotProtocolTest extends TestLogger { TestingLeaderElectionService rmLeaderElectionService = configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); - TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + SlotManager slotManager = Mockito.spy(new SimpleSlotManager()); ResourceManager resourceManager = - new StandaloneResourceManager(testRpcService, testingHaServices, slotManager); + Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager)); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID); @@ -118,7 +117,7 @@ public class SlotProtocolTest extends TestLogger { SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); SlotRequestReply slotRequestReply = - resourceManager.requestSlot(slotRequest); + resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); // 1) SlotRequest is routed to the SlotManager verify(slotManager).requestSlot(slotRequest); @@ -129,13 +128,15 @@ public class SlotProtocolTest extends TestLogger { allocationID); // 3) SlotRequest leads to a container allocation - verify(slotManager, timeout(5000)).allocateContainer(resourceProfile); + verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile); Assert.assertFalse(slotManager.isAllocated(allocationID)); // slot becomes available final String tmAddress = "/tm1"; TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); final ResourceID resourceID = ResourceID.generate(); @@ -176,11 +177,13 @@ public class SlotProtocolTest extends TestLogger { configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new FlinkCompletableFuture<SlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); - TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + SlotManager slotManager = Mockito.spy(new SimpleSlotManager()); ResourceManager resourceManager = - new StandaloneResourceManager(testRpcService, testingHaServices, slotManager); + Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager)); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID); @@ -207,7 +210,7 @@ public class SlotProtocolTest extends TestLogger { SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); SlotRequestReply slotRequestReply = - resourceManager.requestSlot(slotRequest); + resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); // 1) a SlotRequest is routed to the SlotManager verify(slotManager).requestSlot(slotRequest); @@ -241,15 +244,4 @@ public class SlotProtocolTest extends TestLogger { return rmLeaderElectionService; } - private static class TestingSlotManager extends SimpleSlotManager { - - // change visibility of function to public for testing - @Override - public void allocateContainer(ResourceProfile resourceProfile) { - super.allocateContainer(resourceProfile); - } - - - } - }
