http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index f055971..31edbf3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -21,519 +21,893 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.CompletableFuture; import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered; -import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkNotNull; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** - * SlotManager is responsible for receiving slot requests and do slot allocations. It allows to request - * slots from registered TaskManagers and issues container allocation requests in case of there are not - * enough available slots. Besides, it should sync its slot allocation with TaskManager's heartbeat. - * <p> - * The main operation principle of SlotManager is: - * <ul> - * <li>1. All slot allocation status should be synced with TaskManager, which is the ground truth.</li> - * <li>2. All slots that have registered must be tracked, either by free pool or allocated pool.</li> - * <li>3. 
All slot requests will be handled by best efforts, there is no guarantee that one request will be - * fulfilled in time or correctly allocated. Conflicts or timeout or some special error will happen, it should - * be handled outside SlotManager. SlotManager will make each decision based on the information it currently - * holds.</li> - * </ul> - * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>. + * The slot manager is responsible for maintaining a view on all registered task manager slots, + * their allocation and all pending slot requests. Whenever a new slot is registered or an + * allocated slot is freed, then it tries to fulfill another pending slot request. Whenever there + * are not enough slots available the slot manager will notify the resource manager about it via + * {@link ResourceManagerActions#allocateResource(ResourceProfile)}. + * + * In order to free resources and avoid resource leaks, idling task managers (task managers whose + * slots are currently not used) and not fulfilled pending slot requests time out triggering their + * release and failure, respectively. 
*/ -public abstract class SlotManager { +public class SlotManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); + + /** Scheduled executor for timeouts */ + private final ScheduledExecutor scheduledExecutor; + + /** Timeout for slot requests to the task manager */ + private final Time taskManagerRequestTimeout; + + /** Timeout after which an allocation is discarded */ + private final Time slotRequestTimeout; + + /** Timeout after which an unused TaskManager is released */ + private final Time taskManagerTimeout; + + /** Map for all registered slots */ + private final HashMap<SlotID, TaskManagerSlot> slots; - protected final Logger LOG = LoggerFactory.getLogger(getClass()); + /** Index of all currently free slots */ + private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots; - /** The Resource allocation provider */ - protected final ResourceManagerServices rmServices; + /** All currently registered task managers */ + private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations; - /** All registered task managers with ResourceID and gateway. 
*/ - private final Map<ResourceID, TaskExecutorRegistration> taskManagers; + /** Map of fulfilled and active allocations for request deduplication purposes */ + private final HashMap<AllocationID, SlotID> fulfilledSlotRequests; - /** All registered slots, including free and allocated slots */ - private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots; + /** Map of pending/unfulfilled slot allocation requests */ + private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; - /** All pending slot requests, waiting available slots to fulfil */ - private final Map<AllocationID, SlotRequest> pendingSlotRequests; + /** Leader id of the containing component */ + private UUID leaderId; - /** All free slots that can be used to be allocated */ - private final Map<SlotID, ResourceSlot> freeSlots; + /** Executor for future callbacks which have to be "synchronized" */ + private Executor mainThreadExecutor; - /** All allocations, we can lookup allocations either by SlotID or AllocationID */ - private final AllocationMap allocationMap; + /** Callbacks for resource (de-)allocations */ + private ResourceManagerActions resourceManagerActions; - private final Time timeout; + /** True iff the component has been started */ + private boolean started; - public SlotManager(ResourceManagerServices rmServices) { - this.rmServices = checkNotNull(rmServices); - this.registeredSlots = new HashMap<>(16); - this.pendingSlotRequests = new LinkedHashMap<>(16); - this.freeSlots = new HashMap<>(16); - this.allocationMap = new AllocationMap(); - this.taskManagers = new HashMap<>(); - this.timeout = Time.seconds(10); + public SlotManager( + ScheduledExecutor scheduledExecutor, + Time taskManagerRequestTimeout, + Time slotRequestTimeout, + Time taskManagerTimeout) { + this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor); + this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); + this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout); + this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + + slots = new HashMap<>(16); + freeSlots = new LinkedHashMap<>(16); + taskManagerRegistrations = new HashMap<>(4); + fulfilledSlotRequests = new HashMap<>(16); + pendingSlotRequests = new HashMap<>(16); + + leaderId = null; + resourceManagerActions = null; + started = false; } - // ------------------------------------------------------------------------ - // slot managements - // ------------------------------------------------------------------------ + // --------------------------------------------------------------------------------------------- + // Component lifecycle methods + // --------------------------------------------------------------------------------------------- /** - * Request a slot with requirements, we may either fulfill the request or pending it. Trigger container - * allocation if we don't have enough resource. If we have free slot which can match the request, record - * this allocation and forward the request to TaskManager through ResourceManager (we want this done by - * RPC's main thread to avoid race condition). + * Starts the slot manager with the given leader id and resource manager actions. * - * @param request The detailed request of the slot - * @return RMSlotRequestRegistered The confirmation message to be send to the caller + * @param newLeaderId to use for communication with the task managers + * @param newResourceManagerActions to use for resource (de-)allocations + */ + public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + leaderId = Preconditions.checkNotNull(newLeaderId); + mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); + resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); + + started = true; + } + + /** + * Suspends the component. This clears the internal state of the slot manager. 
*/ - public RMSlotRequestRegistered requestSlot(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - if (isRequestDuplicated(request)) { - LOG.warn("Duplicated slot request, AllocationID:{}", allocationId); - return new RMSlotRequestRegistered(allocationId); + public void suspend() { + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + cancelPendingSlotRequest(pendingSlotRequest); } - // try to fulfil the request with current free slots - final ResourceSlot slot = chooseSlotToUse(request, freeSlots); - if (slot != null) { - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(), - allocationId, request.getJobId()); + pendingSlotRequests.clear(); + + ArrayList<InstanceID> registeredTaskManagers = new ArrayList<>(taskManagerRegistrations.keySet()); + + for (InstanceID registeredTaskManager : registeredTaskManagers) { + unregisterTaskManager(registeredTaskManager); + } - // record this allocation in bookkeeping - allocationMap.addAllocation(slot.getSlotId(), allocationId); - // remove selected slot from free pool - freeSlots.remove(slot.getSlotId()); + leaderId = null; + resourceManagerActions = null; + started = false; + } + + /** + * Closes the slot manager. + * + * @throws Exception if the close operation fails + */ + @Override + public void close() throws Exception { + suspend(); + } + + // --------------------------------------------------------------------------------------------- + // Public API + // --------------------------------------------------------------------------------------------- + + /** + * Requests a slot with the respective resource profile. + * + * @param slotRequest specifying the requested slot specs + * @return true if the slot request was registered; false if the request is a duplicate + * @throws SlotManagerException if the slot request failed (e.g. 
not enough resources left) + */ + public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException { + checkInit(); - sendSlotRequest(slot, request); + if (checkDuplicateRequest(slotRequest.getAllocationId())) { + LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId()); + + return false; } else { - LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " + - "AllocationID:{}, JobID:{}", allocationId, request.getJobId()); - Preconditions.checkState(rmServices != null, - "Attempted to allocate resources but no ResourceManagerServices set."); - rmServices.allocateResource(request.getResourceProfile()); - pendingSlotRequests.put(allocationId, request); + PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest); + + pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest); + + try { + internalRequestSlot(pendingSlotRequest); + } catch (ResourceManagerException e) { + // requesting the slot failed --> remove pending slot request + pendingSlotRequests.remove(slotRequest.getAllocationId()); + + throw new SlotManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e); + } + + return true; } + } + + /** + * Cancels and removes a pending slot request with the given allocation id. If there is no such + * pending request, then nothing is done. 
+ * + * @param allocationId identifying the pending slot request + * @return True if a pending slot request was found; otherwise false + */ + public boolean unregisterSlotRequest(AllocationID allocationId) { + checkInit(); + + PendingSlotRequest pendingSlotRequest = pendingSlotRequests.remove(allocationId); + + if (null != pendingSlotRequest) { + cancelPendingSlotRequest(pendingSlotRequest); + + return true; + } else { + LOG.debug("No pending slot request with allocation id {} found.", allocationId); - return new RMSlotRequestRegistered(allocationId); + return false; + } } /** - * Notifies the SlotManager that a slot is available again after being allocated. - * @param slotID slot id of available slot + * Registers a new task manager at the slot manager. This will make the task managers slots + * known and, thus, available for allocation. + * + * @param taskExecutorConnection for the new task manager + * @param initialSlotReport for the new task manager */ - public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) { - if (!allocationMap.isAllocated(slotID)) { - throw new IllegalStateException("Slot was not previously allocated but " + - "TaskManager reports it as available again"); + public void registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) { + checkInit(); + + // we identify task managers by their instance id + if (!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) { + TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(taskExecutorConnection); + taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration); } - allocationMap.removeAllocation(slotID); - final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID); - ResourceSlot freeSlot = slots.get(slotID); - if (freeSlot == null) { - throw new IllegalStateException("Slot was not registered with SlotManager but " + - "TaskManager reported it to be 
available."); + + reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport); + } + + /** + * Unregisters the task manager identified by the given instance id and its associated slots + * from the slot manager. + * + * @param instanceId identifying the task manager to unregister + * @return True if there existed a registered task manager with the given instance id + */ + public boolean unregisterTaskManager(InstanceID instanceId) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.remove(instanceId); + + if (null != taskManagerRegistration) { + removeSlots(taskManagerRegistration.getSlots()); + + taskManagerRegistration.cancelTimeout(); + + return true; + } else { + LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceId); + + return false; } - handleFreeSlot(freeSlot); } /** - * The slot request to TaskManager may be either failed by rpc communication (timeout, network error, etc.) - * or really rejected by TaskManager. We shall retry this request by: - * <ul> - * <li>1. verify and clear all the previous allocate information for this request - * <li>2. try to request slot again - * </ul> - * <p> - * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but the response - * is lost somehow, so we may request a slot in another TaskManager, this causes two slots assigned to one request, - * but it can be taken care of by rejecting registration at JobManager. + * Reports the current slot allocations for a task manager identified by the given instance id. 
* - * @param originalRequest The original slot request - * @param slotId The target SlotID + * @param instanceId identifying the task manager for which to report the slot status + * @param slotReport containing the status for all of its slots + * @return true if the slot status has been updated successfully, otherwise false */ - void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) { - final AllocationID originalAllocationId = originalRequest.getAllocationId(); - LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}", - slotId, originalAllocationId, originalRequest.getJobId()); - - if (allocationMap.isAllocated(slotId)) { - final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId); - - // check whether we have an agreement on whom this slot belongs to - if (originalAllocationId.equals(expectedAllocationId)) { - LOG.info("De-allocate this request and retry"); - allocationMap.removeAllocation(expectedAllocationId); - pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest); - ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId)); - // treat this slot as empty and retry with a different request - handleFreeSlot(slot); - } else { - LOG.error("Slot request failed for slot {} with allocation id {}:" + - " Allocation id did not match the expected allocation id {}.", - slotId, originalAllocationId, expectedAllocationId); + public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) { + checkInit(); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); + + if (null != taskManagerRegistration) { + ArrayList<SlotID> slotsToRemove = new ArrayList<>(taskManagerRegistration.getSlots()); + boolean idle = true; + + for (SlotStatus slotStatus : slotReport) { + if (slotsToRemove.remove(slotStatus.getSlotID())) { + // slot which was already registered + updateSlot(slotStatus.getSlotID(), 
slotStatus.getAllocationID()); + } else { + // new slot + registerSlot( + taskManagerRegistration, + slotStatus.getSlotID(), + slotStatus.getAllocationID(), + slotStatus.getResourceProfile(), + taskManagerRegistration.getTaskManagerConnection()); + } + + TaskManagerSlot slot = slots.get(slotStatus.getSlotID()); + + idle &= slot.isFree(); + } + + // remove the slots for which we haven't received a slot status message + removeSlots(slotsToRemove); + + if (idle) { + // no slot of this task manager is being used --> register timer to free this resource + registerTaskManagerTimeout(taskManagerRegistration); } + + return true; } else { - LOG.error("Slot request failed for slot {} with allocation id {}: " + - "Slot was not previously registered.", - slotId, originalAllocationId); + LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceId); + + return false; } } /** - * Registers a TaskExecutor - * @param resourceID TaskExecutor's ResourceID - * @param registration TaskExecutor's registration - * @param slotReport TaskExecutor's free and allocated slots + * Free the given slot from the given allocation. If the slot is still allocated by the given + * allocation id, then the slot will be marked as free and will be subject to new slot requests. 
+ * + * @param slotId identifying the slot to free + * @param allocationId with which the slot is presumably allocated */ - public void registerTaskExecutor( - ResourceID resourceID, - TaskExecutorRegistration registration, - SlotReport slotReport) { + public void freeSlot(SlotID slotId, AllocationID allocationId) { + checkInit(); - if (taskManagers.get(resourceID) != null) { - notifyTaskManagerFailure(resourceID); - } + TaskManagerSlot slot = slots.get(slotId); + + if (null != slot) { + if (slot.isAllocated()) { + if (Objects.equals(allocationId, slot.getAllocationId())) { + // free the slot + slot.setAllocationId(null); + fulfilledSlotRequests.remove(allocationId); - this.taskManagers.put(resourceID, registration); + if (slot.isFree()) { + handleFreeSlot(slot); + } - for (SlotStatus slotStatus : slotReport.getSlotsStatus()) { - final SlotID slotId = slotStatus.getSlotID(); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); - final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID()); - if (taskExecutorRegistration == null) { - LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager", - slotId.getResourceID()); - return; + if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) { + registerTaskManagerTimeout(taskManagerRegistration); + } + } else { + LOG.debug("Received request to free slot {} with expected allocation id {}, " + + "but actual allocation id {} differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId()); + } + } else { + LOG.debug("Slot {} has not been allocated.", allocationId); } + } else { + LOG.debug("Trying to free a slot {} which has not been registered. 
Ignoring this message.", slotId); + } + } - final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration); + // --------------------------------------------------------------------------------------------- + // Behaviour methods + // --------------------------------------------------------------------------------------------- - registerNewSlot(slot); - LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID()); + /** + * Finds a matching slot request for a given resource profile. If there is no such request, + * the method returns null. + * + * Note: If you want to change the behaviour of the slot manager wrt slot allocation and + * request fulfillment, then you should override this method. + * + * @param slotResourceProfile defining the resources of an available slot + * @return A matching slot request which can be deployed in a slot with the given resource + * profile. Null if there is no such slot request pending. + */ + protected PendingSlotRequest findMatchingRequest(ResourceProfile slotResourceProfile) { - if (slotStatus.getAllocationID() != null) { - // slot in use, record this in bookkeeping - allocationMap.addAllocation(slotId, slotStatus.getAllocationID()); - } else { - handleFreeSlot(slot); + for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) { + if (!pendingSlotRequest.isAssigned() && slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) { + return pendingSlotRequest; } } + + return null; } /** - * Callback for TaskManager failures. In case that a TaskManager fails, we have to clean up all its slots. + * Finds a matching slot for a given resource profile. A matching slot has at least as many + * resources available as the given resource profile. If there is no such slot available, then + * the method returns null. 
* - * @param resourceId The ResourceID of the TaskManager + * Note: If you want to change the behaviour of the slot manager wrt slot allocation and + * request fulfillment, then you should override this method. + * + * @param requestResourceProfile specifying the resource requirements for a slot request + * @return A matching slot which fulfills the given resource profile. Null if there is no such + * slot available. */ - public void notifyTaskManagerFailure(final ResourceID resourceId) { - LOG.info("Resource:{} been notified failure", resourceId); - taskManagers.remove(resourceId); - final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId); - if (slotIdsToRemove != null) { - for (SlotID slotId : slotIdsToRemove.keySet()) { - LOG.info("Removing Slot: {} upon resource failure", slotId); - if (freeSlots.containsKey(slotId)) { - freeSlots.remove(slotId); - } else if (allocationMap.isAllocated(slotId)) { - allocationMap.removeAllocation(slotId); - } else { - LOG.error("BUG! 
{} is neither in free pool nor in allocated pool", slotId); - } + protected TaskManagerSlot findMatchingSlot(ResourceProfile requestResourceProfile) { + Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = freeSlots.entrySet().iterator(); + + while (iterator.hasNext()) { + TaskManagerSlot taskManagerSlot = iterator.next().getValue(); + + // sanity check + Preconditions.checkState(taskManagerSlot.isFree()); + + if (taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) { + iterator.remove(); + return taskManagerSlot; } } + + return null; } - // ------------------------------------------------------------------------ - // internal behaviors - // ------------------------------------------------------------------------ + // --------------------------------------------------------------------------------------------- + // Internal slot operations + // --------------------------------------------------------------------------------------------- /** - * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled, - * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot - * to the free pool. + * Registers a slot for the given task manager at the slot manager. The slot is identified by + * the given slot id. The given resource profile defines the available resources for the slot. + * The task manager connection can be used to communicate with the task manager. 
* - * @param freeSlot The free slot + * @param taskManagerRegistration Task manager for which to register the given slot + * @param slotId identifying the slot on the task manager + * @param allocationId which is currently deployed in the slot + * @param resourceProfile of the slot + * @param taskManagerConnection to communicate with the remote task manager */ - private void handleFreeSlot(final ResourceSlot freeSlot) { - SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests); + private void registerSlot( + TaskManagerRegistration taskManagerRegistration, + SlotID slotId, + AllocationID allocationId, + ResourceProfile resourceProfile, + TaskExecutorConnection taskManagerConnection) { + + Preconditions.checkNotNull(taskManagerRegistration); + + TaskManagerSlot slot = new TaskManagerSlot( + slotId, + resourceProfile, + taskManagerConnection, + allocationId); + + slots.put(slotId, slot); + + taskManagerRegistration.addSlot(slotId); + + if (slot.isFree()) { + handleFreeSlot(slot); + } - if (chosenRequest != null) { - final AllocationID allocationId = chosenRequest.getAllocationId(); - final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId); + if (slot.isAllocated()) { + fulfilledSlotRequests.put(slot.getAllocationId(), slotId); + } + } + + /** + * Updates a slot with the given allocation id. 
+ * + * @param slotId to update + * @param allocationId specifying the current allocation of the slot + */ + private void updateSlot(SlotID slotId, AllocationID allocationId) { + TaskManagerSlot slot = slots.get(slotId); + + if (null != slot) { + // we assume the given allocation id to be the ground truth (coming from the TM) + slot.setAllocationId(allocationId); + + if (null != allocationId) { + if (slot.hasPendingSlotRequest()){ + // we have a pending slot request --> check whether we have to reject it + PendingSlotRequest pendingSlotRequest = slot.getAssignedSlotRequest(); + + if (Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) { + // we can cancel the slot request because it has been fulfilled + cancelPendingSlotRequest(pendingSlotRequest); + + // remove the pending slot request, since it has been completed + pendingSlotRequests.remove(pendingSlotRequest.getAllocationId()); + } else { + // this will try to find a new slot for the request + rejectPendingSlotRequest( + pendingSlotRequest, + new Exception("Task manager reported slot " + slotId + " being already allocated.")); + } + + slot.setAssignedSlotRequest(null); + } - LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(), - allocationId, chosenRequest.getJobId()); - allocationMap.addAllocation(freeSlot.getSlotId(), allocationId); + fulfilledSlotRequests.put(allocationId, slotId); - sendSlotRequest(freeSlot, slotRequest); + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); + + if (null != taskManagerRegistration) { + // disable any registered time out for the task manager + taskManagerRegistration.cancelTimeout(); + } + } } else { - freeSlots.put(freeSlot.getSlotId(), freeSlot); + LOG.debug("Trying to update unknown slot with slot id {}.", slotId); } } - private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slotRequest) { + /** + * Tries to allocate a slot for the given slot request. 
If there is no slot available, the + * resource manager is informed to allocate more resources and a timeout for the request is + * registered. + * + * @param pendingSlotRequest to allocate a slot for + * @throws ResourceManagerException if the resource manager cannot allocate more resource + */ + private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException { + TaskManagerSlot taskManagerSlot = findMatchingSlot(pendingSlotRequest.getResourceProfile()); - final AllocationID allocationID = slotRequest.getAllocationId(); - final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration(); - final Future<TMSlotRequestReply> slotRequestReplyFuture = - registration.getTaskExecutorGateway() - .requestSlot( - freeSlot.getSlotId(), - slotRequest.getJobId(), - allocationID, - "foobar", // TODO: set proper JM address - rmServices.getLeaderID(), - timeout); + if (taskManagerSlot != null) { + allocateSlot(taskManagerSlot, pendingSlotRequest); + } else { + final UUID timeoutIdentifier = UUID.randomUUID(); + final AllocationID allocationId = pendingSlotRequest.getAllocationId(); + + // register timeout for slot request + ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + timeoutSlotRequest(allocationId, timeoutIdentifier); + } + }); + } + }, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); + + pendingSlotRequest.registerTimeout(timeoutFuture, timeoutIdentifier); - slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() { + resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile()); + } + } + + /** + * Allocates the given slot for the given slot request. This entails sending a registration + * message to the task manager and treating failures. 
+ * + * @param taskManagerSlot to allocate for the given slot request + * @param pendingSlotRequest to allocate the given slot for + */ + private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) { + TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection(); + TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway(); + + final CompletableFuture<Acknowledge> completableFuture = new FlinkCompletableFuture<>(); + final AllocationID allocationId = pendingSlotRequest.getAllocationId(); + final SlotID slotId = taskManagerSlot.getSlotId(); + + // link slot and request; the request keeps the future so that rejectPendingSlotRequest and cancelPendingSlotRequest can fail or cancel it + taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest); + pendingSlotRequest.setRequestFuture(completableFuture); + + // RPC call to the task manager + Future<Acknowledge> requestFuture = gateway.requestSlot( + slotId, + pendingSlotRequest.getJobId(), + allocationId, + pendingSlotRequest.getTargetAddress(), + leaderId, + taskManagerRequestTimeout); + + // forward the RPC outcome into the future stored on the request + requestFuture.handle(new BiFunction<Acknowledge, Throwable, Void>() { + @Override + public Void apply(Acknowledge acknowledge, Throwable throwable) { + if (acknowledge != null) { + completableFuture.complete(acknowledge); + } else { + completableFuture.completeExceptionally(throwable); + } + + return null; + } + }); + + // evaluate the outcome in the main thread executor + completableFuture.handleAsync(new BiFunction<Acknowledge, Throwable, Void>() { @Override - public Void apply(TMSlotRequestReply slotRequestReply, Throwable throwable) { - TaskExecutorRegistration current = taskManagers.get(slotRequestReply.getResourceID()); - if (current != null && current.getInstanceID().equals(slotRequestReply.getInstanceID())) { - if (throwable != null || slotRequestReply instanceof TMSlotRequestRejected) { - handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId()); + public Void apply(Acknowledge acknowledge, Throwable throwable) { + if (acknowledge != null) { + updateSlot(slotId, allocationId); + } else { + if (throwable instanceof
SlotOccupiedException) { + // the slot is reported as occupied: update it with the allocation id carried by the exception + SlotOccupiedException exception = (SlotOccupiedException) throwable; + updateSlot(slotId, exception.getAllocationId()); } else { - LOG.debug("Successfully registered slot {} ", freeSlot.getSlotId()); + removeSlotRequestFromSlot(slotId, allocationId); + } + + // every failure except a cancellation is handed to handleFailedSlotRequest + if (!(throwable instanceof CancellationException)) { + handleFailedSlotRequest(slotId, allocationId, throwable); + } else { + LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable); } - } else { - LOG.debug("Discarding message from obsolete TaskExecutor with InstanceID {}", - slotRequestReply.getInstanceID()); } + return null; } - }, rmServices.getMainThreadExecutor()); + }, mainThreadExecutor); } /** - * Check whether the request is duplicated. We use AllocationID to identify slot request, for each - * formerly received slot request, it is either in pending list or already been allocated. + * Handles a free slot. It first tries to find a pending slot request which can be fulfilled. + * If there is no such request, then it will add the slot to the set of free slots. * - * @param request The slot request - * @return <tt>true</tt> if the request is duplicated + * @param freeSlot to find a new slot request for */ - private boolean isRequestDuplicated(final SlotRequest request) { - final AllocationID allocationId = request.getAllocationId(); - return pendingSlotRequests.containsKey(allocationId) - || allocationMap.isAllocated(allocationId); + private void handleFreeSlot(TaskManagerSlot freeSlot) { + PendingSlotRequest pendingSlotRequest = findMatchingRequest(freeSlot.getResourceProfile()); + + if (null != pendingSlotRequest) { + allocateSlot(freeSlot, pendingSlotRequest); + } else { + freeSlots.put(freeSlot.getSlotId(), freeSlot); + } } /** - * Registers a new slot with the SlotManager. + * Removes the given set of slots from the slot manager.
* - * @param slot The ResourceSlot which will be registered + * @param slotsToRemove identifying the slots to remove from the slot manager */ - private void registerNewSlot(final ResourceSlot slot) { - final SlotID slotId = slot.getSlotId(); - final ResourceID resourceId = slotId.getResourceID(); - if (!registeredSlots.containsKey(resourceId)) { - registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); + private void removeSlots(Iterable<SlotID> slotsToRemove) { + // NOTE(review): removeSlot also removes the id from the owning TaskManagerRegistration; if slotsToRemove is that registration's live slot set while it is still registered, the set would be modified during iteration -- confirm that callers detach the registration first + for (SlotID slotId : slotsToRemove) { + removeSlot(slotId); } - registeredSlots.get(resourceId).put(slotId, slot); } - private ResourceSlot getRegisteredSlot(final SlotID slotId) { - final ResourceID resourceId = slotId.getResourceID(); - if (!registeredSlots.containsKey(resourceId)) { - return null; + /** + * Removes the given slot from the slot manager. + * + * @param slotId identifying the slot to remove + */ + private void removeSlot(SlotID slotId) { + TaskManagerSlot slot = slots.remove(slotId); + + if (null != slot) { + freeSlots.remove(slotId); + + if (slot.hasPendingSlotRequest()) { + // reject the pending slot request --> triggering a new allocation attempt + rejectPendingSlotRequest( + slot.getAssignedSlotRequest(), + new Exception("The assigned slot " + slot.getSlotId() + " was removed.")); + } + + AllocationID oldAllocationId = slot.getAllocationId(); + + // forget the fulfilled request recorded under the slot's allocation id, if there is one + fulfilledSlotRequests.remove(oldAllocationId); + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId()); + + if (null != taskManagerRegistration) { + taskManagerRegistration.removeSlot(slotId); + } + } else { + LOG.debug("There was no slot registered with slot id {}.", slotId); } - return registeredSlots.get(resourceId).get(slotId); } - // ------------------------------------------------------------------------ - // Framework specific behavior - // ------------------------------------------------------------------------ + //
--------------------------------------------------------------------------------------------- + // Internal request handling methods + // --------------------------------------------------------------------------------------------- /** - * Choose a slot to use among all free slots, the behavior is framework specified. + * Removes a pending slot request identified by the given allocation id from a slot identified + * by the given slot id. * - * @param request The slot request - * @param freeSlots All slots which can be used - * @return The slot we choose to use, <tt>null</tt> if we did not find a match + * @param slotId identifying the slot + * @param allocationId identifying the presumably assigned pending slot request */ - protected abstract ResourceSlot chooseSlotToUse(final SlotRequest request, - final Map<SlotID, ResourceSlot> freeSlots); + private void removeSlotRequestFromSlot(SlotID slotId, AllocationID allocationId) { + TaskManagerSlot taskManagerSlot = slots.get(slotId); + + if (null != taskManagerSlot) { + // detach the request only if it is still the one assigned to this slot + if (taskManagerSlot.hasPendingSlotRequest() && Objects.equals(allocationId, taskManagerSlot.getAssignedSlotRequest().getAllocationId())) { + taskManagerSlot.setAssignedSlotRequest(null); + } + + if (taskManagerSlot.isFree()) { + handleFreeSlot(taskManagerSlot); + } + + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(taskManagerSlot.getInstanceId()); + + // none of the task manager's slots is in use any more: (re)register its timeout + if (null != taskManagerRegistration && !anySlotUsed(taskManagerRegistration.getSlots())) { + registerTaskManagerTimeout(taskManagerRegistration); + } + } else { + LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotId); + } + } /** - * Choose a pending request to fulfill when we have a free slot, the behavior is framework specified. + * Handles a failed slot request. The slot manager tries to find a new slot fulfilling + * the resource requirements for the failed slot request.
* - * @param offeredSlot The free slot - * @param pendingRequests All the pending slot requests - * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match + * @param slotId identifying the slot which was assigned to the slot request before + * @param allocationId identifying the failed slot request + * @param cause of the failure */ - protected abstract SlotRequest chooseRequestToFulfill(final ResourceSlot offeredSlot, - final Map<AllocationID, SlotRequest> pendingRequests); + private void handleFailedSlotRequest(SlotID slotId, AllocationID allocationId, Throwable cause) { + PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId); + + LOG.debug("Slot request with allocation id {} failed for slot {}.", allocationId, slotId, cause); + + if (null != pendingSlotRequest) { + // clear the future of the failed attempt before requesting a slot again + pendingSlotRequest.setRequestFuture(null); + + try { + internalRequestSlot(pendingSlotRequest); + } catch (ResourceManagerException e) { + // the retry failed: drop the request and report the failure for its job + pendingSlotRequests.remove(allocationId); - // ------------------------------------------------------------------------ - // Helper classes - // ------------------------------------------------------------------------ + resourceManagerActions.notifyAllocationFailure( + pendingSlotRequest.getJobId(), + allocationId, + e); + } + } else { + LOG.debug("There was not pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationId); + } + } /** - * We maintain all the allocations with SlotID and AllocationID. We are able to get or remove the allocation info - * either by SlotID or AllocationID. + * Rejects the pending slot request by failing the request future with a + * {@link SlotAllocationException}.
+ * + * @param pendingSlotRequest to reject + * @param cause of the rejection */ - private static class AllocationMap { + private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception cause) { + CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); - /** All allocated slots (by SlotID) */ - private final Map<SlotID, AllocationID> allocatedSlots; + if (null != request) { + request.completeExceptionally(new SlotAllocationException(cause)); + } else { + LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId()); + } + } - /** All allocated slots (by AllocationID), it'a a inverse view of allocatedSlots */ - private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId; + /** + * Cancels the given slot request. + * + * @param pendingSlotRequest to cancel + */ + private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) { + pendingSlotRequest.cancelTimeout(); - AllocationMap() { - this.allocatedSlots = new HashMap<>(16); - this.allocatedSlotsByAllocationId = new HashMap<>(16); - } + CompletableFuture<Acknowledge> request = pendingSlotRequest.getRequestFuture(); - /** - * Add a allocation - * - * @param slotId The slot id - * @param allocationId The allocation id - */ - void addAllocation(final SlotID slotId, final AllocationID allocationId) { - allocatedSlots.put(slotId, allocationId); - allocatedSlotsByAllocationId.put(allocationId, slotId); + if (null != request) { + request.cancel(false); } + } + + // --------------------------------------------------------------------------------------------- + // Internal timeout methods + // --------------------------------------------------------------------------------------------- + + /** + * Handles the expiry of a task manager's timeout. The task manager is unregistered and its + * resource released only if the timeout identifier is still the current one and none of its + * slots is in use; in every other case the registration is left untouched. + * + * @param instanceId identifying the task manager whose timeout fired + * @param timeoutIdentifier identifying the timeout which fired + */ + private void timeoutTaskManager(InstanceID instanceId, UUID timeoutIdentifier) { + // look the registration up without removing it, so that it is not lost when the task manager cannot be released + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); - /** - * De-allocation with slot
id - * - * @param slotId The slot id - */ - void removeAllocation(final SlotID slotId) { - if (allocatedSlots.containsKey(slotId)) { - final AllocationID allocationId = allocatedSlots.get(slotId); - allocatedSlots.remove(slotId); - allocatedSlotsByAllocationId.remove(allocationId); + if (null != taskManagerRegistration) { + if (Objects.equals(timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier())) { + if (anySlotUsed(taskManagerRegistration.getSlots())) { + LOG.debug("Cannot release the task manager with instance id {}, because some " + + "of its slots are still being used.", instanceId); + } else { + // the registration is removed before unregistering, exactly as before this change + // NOTE(review): confirm that unregisterTaskManager still cleans up the slots when the registration is already gone + taskManagerRegistrations.remove(instanceId); + unregisterTaskManager(instanceId); + + resourceManagerActions.releaseResource(instanceId); + } + } else { + LOG.debug("Expected timeout identifier {} differs from the task manager's " + + "timeout identifier {}. Ignoring the task manager timeout call.", + timeoutIdentifier, taskManagerRegistration.getTimeoutIdentifier()); } + } else { + LOG.debug("Could not find a registered task manager with instance id {}.
Ignoring the task manager timeout call.", instanceId); } + } - /** - * De-allocation with allocation id - * - * @param allocationId The allocation id - */ - void removeAllocation(final AllocationID allocationId) { - if (allocatedSlotsByAllocationId.containsKey(allocationId)) { - SlotID slotId = allocatedSlotsByAllocationId.get(allocationId); - allocatedSlotsByAllocationId.remove(allocationId); - allocatedSlots.remove(slotId); + /** + * Handles the expiry of a slot request's timeout. The request is removed and reported as + * failed only if the timeout identifier is still the current one and the request has not + * been assigned to a slot; in every other case the request stays pending. + * + * @param allocationId identifying the slot request whose timeout fired + * @param timeoutIdentifier identifying the timeout which fired + */ + private void timeoutSlotRequest(AllocationID allocationId, UUID timeoutIdentifier) { + // look the request up without removing it, so that an assigned request is not lost + PendingSlotRequest pendingSlotRequest = pendingSlotRequests.get(allocationId); + + if (null != pendingSlotRequest) { + if (Objects.equals(timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier())) { + if (!pendingSlotRequest.isAssigned()) { + // only an unassigned request is given up on + pendingSlotRequests.remove(allocationId); + + resourceManagerActions.notifyAllocationFailure( + pendingSlotRequest.getJobId(), + allocationId, + new TimeoutException("The allocation could not be fulfilled in time.")); + } else { + LOG.debug("Cannot fail pending slot request {} because it has been assigned.", allocationId); + } + } else { + LOG.debug("Expected timeout identifier {} differs from the pending slot request's " + + "timeout identifier {}. Ignoring the slot request timeout call.", + timeoutIdentifier, pendingSlotRequest.getTimeoutIdentifier()); } + } else { + LOG.debug("Could not find pending slot request with allocation id {}.
Ignoring the slot request timeout call.", allocationId); } + } - /** - * Check whether allocation exists by slot id - * - * @param slotId The slot id - * @return true if the allocation exists - */ - boolean isAllocated(final SlotID slotId) { - return allocatedSlots.containsKey(slotId); - } + // --------------------------------------------------------------------------------------------- + // Internal utility methods + // --------------------------------------------------------------------------------------------- - /** - * Check whether allocation exists by allocation id - * - * @param allocationId The allocation id - * @return true if the allocation exists - */ - boolean isAllocated(final AllocationID allocationId) { - return allocatedSlotsByAllocationId.containsKey(allocationId); - } + /** + * Returns whether a slot request with the given allocation id is already known, either as a + * pending or as a fulfilled request. + * + * @param allocationId identifying the slot request + * @return true if the allocation id is contained in the pending or the fulfilled requests + */ + private boolean checkDuplicateRequest(AllocationID allocationId) { + return pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId); + } - AllocationID getAllocationID(final SlotID slotId) { - return allocatedSlots.get(slotId); - } + /** + * Checks whether at least one of the given slots is registered and not free. Slot ids which + * are not registered are skipped. + * + * @param slotsToCheck identifying the slots to check; may be null + * @return true if any registered slot is not free; false otherwise or if the argument is null + */ + private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) { - SlotID getSlotID(final AllocationID allocationId) { - return allocatedSlotsByAllocationId.get(allocationId); - } + if (null != slotsToCheck) { + boolean idle = true; - public int size() { - return allocatedSlots.size(); - } + for (SlotID slotId : slotsToCheck) { + TaskManagerSlot taskManagerSlot = slots.get(slotId); - public void clear() { - allocatedSlots.clear(); - allocatedSlotsByAllocationId.clear(); + if (null != taskManagerSlot) { + idle &= taskManagerSlot.isFree(); + } + } + + return !idle; + } else { + return false; } } - /** - * Clears the state of the SlotManager after leadership revokal - */ - public void clearState() { - taskManagers.clear(); - registeredSlots.clear(); - pendingSlotRequests.clear(); - freeSlots.clear(); - allocationMap.clear(); - } + /** + * Schedules the timeout for the given task manager. Once taskManagerTimeout has elapsed, + * timeoutTaskManager is executed in the main thread executor with a fresh timeout + * identifier, which is also stored on the registration. + * + * @param taskManagerRegistration to register the timeout for + */ + private void registerTaskManagerTimeout(final TaskManagerRegistration
taskManagerRegistration) { + final UUID timeoutIdentifier = UUID.randomUUID(); - // ------------------------------------------------------------------------ - // Testing utilities - // ------------------------------------------------------------------------ + ScheduledFuture<?> timeoutFuture = scheduledExecutor.schedule(new Runnable() { + @Override + public void run() { + mainThreadExecutor.execute(new Runnable() { + @Override + public void run() { + timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier); + } + }); + } + }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); - @VisibleForTesting - boolean isAllocated(final SlotID slotId) { - return allocationMap.isAllocated(slotId); + taskManagerRegistration.registerTimeout(timeoutFuture, timeoutIdentifier); } - @VisibleForTesting - boolean isAllocated(final AllocationID allocationId) { - return allocationMap.isAllocated(allocationId); + /** + * Verifies via Preconditions.checkState that the slot manager has been started. + */ + private void checkInit() { + Preconditions.checkState(started, "The slot manager has not been started."); } - /** - * Add free slots directly to the free pool, this will not trigger pending requests allocation - * - * @param slot The resource slot - */ - @VisibleForTesting - void addFreeSlot(final ResourceSlot slot) { - final ResourceID resourceId = slot.getResourceID(); - final SlotID slotId = slot.getSlotId(); + // --------------------------------------------------------------------------------------------- + // Testing methods + // --------------------------------------------------------------------------------------------- - if (!registeredSlots.containsKey(resourceId)) { - registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>()); - } - registeredSlots.get(resourceId).put(slot.getSlotId(), slot); - freeSlots.put(slotId, slot); + @VisibleForTesting + TaskManagerSlot getSlot(SlotID slotId) { + return slots.get(slotId); } @VisibleForTesting - int getAllocatedSlotCount() { - return allocationMap.size(); + int getNumberRegisteredSlots() {
+ return slots.size(); } @VisibleForTesting - int getFreeSlotCount() { - return freeSlots.size(); + PendingSlotRequest getSlotRequest(AllocationID allocationId) { + return pendingSlotRequests.get(allocationId); } @VisibleForTesting - int getPendingRequestCount() { - return pendingSlotRequests.size(); + boolean hasTimeoutRegistered(InstanceID instanceId) { + TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceId); + + if (null != taskManagerRegistration) { + // a non-null timeout identifier is taken as the sign of a registered timeout + return taskManagerRegistration.getTimeoutIdentifier() != null; + } else { + return false; + } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java new file mode 100644 index 0000000..d21c251 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; +import org.apache.flink.util.Preconditions; +import scala.concurrent.duration.Duration; + +/** + * Timeout settings of the slot manager: the timeout for requests sent to a task manager, the + * timeout for pending slot requests and the timeout for task managers. + */ +public class SlotManagerConfiguration { + + private final Time taskManagerRequestTimeout; + private final Time slotRequestTimeout; + private final Time taskManagerTimeout; + + /** + * Creates the configuration from the given timeouts, none of which may be null. + * + * @param taskManagerRequestTimeout timeout for requests sent to a task manager + * @param slotRequestTimeout timeout for pending slot requests + * @param taskManagerTimeout timeout for task managers + */ + public SlotManagerConfiguration( + Time taskManagerRequestTimeout, + Time slotRequestTimeout, + Time taskManagerTimeout) { + this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout); + this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout); + this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout); + } + + public Time getTaskManagerRequestTimeout() { + return taskManagerRequestTimeout; + } + + public Time getSlotRequestTimeout() { + return slotRequestTimeout; + } + + public Time getTaskManagerTimeout() { + return taskManagerTimeout; + } + + /** + * Creates a configuration in which all three timeouts are set to the value configured under + * {@link ConfigConstants#AKKA_ASK_TIMEOUT}. + * + * @param configuration to read the timeout from + * @return the slot manager configuration + * @throws ConfigurationException if the configured timeout value cannot be parsed + */ + public static SlotManagerConfiguration fromConfiguration(Configuration configuration) throws ConfigurationException { + ConfigOption<String> timeoutOption = ConfigOptions + .key(ConfigConstants.AKKA_ASK_TIMEOUT) + .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT); + + final String strTimeout = configuration.getString(timeoutOption); + final Time timeout; + + try { + timeout = Time.milliseconds(Duration.apply(strTimeout).toMillis()); + } catch (NumberFormatException e) { + // report the offending value and its key instead of the option object + throw new ConfigurationException("Could not parse the resource manager's timeout " + + "value '" + strTimeout + "' configured under " + ConfigConstants.AKKA_ASK_TIMEOUT + '.', e); + } + + return new SlotManagerConfiguration(timeout, timeout, timeout); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java new file mode 100644 index 0000000..c322c81 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; + +/** + * Exception type of the slot manager; a specialization of {@link ResourceManagerException}. + */ +public class SlotManagerException extends ResourceManagerException { + + private static final long serialVersionUID = -3723028616920379071L; + + /** + * @param message describing the failure + */ + public SlotManagerException(String message) { + super(message); + } + + /** + * @param message describing the failure + * @param cause of the failure + */ + public SlotManagerException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java deleted file mode 100644 index b4e9c99..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.flink.runtime.resourcemanager.slotmanager; - -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; - -/** - * Factory to create a SlotManager and provide it with dependencies. - */ -public interface SlotManagerFactory { - - /** - * Creates a SlotManager and provides it with ResourceManager services. - */ - SlotManager create(ResourceManagerServices rmServices); -} http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java new file mode 100644 index 0000000..8e23dbb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; +import org.apache.flink.util.Preconditions; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledFuture; + +/** + * Bookkeeping entry for a task manager: its connection, the ids of its slots and the currently + * registered timeout (future plus identifier), if any. + */ +public class TaskManagerRegistration { + + private final TaskExecutorConnection taskManagerConnection; + + private final HashSet<SlotID> slots; + + private UUID timeoutIdentifier; + + private ScheduledFuture<?> timeoutFuture; + + /** + * Creates a registration without slots and without a registered timeout. + * + * @param taskManagerConnection connection to the task manager; must not be null + */ + public TaskManagerRegistration(TaskExecutorConnection taskManagerConnection) { + this.taskManagerConnection = Preconditions.checkNotNull(taskManagerConnection); + + slots = new HashSet<>(4); + + timeoutIdentifier = null; + timeoutFuture = null; + } + + public TaskExecutorConnection getTaskManagerConnection() { + return taskManagerConnection; + } + + public InstanceID getInstanceId() { + return taskManagerConnection.getInstanceID(); + } + + /** + * @return the identifier of the registered timeout, or null if no timeout is registered + */ + public UUID getTimeoutIdentifier() { + return timeoutIdentifier; + } + + /** + * Returns the internal slot set itself, not a copy; changes made through addSlot and + * removeSlot are visible through the returned set. + */ + public Set<SlotID> getSlots() { + return slots; + } + + public boolean removeSlot(SlotID slotId) { + return slots.remove(slotId); + } + + public void addSlot(SlotID slotId) { + slots.add(slotId); + } + + /** + * Cancels the registered timeout, if there is one, and clears both the future and the + * identifier. + */ + public void cancelTimeout() { + if (null != timeoutFuture) { + timeoutFuture.cancel(false); + + timeoutFuture = null; + timeoutIdentifier = null; + } + } + + /** + * Registers a new timeout after cancelling the previously registered one. + * + * @param newTimeoutFuture future of the scheduled timeout + * @param newTimeoutIdentifier identifier of the scheduled timeout + */ + public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID newTimeoutIdentifier) { + cancelTimeout(); + + timeoutFuture = newTimeoutFuture; + timeoutIdentifier = newTimeoutIdentifier; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java ---------------------------------------------------------------------- diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java index 54adce6..7a9da28 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java @@ -19,8 +19,9 @@ package org.apache.flink.runtime.taskexecutor; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; -import java.util.List; +import java.util.Iterator; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -29,12 +30,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * which slots are available and allocated, and what jobs (JobManagers) the allocated slots * have been allocated to. */ -public class SlotReport implements Serializable { +public class SlotReport implements Serializable, Iterable<SlotStatus> { private static final long serialVersionUID = -3150175198722481689L; /** The slots status of the TaskManager */ - private final List<SlotStatus> slotsStatus; + private final Collection<SlotStatus> slotsStatus; public SlotReport() { this(Collections.<SlotStatus>emptyList()); @@ -44,12 +45,16 @@ public class SlotReport implements Serializable { this(Collections.singletonList(slotStatus)); } - public SlotReport(final List<SlotStatus> slotsStatus) { + public SlotReport(final Collection<SlotStatus> slotsStatus) { this.slotsStatus = checkNotNull(slotsStatus); } - public List<SlotStatus> getSlotsStatus() { + public Collection<SlotStatus> getSlotsStatus() { return slotsStatus; } + @Override + public Iterator<SlotStatus> iterator() { + return slotsStatus.iterator(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java index 0f57bb1..a3bc4a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java @@ -38,7 +38,7 @@ public class SlotStatus implements Serializable { private final SlotID slotID; /** the resource profile of the slot */ - private final ResourceProfile profiler; + private final ResourceProfile resourceProfile; /** if the slot is allocated, allocationId identify its allocation; else, allocationId is null */ private final AllocationID allocationID; @@ -46,16 +46,17 @@ public class SlotStatus implements Serializable { /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */ private final JobID jobID; - public SlotStatus(SlotID slotID, ResourceProfile profiler) { - this(slotID, profiler, null, null); + public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) { + this(slotID, resourceProfile, null, null); } public SlotStatus( - SlotID slotID, ResourceProfile profiler, + SlotID slotID, + ResourceProfile resourceProfile, JobID jobID, AllocationID allocationID) { this.slotID = checkNotNull(slotID, "slotID cannot be null"); - this.profiler = checkNotNull(profiler, "profile cannot be null"); + this.resourceProfile = checkNotNull(resourceProfile, "profile cannot be null"); this.allocationID = allocationID; this.jobID = jobID; } @@ -74,8 +75,8 @@ public class SlotStatus implements Serializable { * * @return The resource profile */ - public ResourceProfile getProfiler() { - return profiler; + public ResourceProfile getResourceProfile() { + return resourceProfile; } /** @@ -110,7 +111,7 @@ public class SlotStatus implements Serializable { if (!slotID.equals(that.slotID)) { return false; } - if (!profiler.equals(that.profiler)) { + if 
(!resourceProfile.equals(that.resourceProfile)) { return false; } if (allocationID != null ? !allocationID.equals(that.allocationID) : that.allocationID != null) { @@ -123,7 +124,7 @@ public class SlotStatus implements Serializable { @Override public int hashCode() { int result = slotID.hashCode(); - result = 31 * result + profiler.hashCode(); + result = 31 * result + resourceProfile.hashCode(); result = 31 * result + (allocationID != null ? allocationID.hashCode() : 0); result = 31 * result + (jobID != null ? jobID.hashCode() : 0); return result; http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index d04cabb..5b8c8ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -61,8 +61,6 @@ import org.apache.flink.runtime.registration.RegistrationConnectionListener; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcMethod; @@ -71,6 +69,7 @@ import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException; import 
org.apache.flink.runtime.taskexecutor.exceptions.PartitionException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskException; import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException; import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder; @@ -99,6 +98,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -315,7 +315,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { throw new TaskSubmissionException(message); } - if (!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) { + if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) { final String message = "Rejecting the task submission because the job manager leader id " + jobManagerLeaderId + " does not match the expected job manager leader id " + jobManagerConnection.getLeaderId() + '.'; @@ -355,30 +355,30 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); Task task = new Task( - jobInformation, - taskInformation, - tdd.getExecutionAttemptId(), - tdd.getAllocationId(), - tdd.getSubtaskIndex(), - tdd.getAttemptNumber(), - tdd.getProducedPartitions(), - tdd.getInputGates(), - tdd.getTargetSlotNumber(), - tdd.getTaskStateHandles(), - memoryManager, - ioManager, - networkEnvironment, - broadcastVariableManager, - taskManagerActions, - inputSplitProvider, - checkpointResponder, - libraryCache, - fileCache, - taskManagerConfiguration, - taskMetricGroup, - resultPartitionConsumableNotifier, - partitionStateChecker, - getRpcService().getExecutor()); + jobInformation, + taskInformation, + 
tdd.getExecutionAttemptId(), + tdd.getAllocationId(), + tdd.getSubtaskIndex(), + tdd.getAttemptNumber(), + tdd.getProducedPartitions(), + tdd.getInputGates(), + tdd.getTargetSlotNumber(), + tdd.getTaskStateHandles(), + memoryManager, + ioManager, + networkEnvironment, + broadcastVariableManager, + taskManagerActions, + inputSplitProvider, + checkpointResponder, + libraryCache, + fileCache, + taskManagerConfiguration, + taskMetricGroup, + resultPartitionConsumableNotifier, + partitionStateChecker, + getRpcService().getExecutor()); log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); @@ -561,7 +561,6 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { // ---------------------------------------------------------------------- /** - * /** * Requests a slot from the TaskManager * * @param slotId identifying the requested slot @@ -573,12 +572,14 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { * @return answer to the slot request */ @RpcMethod - public TMSlotRequestReply requestSlot( + public Acknowledge requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final String targetAddress, final UUID rmLeaderId) throws SlotAllocationException { + // TODO: Filter invalid requests from the resource manager by using the instance/registration Id + log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationId, jobId, rmLeaderId); @@ -608,7 +609,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { final String message = "The slot " + slotId + " has already been allocated for a different job."; log.info(message); - throw new SlotAllocationException(message); + + throw new SlotOccupiedException(message, taskSlotTable.getCurrentAllocation(slotId.getSlotNumber())); } if (jobManagerTable.contains(jobId)) { @@ -635,7 +637,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { } } - return new 
TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), getResourceID(), allocationId); + return Acknowledge.get(); } // ---------------------------------------------------------------------- @@ -1046,7 +1048,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { resourceManagerGateway.notifySlotAvailable( resourceManagerConnection.getTargetLeaderId(), resourceManagerConnection.getRegistrationId(), - new SlotID(getResourceID(), freedSlotIndex)); + new SlotID(getResourceID(), freedSlotIndex), + allocationId); } } catch (SlotNotFoundException e) { log.debug("Could not free slot for allocation id {}.", allocationId, e); @@ -1079,7 +1082,7 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> { private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) { JobManagerConnection jmConnection = jobManagerTable.get(jobId); - return jmConnection != null && jmConnection.getLeaderId().equals(leaderId); + return jmConnection != null && Objects.equals(jmConnection.getLeaderId(), leaderId); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 2bbf0e6..bedf8ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; import 
org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; @@ -51,7 +50,7 @@ public interface TaskExecutorGateway extends RpcGateway { * @throws SlotAllocationException if the slot allocation fails * @return answer to the slot request */ - Future<TMSlotRequestReply> requestSlot( + Future<Acknowledge> requestSlot( SlotID slotId, JobID jobId, AllocationID allocationId, http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java new file mode 100644 index 0000000..93e67a8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor.exceptions; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.util.Preconditions; + +public class SlotOccupiedException extends SlotAllocationException { + private static final long serialVersionUID = -3986333914244338888L; + + private final AllocationID allocationId; + + public SlotOccupiedException(String message, AllocationID allocationId) { + super(message); + this.allocationId = Preconditions.checkNotNull(allocationId); + } + + public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) { + super(message, cause); + this.allocationId = Preconditions.checkNotNull(allocationId); + } + + public SlotOccupiedException(Throwable cause, AllocationID allocationId) { + super(cause); + this.allocationId = Preconditions.checkNotNull(allocationId); + } + + public AllocationID getAllocationId() { + return allocationId; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java index 081d8f8..5c51c7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java @@ -509,6 +509,16 @@ public class TaskSlotTable implements TimeoutListener<AllocationID> { return new TaskIterator(jobId); } + /** + * Get the current allocation for the task slot with the given index. 
+ * + * @param index identifying the slot for which the allocation id shall be retrieved + * @return Allocation id of the specified slot if allocated; otherwise null + */ + public AllocationID getCurrentAllocation(int index) { + return taskSlots.get(index).getAllocationId(); + } + // --------------------------------------------------------------------- // TimeoutListener methods // --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index e4e20b9..41c2e16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -49,7 +49,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; -import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -391,10 +391,14 @@ public class ResourceManagerTest extends TestLogger { final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, 
heartbeatTimeout, scheduledExecutor); - final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); final MetricRegistry metricRegistry = mock(MetricRegistry.class); final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class); final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); + final SlotManager slotManager = new SlotManager( + rpcService.getScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); try { final StandaloneResourceManager resourceManager = new StandaloneResourceManager( @@ -404,7 +408,7 @@ public class ResourceManagerTest extends TestLogger { resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, testingFatalErrorHandler); http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 89fd22f..b444640 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -80,7 +80,7 @@ public class SlotPoolRpcTest { Time.days(1), Time.days(1), Time.milliseconds(100) // this is the timeout for the request tested here ); - pool.start(UUID.randomUUID()); + pool.start(UUID.randomUUID(), "foobar"); Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null); http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index b4149b2..cf95461 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -78,7 +78,9 @@ public class SlotPoolTest extends TestLogger { mainThreadValidatorUtil.enterMainThread(); - slotPool.start(UUID.randomUUID()); + final String jobManagerAddress = "foobar"; + + slotPool.start(UUID.randomUUID(), jobManagerAddress); this.resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 49bc570..986f848 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -25,9 +25,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration; import org.apache.flink.runtime.rpc.RpcService; import 
org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -57,13 +58,17 @@ public class ResourceManagerHATest extends TestLogger { Time.seconds(5L), Time.seconds(5L)); - ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L)); + ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration( + Time.seconds(5L), + new SlotManagerConfiguration( + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime())); ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( resourceManagerRuntimeServicesConfiguration, highAvailabilityServices, rpcService.getScheduledExecutor()); - SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); @@ -76,7 +81,7 @@ public class ResourceManagerHATest extends TestLogger { resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + resourceManagerRuntimeServices.getSlotManager(), metricRegistry, resourceManagerRuntimeServices.getJobLeaderIdService(), testingFatalErrorHandler); http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 6a151ac..4836f74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -31,10 +31,11 @@ import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -240,12 +241,17 @@ public class ResourceManagerJobMasterTest extends TestLogger { ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), Time.seconds(5L)); - SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); MetricRegistry metricRegistry = mock(MetricRegistry.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, rpcService.getScheduledExecutor(), Time.minutes(5L)); + + final SlotManager slotManager = new SlotManager( + rpcService.getScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); ResourceManager resourceManager = new StandaloneResourceManager( rpcService, @@ -254,7 +260,7 @@ public class ResourceManagerJobMasterTest 
extends TestLogger { resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);