[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol - associates JobMasters with JobID instead of InstanceID - adds TaskExecutorGateway to slot - adds SlotManager as RM constructor parameter - adds LeaderRetrievalListener to SlotManager to keep track of the leader id
- tests the interaction JM->RM requestSlot - tests the interaction RM->TM requestSlot This closes #2463 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a48fa50 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a48fa50 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a48fa50 Branch: refs/heads/flip-6 Commit: 8a48fa503d808cd588daee06f90d9e010ee47e89 Parents: 8e9db6b Author: Maximilian Michels <m...@apache.org> Authored: Thu Sep 1 16:53:31 2016 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Thu Oct 6 13:38:41 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/types/ResourceProfile.java | 2 +- .../clusterframework/types/ResourceSlot.java | 14 +- .../resourcemanager/JobMasterRegistration.java | 10 +- .../resourcemanager/RegistrationResponse.java | 9 +- .../resourcemanager/ResourceManager.java | 167 +++--- .../resourcemanager/ResourceManagerGateway.java | 2 +- .../runtime/resourcemanager/SlotAssignment.java | 25 - .../runtime/resourcemanager/SlotManager.java | 523 ----------------- .../resourcemanager/SlotRequestRegistered.java | 33 ++ .../resourcemanager/SlotRequestRejected.java | 34 ++ .../resourcemanager/SlotRequestReply.java | 41 ++ .../slotmanager/SimpleSlotManager.java | 59 ++ .../slotmanager/SlotManager.java | 579 +++++++++++++++++++ .../flink/runtime/taskexecutor/SlotStatus.java | 5 +- .../taskexecutor/TaskExecutorGateway.java | 17 + .../resourcemanager/ResourceManagerHATest.java | 4 +- .../resourcemanager/SlotManagerTest.java | 538 ----------------- .../slotmanager/SlotManagerTest.java | 554 ++++++++++++++++++ .../slotmanager/SlotProtocolTest.java | 225 +++++++ .../flink/runtime/rpc/TestingRpcService.java | 6 +- .../runtime/rpc/TestingSerialRpcService.java | 4 + 21 files changed, 1677 insertions(+), 1174 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index ff1c4bf..fa3aabc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -68,6 +68,6 @@ public class ResourceProfile implements Serializable { * @return true if the requirement is matched, otherwise false */ public boolean isMatching(ResourceProfile required) { - return Double.compare(cpuCores, required.getCpuCores()) >= 0 && memoryInMB >= required.getMemoryInMB(); + return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java index 8a6db5f..5fb8aee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.clusterframework.types; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + import java.io.Serializable; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -26,7 +28,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull; * A ResourceSlot represents a slot located in TaskManager from ResourceManager's view. It has a unique * identification and resource profile which we can compare to the resource request. */ -public class ResourceSlot implements ResourceIDRetrievable, Serializable { +public class ResourceSlot implements ResourceIDRetrievable { private static final long serialVersionUID = -5853720153136840674L; @@ -36,9 +38,13 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable { /** The resource profile of this slot */ private final ResourceProfile resourceProfile; - public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) { + /** Gateway to the TaskExecutor which owns the slot */ + private final TaskExecutorGateway taskExecutorGateway; + + public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorGateway taskExecutorGateway) { this.slotId = checkNotNull(slotId); this.resourceProfile = checkNotNull(resourceProfile); + this.taskExecutorGateway = taskExecutorGateway; } @Override @@ -54,6 +60,10 @@ public class ResourceSlot implements ResourceIDRetrievable, Serializable { return resourceProfile; } + public TaskExecutorGateway getTaskExecutorGateway() { + return taskExecutorGateway; + } + /** * Check whether required resource profile can be matched by this slot. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java index 309dcc1..439e56b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java @@ -18,18 +18,26 @@ package org.apache.flink.runtime.resourcemanager; +import org.apache.flink.api.common.JobID; + import java.io.Serializable; public class JobMasterRegistration implements Serializable { private static final long serialVersionUID = 8411214999193765202L; private final String address; + private final JobID jobID; - public JobMasterRegistration(String address) { + public JobMasterRegistration(String address, JobID jobID) { this.address = address; + this.jobID = jobID; } public String getAddress() { return address; } + + public JobID getJobID() { + return jobID; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java index fb6c401..796e634 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java @@ -18,26 +18,19 @@ package org.apache.flink.runtime.resourcemanager; -import 
org.apache.flink.runtime.instance.InstanceID; - import java.io.Serializable; public class RegistrationResponse implements Serializable { private static final long serialVersionUID = -2379003255993119993L; private final boolean isSuccess; - private final InstanceID instanceID; - public RegistrationResponse(boolean isSuccess, InstanceID instanceID) { + public RegistrationResponse(boolean isSuccess) { this.isSuccess = isSuccess; - this.instanceID = instanceID; } public boolean isSuccess() { return isSuccess; } - public InstanceID getInstanceID() { - return instanceID; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 44c022b..29aba1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -21,11 +21,13 @@ package org.apache.flink.runtime.resourcemanager; import akka.dispatch.Mapper; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.leaderelection.LeaderContender; import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -33,6 +35,8 @@ import 
org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; import java.util.HashMap; @@ -51,16 +55,28 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource manager</li> * </ul> */ -public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { - private final Map<JobMasterGateway, InstanceID> jobMasterGateways; +public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> implements LeaderContender { + + private final Logger LOG = LoggerFactory.getLogger(getClass()); + + private final Map<JobID, JobMasterGateway> jobMasterGateways; + private final HighAvailabilityServices highAvailabilityServices; - private LeaderElectionService leaderElectionService = null; - private UUID leaderSessionID = null; - public ResourceManager(RpcService rpcService, HighAvailabilityServices highAvailabilityServices) { + private LeaderElectionService leaderElectionService; + + private final SlotManager slotManager; + + private UUID leaderSessionID; + + public ResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManager slotManager) { super(rpcService); this.highAvailabilityServices = checkNotNull(highAvailabilityServices); this.jobMasterGateways = new HashMap<>(); + this.slotManager = slotManager; } @Override @@ -69,7 +85,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { try { super.start(); leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService(); - leaderElectionService.start(new ResourceManagerLeaderContender()); + leaderElectionService.start(this); } catch (Throwable e) { log.error("A fatal error happened when starting the ResourceManager", e); throw new 
RuntimeException("A fatal error happened when starting the ResourceManager", e); @@ -94,7 +110,7 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { */ @VisibleForTesting UUID getLeaderSessionID() { - return leaderSessionID; + return this.leaderSessionID; } /** @@ -105,21 +121,20 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { */ @RpcMethod public Future<RegistrationResponse> registerJobMaster(JobMasterRegistration jobMasterRegistration) { - Future<JobMasterGateway> jobMasterFuture = getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); + final Future<JobMasterGateway> jobMasterFuture = + getRpcService().connect(jobMasterRegistration.getAddress(), JobMasterGateway.class); + final JobID jobID = jobMasterRegistration.getJobID(); return jobMasterFuture.map(new Mapper<JobMasterGateway, RegistrationResponse>() { @Override public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { - InstanceID instanceID; - if (jobMasterGateways.containsKey(jobMasterGateway)) { - instanceID = jobMasterGateways.get(jobMasterGateway); - } else { - instanceID = new InstanceID(); - jobMasterGateways.put(jobMasterGateway, instanceID); + final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway); + if (existingGateway != null) { + LOG.info("Replacing existing gateway {} for JobID {} with {}.", + existingGateway, jobID, jobMasterGateway); } - - return new RegistrationResponse(true, instanceID); + return new RegistrationResponse(true); } }, getMainThreadExecutionContext()); } @@ -131,9 +146,16 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { * @return Slot assignment */ @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); + public SlotRequestReply requestSlot(SlotRequest slotRequest) { + final JobID jobId = slotRequest.getJobId(); + 
final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + + if (jobMasterGateway != null) { + return slotManager.requestSlot(slotRequest); + } else { + LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); + return new SlotRequestRejected(slotRequest.getAllocationId()); + } } @@ -154,61 +176,62 @@ public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> { return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000); } - private class ResourceManagerLeaderContender implements LeaderContender { - - /** - * Callback method when current resourceManager is granted leadership - * - * @param leaderSessionID unique leadershipID - */ - @Override - public void grantLeadership(final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); - ResourceManager.this.leaderSessionID = leaderSessionID; - // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(leaderSessionID); - } - }); - } - /** - * Callback method when current resourceManager lose leadership. 
- */ - @Override - public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); - jobMasterGateways.clear(); - leaderSessionID = null; - } - }); - } + // ------------------------------------------------------------------------ + // Leader Contender + // ------------------------------------------------------------------------ - @Override - public String getAddress() { - return ResourceManager.this.getAddress(); - } + /** + * Callback method when current resourceManager is granted leadership + * + * @param leaderSessionID unique leadershipID + */ + @Override + public void grantLeadership(final UUID leaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID); + // confirming the leader session ID might be blocking, + leaderElectionService.confirmLeaderSessionID(leaderSessionID); + // notify SlotManager + slotManager.notifyLeaderAddress(getAddress(), leaderSessionID); + ResourceManager.this.leaderSessionID = leaderSessionID; + } + }); + } - /** - * Handles error occurring in the leader election service - * - * @param exception Exception being thrown in the leader election service - */ - @Override - public void handleError(final Exception exception) { - runAsync(new Runnable() { - @Override - public void run() { - log.error("ResourceManager received an error from the LeaderElectionService.", exception); - // terminate ResourceManager in case of an error - shutDown(); - } - }); - } + /** + * Callback method when current resourceManager lose leadership. 
+ */ + @Override + public void revokeLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("ResourceManager {} was revoked leadership.", getAddress()); + jobMasterGateways.clear(); + ResourceManager.this.leaderSessionID = null; + } + }); + } + + /** + * Handles error occurring in the leader election service + * + * @param exception Exception being thrown in the leader election service + */ + @Override + public void handleError(final Exception exception) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("ResourceManager received an error from the LeaderElectionService.", exception); + // notify SlotManager + slotManager.handleError(exception); + // terminate ResourceManager in case of an error + shutDown(); + } + }); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index b5782b0..e5c8b64 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway { * @param slotRequest Slot request * @return Future slot assignment */ - Future<SlotAssignment> requestSlot(SlotRequest slotRequest); + Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest); /** * http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java deleted file mode 100644 index 695204d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.resourcemanager; - -import java.io.Serializable; - -public class SlotAssignment implements Serializable{ - private static final long serialVersionUID = -6990813455942742322L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java deleted file mode 100644 index 5c06648..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.taskexecutor.SlotReport; -import org.apache.flink.runtime.taskexecutor.SlotStatus; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request - * slots from registered TaskManagers and issues container allocation requests in case of there are not - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. - * <p> - * The main operation principle of SlotManager is: - * <ul> - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> - * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently - * holds.</li> - * </ul> - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. 
- */ -public abstract class SlotManager { - - private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); - - /** Gateway to communicate with ResourceManager */ - private final ResourceManagerGateway resourceManagerGateway; - - /** All registered slots, including free and allocated slots */ - private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; - - /** All pending slot requests, waiting available slots to fulfil */ - private final Map<AllocationID, SlotRequest> pendingSlotRequests; - - /** All free slots that can be used to be allocated */ - private final Map<SlotID, ResourceSlot> freeSlots; - - /** All allocations, we can lookup allocations either by SlotID or AllocationID */ - private final AllocationMap allocationMap; - - public SlotManager(ResourceManagerGateway resourceManagerGateway) { - this.resourceManagerGateway = checkNotNull(resourceManagerGateway); - this.registeredSlots = new HashMap<>(16); - this.pendingSlotRequests = new LinkedHashMap<>(16); - this.freeSlots = new HashMap<>(16); - this.allocationMap = new AllocationMap(); - } - - // ------------------------------------------------------------------------ - // slot managements - // ------------------------------------------------------------------------ - - /** - * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container - * allocation if we don't have enough resource. If we have free slot which can match the request, record - * this allocation and forward the request to TaskManager through ResourceManager (we want this done by - * RPC's main thread to avoid race condition). 
- * - * @param request The detailed request of the slot - */ - public void requestSlot(final SlotRequest request) { - if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId()); - return; - } - - // try to fulfil the request with current free slots - ResourceSlot slot = chooseSlotToUse(request, freeSlots); - if (slot != null) { - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - request.getAllocationId(), request.getJobId()); - - // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), request.getAllocationId()); - - // remove selected slot from free pool - freeSlots.remove(slot.getSlotId()); - - // TODO: send slot request to TaskManager - } else { - LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + - "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId()); - allocateContainer(request.getResourceProfile()); - pendingSlotRequests.put(request.getAllocationId(), request); - } - } - - /** - * Sync slot status with TaskManager's SlotReport. - */ - public void updateSlotStatus(final SlotReport slotReport) { - for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { - updateSlotStatus(slotStatus); - } - } - - /** - * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) - * or really rejected by TaskManager. We shall retry this request by: - * <ul> - * <li>1. verify and clear all the previous allocate information for this request - * <li>2. try to request slot again - * </ul> - * <p> - * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response - * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, - * but it can be taken care of by rejecting registration at JobManager. 
- * - * @param originalRequest The original slot request - * @param slotId The target SlotID - */ - public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { - final AllocationID originalAllocationId = originalRequest.getAllocationId(); - LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", - slotId, originalAllocationId, originalRequest.getJobId()); - - // verify the allocation info before we do anything - if (freeSlots.containsKey(slotId)) { - // this slot is currently empty, no need to de-allocate it from our allocations - LOG.info("Original slot is somehow empty, retrying this request"); - - // before retry, we should double check whether this request was allocated by some other ways - if (!allocationMap.isAllocated(originalAllocationId)) { - requestSlot(originalRequest); - } else { - LOG.info("The failed request has somehow been allocated, SlotID:{}", - allocationMap.getSlotID(originalAllocationId)); - } - } else if (allocationMap.isAllocated(slotId)) { - final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); - - // check whether we have an agreement on whom this slot belongs to - if (originalAllocationId.equals(currentAllocationId)) { - LOG.info("De-allocate this request and retry"); - allocationMap.removeAllocation(currentAllocationId); - - // put this slot back to free pool - ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); - freeSlots.put(slotId, slot); - - // retry the request - requestSlot(originalRequest); - } else { - // the slot is taken by someone else, no need to de-allocate it from our allocations - LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId); - - // before retry, we should double check whether this request was allocated by some other ways - if (!allocationMap.isAllocated(originalAllocationId)) { - requestSlot(originalRequest); - } else { - LOG.info("The failed request is somehow 
been allocated, SlotID:{}", - allocationMap.getSlotID(originalAllocationId)); - } - } - } else { - LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); - } - } - - /** - * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots. - * - * @param resourceId The ResourceID of the TaskManager - */ - public void notifyTaskManagerFailure(final ResourceID resourceId) { - LOG.info("Resource:{} been notified failure", resourceId); - final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId); - if (slotIdsToRemove != null) { - for (SlotID slotId : slotIdsToRemove.keySet()) { - LOG.info("Removing Slot:{} upon resource failure", slotId); - if (freeSlots.containsKey(slotId)) { - freeSlots.remove(slotId); - } else if (allocationMap.isAllocated(slotId)) { - allocationMap.removeAllocation(slotId); - } else { - LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); - } - } - } - } - - // ------------------------------------------------------------------------ - // internal behaviors - // ------------------------------------------------------------------------ - - /** - * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report: - * <ul> - * <li>1. The slot is newly registered.</li> - * <li>2. The slot has registered, it contains its current status.</li> - * </ul> - * <p> - * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty. - * <p> - * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really - * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet, - * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. 
So - * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that - * and take next action based on the diff between our information and heartbeat status. - * - * @param reportedStatus Reported slot status - */ - void updateSlotStatus(final SlotStatus reportedStatus) { - final SlotID slotId = reportedStatus.getSlotID(); - final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler()); - - if (registerNewSlot(slot)) { - // we have a newly registered slot - LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID()); - - if (reportedStatus.getAllocationID() != null) { - // slot in use, record this in bookkeeping - allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); - } else { - handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler())); - } - } else { - // slot exists, update current information - if (reportedStatus.getAllocationID() != null) { - // slot is reported in use - final AllocationID reportedAllocationId = reportedStatus.getAllocationID(); - - // check whether we also thought this slot is in use - if (allocationMap.isAllocated(slotId)) { - // we also think that slot is in use, check whether the AllocationID matches - final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); - - if (!reportedAllocationId.equals(currentAllocationId)) { - LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}", - slotId, currentAllocationId, reportedAllocationId); - - // seems we have a disagreement about the slot assignments, need to correct it - allocationMap.removeAllocation(slotId); - allocationMap.addAllocation(slotId, reportedAllocationId); - } - } else { - LOG.info("Slot allocation info mismatch! 
SlotID:{}, current:null, reported:{}", - slotId, reportedAllocationId); - - // we thought the slot is free, should correct this information - allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); - - // remove this slot from free slots pool - freeSlots.remove(slotId); - } - } else { - // slot is reported empty - - // check whether we also thought this slot is empty - if (allocationMap.isAllocated(slotId)) { - LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null", - slotId, allocationMap.getAllocationID(slotId)); - - // we thought the slot is in use, correct it - allocationMap.removeAllocation(slotId); - - // we have a free slot! - handleFreeSlot(new ResourceSlot(slotId, reportedStatus.getProfiler())); - } - } - } - } - - /** - * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled, - * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot - * to the free pool. - * - * @param freeSlot The free slot - */ - private void handleFreeSlot(final ResourceSlot freeSlot) { - SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests); - - if (chosenRequest != null) { - pendingSlotRequests.remove(chosenRequest.getAllocationId()); - - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), - chosenRequest.getAllocationId(), chosenRequest.getJobId()); - allocationMap.addAllocation(freeSlot.getSlotId(), chosenRequest.getAllocationId()); - - // TODO: send slot request to TaskManager - } else { - freeSlots.put(freeSlot.getSlotId(), freeSlot); - } - } - - /** - * Check whether the request is duplicated. We use AllocationID to identify slot request, for each - * formerly received slot request, it is either in pending list or already been allocated. 
- * - * @param request The slot request - * @return <tt>true</tt> if the request is duplicated - */ - private boolean isRequestDuplicated(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - return pendingSlotRequests.containsKey(allocationId) - || allocationMap.isAllocated(allocationId); - } - - /** - * Try to register slot, and tell if this slot is newly registered. - * - * @param slot The ResourceSlot which will be checked and registered - * @return <tt>true</tt> if we meet a new slot - */ - private boolean registerNewSlot(final ResourceSlot slot) { - final SlotID slotId = slot.getSlotId(); - final ResourceID resourceId = slotId.getResourceID(); - if (!registeredSlots.containsKey(resourceId)) { - registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); - } - return registeredSlots.get(resourceId).put(slotId, slot) == null; - } - - private ResourceSlot getRegisteredSlot(final SlotID slotId) { - final ResourceID resourceId = slotId.getResourceID(); - if (!registeredSlots.containsKey(resourceId)) { - return null; - } - return registeredSlots.get(resourceId).get(slotId); - } - - // ------------------------------------------------------------------------ - // Framework specific behavior - // ------------------------------------------------------------------------ - - /** - * Choose a slot to use among all free slots, the behavior is framework specified. - * - * @param request The slot request - * @param freeSlots All slots which can be used - * @return The slot we choose to use, <tt>null</tt> if we did not find a match - */ - protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request, - final Map<SlotID, ResourceSlot> freeSlots); - - /** - * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified. 
- * - * @param offeredSlot The free slot - * @param pendingRequests All the pending slot requests - * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match - */ - protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot, - final Map<AllocationID, SlotRequest> pendingRequests); - - /** - * The framework specific code for allocating a container for specified resource profile. - * - * @param resourceProfile The resource profile - */ - protected abstract void allocateContainer(final ResourceProfile resourceProfile); - - - // ------------------------------------------------------------------------ - // Helper classes - // ------------------------------------------------------------------------ - - /** - * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info - * either by SlotID or AllocationID. - */ - private static class AllocationMap { - - /** All allocated slots (by SlotID) */ - private final Map<SlotID, AllocationID> allocatedSlots; - - /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */ - private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId; - - AllocationMap() { - this.allocatedSlots = new HashMap<>(16); - this.allocatedSlotsByAllocationId = new HashMap<>(16); - } - - /** - * Add a allocation - * - * @param slotId The slot id - * @param allocationId The allocation id - */ - void addAllocation(final SlotID slotId, final AllocationID allocationId) { - allocatedSlots.put(slotId, allocationId); - allocatedSlotsByAllocationId.put(allocationId, slotId); - } - - /** - * De-allocation with slot id - * - * @param slotId The slot id - */ - void removeAllocation(final SlotID slotId) { - if (allocatedSlots.containsKey(slotId)) { - final AllocationID allocationId = allocatedSlots.get(slotId); - allocatedSlots.remove(slotId); - allocatedSlotsByAllocationId.remove(allocationId); - } - } - - /** - * De-allocation with allocation 
id - * - * @param allocationId The allocation id - */ - void removeAllocation(final AllocationID allocationId) { - if (allocatedSlotsByAllocationId.containsKey(allocationId)) { - SlotID slotId = allocatedSlotsByAllocationId.get(allocationId); - allocatedSlotsByAllocationId.remove(allocationId); - allocatedSlots.remove(slotId); - } - } - - /** - * Check whether allocation exists by slot id - * - * @param slotId The slot id - * @return true if the allocation exists - */ - boolean isAllocated(final SlotID slotId) { - return allocatedSlots.containsKey(slotId); - } - - /** - * Check whether allocation exists by allocation id - * - * @param allocationId The allocation id - * @return true if the allocation exists - */ - boolean isAllocated(final AllocationID allocationId) { - return allocatedSlotsByAllocationId.containsKey(allocationId); - } - - AllocationID getAllocationID(final SlotID slotId) { - return allocatedSlots.get(slotId); - } - - SlotID getSlotID(final AllocationID allocationId) { - return allocatedSlotsByAllocationId.get(allocationId); - } - - public int size() { - return allocatedSlots.size(); - } - } - - // ------------------------------------------------------------------------ - // Testing utilities - // ------------------------------------------------------------------------ - - @VisibleForTesting - boolean isAllocated(final SlotID slotId) { - return allocationMap.isAllocated(slotId); - } - - @VisibleForTesting - boolean isAllocated(final AllocationID allocationId) { - return allocationMap.isAllocated(allocationId); - } - - /** - * Add free slots directly to the free pool, this will not trigger pending requests allocation - * - * @param slot The resource slot - */ - @VisibleForTesting - void addFreeSlot(final ResourceSlot slot) { - final ResourceID resourceId = slot.getResourceID(); - final SlotID slotId = slot.getSlotId(); - - if (!registeredSlots.containsKey(resourceId)) { - registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); - } - 
registeredSlots.get(resourceId).put(slot.getSlotId(), slot); - freeSlots.put(slotId, slot); - } - - @VisibleForTesting - int getAllocatedSlotCount() { - return allocationMap.size(); - } - - @VisibleForTesting - int getFreeSlotCount() { - return freeSlots.size(); - } - - @VisibleForTesting - int getPendingRequestCount() { - return pendingSlotRequests.size(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java new file mode 100644 index 0000000..6b7f6dc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +import java.io.Serializable; + +/** + * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager + */ +public class SlotRequestRegistered extends SlotRequestReply { + + public SlotRequestRegistered(AllocationID allocationID) { + super(allocationID); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java new file mode 100644 index 0000000..cb3ec72 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +import java.io.Serializable; + +/** + * Rejection message by the ResourceManager for a SlotRequest from the JobManager + */ +public class SlotRequestRejected extends SlotRequestReply { + + public SlotRequestRejected(AllocationID allocationID) { + super(allocationID); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java new file mode 100644 index 0000000..1b85d0c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; + +import java.io.Serializable; + +/** + * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager + */ +public abstract class SlotRequestReply implements Serializable { + + private static final long serialVersionUID = 42; + + private final AllocationID allocationID; + + public SlotRequestReply(AllocationID allocationID) { + this.allocationID = allocationID; + } + + public AllocationID getAllocationID() { + return allocationID; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java new file mode 100644 index 0000000..ef5ce31 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.resourcemanager.SlotRequest; + +import java.util.Iterator; +import java.util.Map; + +/** + * A simple SlotManager which ignores resource profiles. + */ +public class SimpleSlotManager extends SlotManager { + + @Override + protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) { + final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator(); + if (slotIterator.hasNext()) { + return slotIterator.next(); + } else { + return null; + } + } + + @Override + protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) { + final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator(); + if (requestIterator.hasNext()) { + return requestIterator.next(); + } else { + return null; + } + } + + @Override + protected void allocateContainer(ResourceProfile resourceProfile) { + // TODO + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java new file mode 100644 index 0000000..96fde7d --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.resourcemanager.SlotRequestReply; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import 
scala.concurrent.duration.FiniteDuration; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request + * slots from registered TaskManagers and issues container allocation requests in case of there are not + * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. + * <p> + * The main operation principle of SlotManager is: + * <ul> + * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> + * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> + * <li>3. All slot requests will be handled by best efforts, there is no guarantee that one request will be + * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should + * be handled outside SlotManager. SlotManager will make each decision based on the information it currently + * holds.</li> + * </ul> + * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. + */ +public abstract class SlotManager implements LeaderRetrievalListener { + + protected final Logger LOG = LoggerFactory.getLogger(getClass()); + + /** All registered task managers with ResourceID and gateway. 
*/ + private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways; + + /** All registered slots, including free and allocated slots */ + private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + + /** All pending slot requests, waiting available slots to fulfil */ + private final Map<AllocationID, SlotRequest> pendingSlotRequests; + + /** All free slots that can be used to be allocated */ + private final Map<SlotID, ResourceSlot> freeSlots; + + /** All allocations, we can lookup allocations either by SlotID or AllocationID */ + private final AllocationMap allocationMap; + + private final FiniteDuration timeout; + + /** The current leader id set by the ResourceManager */ + private UUID leaderID; + + public SlotManager() { + this.registeredSlots = new HashMap<>(16); + this.pendingSlotRequests = new LinkedHashMap<>(16); + this.freeSlots = new HashMap<>(16); + this.allocationMap = new AllocationMap(); + this.taskManagerGateways = new HashMap<>(); + this.timeout = new FiniteDuration(10, TimeUnit.SECONDS); + } + + + // ------------------------------------------------------------------------ + // slot managements + // ------------------------------------------------------------------------ + + /** + * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container + * allocation if we don't have enough resource. If we have free slot which can match the request, record + * this allocation and forward the request to TaskManager through ResourceManager (we want this done by + * RPC's main thread to avoid race condition). 
+ * + * @param request The detailed request of the slot + * @return SlotRequestRegistered The confirmation message to be send to the caller + */ + public SlotRequestRegistered requestSlot(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); + if (isRequestDuplicated(request)) { + LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); + return null; + } + + // try to fulfil the request with current free slots + final ResourceSlot slot = chooseSlotToUse(request, freeSlots); + if (slot != null) { + LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), + allocationId, request.getJobId()); + + // record this allocation in bookkeeping + allocationMap.addAllocation(slot.getSlotId(), allocationId); + + // remove selected slot from free pool + freeSlots.remove(slot.getSlotId()); + + final Future<SlotRequestReply> slotRequestReplyFuture = + slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout); + // TODO handle timeouts and response + } else { + LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + + "AllocationID:{}, JobID:{}", allocationId, request.getJobId()); + allocateContainer(request.getResourceProfile()); + pendingSlotRequests.put(allocationId, request); + } + + return new SlotRequestRegistered(allocationId); + } + + /** + * Sync slot status with TaskManager's SlotReport. + */ + public void updateSlotStatus(final SlotReport slotReport) { + for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { + updateSlotStatus(slotStatus); + } + } + + /** + * Registers a TaskExecutor + * @param resourceID TaskExecutor's ResourceID + * @param gateway TaskExcutor's gateway + */ + public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) { + this.taskManagerGateways.put(resourceID, gateway); + } + + /** + * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) 
+ * or really rejected by TaskManager. We shall retry this request by: + * <ul> + * <li>1. verify and clear all the previous allocate information for this request + * <li>2. try to request slot again + * </ul> + * <p> + * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response + * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, + * but it can be taken care of by rejecting registration at JobManager. + * + * @param originalRequest The original slot request + * @param slotId The target SlotID + */ + public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { + final AllocationID originalAllocationId = originalRequest.getAllocationId(); + LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", + slotId, originalAllocationId, originalRequest.getJobId()); + + // verify the allocation info before we do anything + if (freeSlots.containsKey(slotId)) { + // this slot is currently empty, no need to de-allocate it from our allocations + LOG.info("Original slot is somehow empty, retrying this request"); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request has somehow been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } else if (allocationMap.isAllocated(slotId)) { + final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); + + // check whether we have an agreement on whom this slot belongs to + if (originalAllocationId.equals(currentAllocationId)) { + LOG.info("De-allocate this request and retry"); + allocationMap.removeAllocation(currentAllocationId); + + // put this slot back to free pool + ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); + 
freeSlots.put(slotId, slot); + + // retry the request + requestSlot(originalRequest); + } else { + // the slot is taken by someone else, no need to de-allocate it from our allocations + LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId); + + // before retry, we should double check whether this request was allocated by some other ways + if (!allocationMap.isAllocated(originalAllocationId)) { + requestSlot(originalRequest); + } else { + LOG.info("The failed request is somehow been allocated, SlotID:{}", + allocationMap.getSlotID(originalAllocationId)); + } + } + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + + /** + * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots. + * + * @param resourceId The ResourceID of the TaskManager + */ + public void notifyTaskManagerFailure(final ResourceID resourceId) { + LOG.info("Resource:{} been notified failure", resourceId); + taskManagerGateways.remove(resourceId); + final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId); + if (slotIdsToRemove != null) { + for (SlotID slotId : slotIdsToRemove.keySet()) { + LOG.info("Removing Slot: {} upon resource failure", slotId); + if (freeSlots.containsKey(slotId)) { + freeSlots.remove(slotId); + } else if (allocationMap.isAllocated(slotId)) { + allocationMap.removeAllocation(slotId); + } else { + LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId); + } + } + } + } + + // ------------------------------------------------------------------------ + // internal behaviors + // ------------------------------------------------------------------------ + + /** + * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report: + * <ul> + * <li>1. The slot is newly registered.</li> + * <li>2. 
The slot has registered, it contains its current status.</li> + * </ul> + * <p> + * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty. + * <p> + * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really + * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet, + * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So + * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that + * and take next action based on the diff between our information and heartbeat status. + * + * @param reportedStatus Reported slot status + */ + void updateSlotStatus(final SlotStatus reportedStatus) { + final SlotID slotId = reportedStatus.getSlotID(); + + final TaskExecutorGateway taskExecutorGateway = taskManagerGateways.get(slotId.getResourceID()); + if (taskExecutorGateway == null) { + LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager", + slotId.getResourceID()); + return; + } + + final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), taskExecutorGateway); + + if (registerNewSlot(slot)) { + // we have a newly registered slot + LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID()); + + if (reportedStatus.getAllocationID() != null) { + // slot in use, record this in bookkeeping + allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); + } else { + handleFreeSlot(slot); + } + } else { + // slot exists, update current information + if (reportedStatus.getAllocationID() != null) { + // slot is reported in use + final AllocationID reportedAllocationId = reportedStatus.getAllocationID(); + + // check whether we also thought this slot is in use + if (allocationMap.isAllocated(slotId)) { + // we also think that 
slot is in use, check whether the AllocationID matches + final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId); + + if (!reportedAllocationId.equals(currentAllocationId)) { + LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}", + slotId, currentAllocationId, reportedAllocationId); + + // seems we have a disagreement about the slot assignments, need to correct it + allocationMap.removeAllocation(slotId); + allocationMap.addAllocation(slotId, reportedAllocationId); + } + } else { + LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}", + slotId, reportedAllocationId); + + // we thought the slot is free, should correct this information + allocationMap.addAllocation(slotId, reportedStatus.getAllocationID()); + + // remove this slot from free slots pool + freeSlots.remove(slotId); + } + } else { + // slot is reported empty + + // check whether we also thought this slot is empty + if (allocationMap.isAllocated(slotId)) { + LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null", + slotId, allocationMap.getAllocationID(slotId)); + + // we thought the slot is in use, correct it + allocationMap.removeAllocation(slotId); + + // we have a free slot! + handleFreeSlot(slot); + } + } + } + } + + /** + * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled, + * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot + * to the free pool. 
+ * + * @param freeSlot The free slot + */ + private void handleFreeSlot(final ResourceSlot freeSlot) { + SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests); + + if (chosenRequest != null) { + final AllocationID allocationId = chosenRequest.getAllocationId(); + pendingSlotRequests.remove(allocationId); + + LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), + allocationId, chosenRequest.getJobId()); + allocationMap.addAllocation(freeSlot.getSlotId(), allocationId); + + final Future<SlotRequestReply> slotRequestReplyFuture = + freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout); + // TODO handle timeouts and response + } else { + freeSlots.put(freeSlot.getSlotId(), freeSlot); + } + } + + /** + * Check whether the request is duplicated. We use AllocationID to identify slot request, for each + * formerly received slot request, it is either in pending list or already been allocated. + * + * @param request The slot request + * @return <tt>true</tt> if the request is duplicated + */ + private boolean isRequestDuplicated(final SlotRequest request) { + final AllocationID allocationId = request.getAllocationId(); + return pendingSlotRequests.containsKey(allocationId) + || allocationMap.isAllocated(allocationId); + } + + /** + * Try to register slot, and tell if this slot is newly registered. 
+ * + * @param slot The ResourceSlot which will be checked and registered + * @return <tt>true</tt> if we meet a new slot + */ + private boolean registerNewSlot(final ResourceSlot slot) { + final SlotID slotId = slot.getSlotId(); + final ResourceID resourceId = slotId.getResourceID(); + if (!registeredSlots.containsKey(resourceId)) { + registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); + } + return registeredSlots.get(resourceId).put(slotId, slot) == null; + } + + private ResourceSlot getRegisteredSlot(final SlotID slotId) { + final ResourceID resourceId = slotId.getResourceID(); + if (!registeredSlots.containsKey(resourceId)) { + return null; + } + return registeredSlots.get(resourceId).get(slotId); + } + + // ------------------------------------------------------------------------ + // Framework specific behavior + // ------------------------------------------------------------------------ + + /** + * Choose a slot to use among all free slots, the behavior is framework specified. + * + * @param request The slot request + * @param freeSlots All slots which can be used + * @return The slot we choose to use, <tt>null</tt> if we did not find a match + */ + protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request, + final Map<SlotID, ResourceSlot> freeSlots); + + /** + * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified. + * + * @param offeredSlot The free slot + * @param pendingRequests All the pending slot requests + * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match + */ + protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot, + final Map<AllocationID, SlotRequest> pendingRequests); + + /** + * The framework specific code for allocating a container for specified resource profile. 
+ * + * @param resourceProfile The resource profile + */ + protected abstract void allocateContainer(final ResourceProfile resourceProfile); + + // ------------------------------------------------------------------------ + // Helper classes + // ------------------------------------------------------------------------ + + /** + * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info + * either by SlotID or AllocationID. + */ + private static class AllocationMap { + + /** All allocated slots (by SlotID) */ + private final Map<SlotID, AllocationID> allocatedSlots; + + /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */ + private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId; + + AllocationMap() { + this.allocatedSlots = new HashMap<>(16); + this.allocatedSlotsByAllocationId = new HashMap<>(16); + } + + /** + * Add a allocation + * + * @param slotId The slot id + * @param allocationId The allocation id + */ + void addAllocation(final SlotID slotId, final AllocationID allocationId) { + allocatedSlots.put(slotId, allocationId); + allocatedSlotsByAllocationId.put(allocationId, slotId); + } + + /** + * De-allocation with slot id + * + * @param slotId The slot id + */ + void removeAllocation(final SlotID slotId) { + if (allocatedSlots.containsKey(slotId)) { + final AllocationID allocationId = allocatedSlots.get(slotId); + allocatedSlots.remove(slotId); + allocatedSlotsByAllocationId.remove(allocationId); + } + } + + /** + * De-allocation with allocation id + * + * @param allocationId The allocation id + */ + void removeAllocation(final AllocationID allocationId) { + if (allocatedSlotsByAllocationId.containsKey(allocationId)) { + SlotID slotId = allocatedSlotsByAllocationId.get(allocationId); + allocatedSlotsByAllocationId.remove(allocationId); + allocatedSlots.remove(slotId); + } + } + + /** + * Check whether allocation exists by slot id + * + * @param slotId The slot id 
+ * @return true if the allocation exists + */ + boolean isAllocated(final SlotID slotId) { + return allocatedSlots.containsKey(slotId); + } + + /** + * Check whether allocation exists by allocation id + * + * @param allocationId The allocation id + * @return true if the allocation exists + */ + boolean isAllocated(final AllocationID allocationId) { + return allocatedSlotsByAllocationId.containsKey(allocationId); + } + + AllocationID getAllocationID(final SlotID slotId) { + return allocatedSlots.get(slotId); + } + + SlotID getSlotID(final AllocationID allocationId) { + return allocatedSlotsByAllocationId.get(allocationId); + } + + public int size() { + return allocatedSlots.size(); + } + } + + // ------------------------------------------------------------------------ + // High availability + // ------------------------------------------------------------------------ + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + this.leaderID = leaderSessionID; + } + + @Override + public void handleError(Exception exception) { + LOG.error("Slot Manager received an error from the leader service", exception); + } + + // ------------------------------------------------------------------------ + // Testing utilities + // ------------------------------------------------------------------------ + + @VisibleForTesting + boolean isAllocated(final SlotID slotId) { + return allocationMap.isAllocated(slotId); + } + + @VisibleForTesting + boolean isAllocated(final AllocationID allocationId) { + return allocationMap.isAllocated(allocationId); + } + + /** + * Add free slots directly to the free pool, this will not trigger pending requests allocation + * + * @param slot The resource slot + */ + @VisibleForTesting + void addFreeSlot(final ResourceSlot slot) { + final ResourceID resourceId = slot.getResourceID(); + final SlotID slotId = slot.getSlotId(); + + if (!registeredSlots.containsKey(resourceId)) { + registeredSlots.put(resourceId, new 
HashMap<SlotID, ResourceSlot>()); + } + registeredSlots.get(resourceId).put(slot.getSlotId(), slot); + freeSlots.put(slotId, slot); + } + + @VisibleForTesting + int getAllocatedSlotCount() { + return allocationMap.size(); + } + + @VisibleForTesting + int getFreeSlotCount() { + return freeSlots.size(); + } + + @VisibleForTesting + int getPendingRequestCount() { + return pendingSlotRequests.size(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java index 744b674..0f57bb1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java @@ -50,7 +50,10 @@ public class SlotStatus implements Serializable { this(slotID, profiler, null, null); } - public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID allocationID, JobID jobID) { + public SlotStatus( + SlotID slotID, ResourceProfile profiler, + JobID jobID, + AllocationID allocationID) { this.slotID = checkNotNull(slotID, "slotID cannot be null"); this.profiler = checkNotNull(profiler, "profile cannot be null"); this.allocationID = allocationID; http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 6c99706..7257436 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -18,7 +18,12 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.resourcemanager.SlotRequestReply; import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; import java.util.UUID; @@ -32,4 +37,16 @@ public interface TaskExecutorGateway extends RpcGateway { // ------------------------------------------------------------------------ void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); + + /** + * Send by the ResourceManager to the TaskExecutor + * @param allocationID id for the request + * @param resourceManagerLeaderID current leader id of the ResourceManager + * @return SlotRequestReply Answer to the request + */ + + Future<SlotRequestReply> requestSlot( + AllocationID allocationID, + UUID resourceManagerLeaderID, + @RpcTimeout FiniteDuration timeout); } http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 5799e62..8183c0a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager; import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.MainThreadExecutor; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; @@ -53,7 +54,8 @@ public class ResourceManagerHATest { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService); - final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices); + SlotManager slotManager = mock(SlotManager.class); + final ResourceManager resourceManager = new ResourceManager(rpcService, highAvailabilityServices, slotManager); resourceManager.start(); // before grant leadership, resourceManager's leaderId is null Assert.assertNull(resourceManager.getLeaderSessionID());