[FLINK-4339] [cluster management] Implement Slot Pool core on JobManager side
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aca811d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aca811d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aca811d Branch: refs/heads/flip-6 Commit: 7aca811df96ee0628fc4d274971b3ffc6d4b6eb7 Parents: 0615b62 Author: Kurt Young <ykt...@gmail.com> Authored: Thu Oct 13 04:59:46 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:32:27 2016 +0200 ---------------------------------------------------------------------- .../runtime/clusterframework/types/SlotID.java | 16 +- .../flink/runtime/instance/SlotDescriptor.java | 161 +++++ .../apache/flink/runtime/instance/SlotPool.java | 675 +++++++++++++++++++ .../apache/flink/runtime/akka/AkkaUtils.scala | 4 +- .../runtime/instance/AllocatedSlotsTest.java | 135 ++++ .../runtime/instance/AvailableSlotsTest.java | 123 ++++ .../flink/runtime/instance/SlotPoolTest.java | 297 ++++++++ 7 files changed, 1403 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java index e831a5d..237597b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java @@ -33,11 +33,11 @@ public class SlotID implements ResourceIDRetrievable, Serializable { private final ResourceID resourceId; /** The numeric id for single slot */ - private final int slotId; + private final int slotNumber; - public SlotID(ResourceID 
resourceId, int slotId) { + public SlotID(ResourceID resourceId, int slotNumber) { this.resourceId = checkNotNull(resourceId, "ResourceID must not be null"); - this.slotId = slotId; + this.slotNumber = slotNumber; } // ------------------------------------------------------------------------ @@ -47,6 +47,10 @@ public class SlotID implements ResourceIDRetrievable, Serializable { return resourceId; } + public int getSlotNumber() { + return slotNumber; + } + // ------------------------------------------------------------------------ @Override @@ -60,7 +64,7 @@ public class SlotID implements ResourceIDRetrievable, Serializable { SlotID slotID = (SlotID) o; - if (slotId != slotID.slotId) { + if (slotNumber != slotID.slotNumber) { return false; } return resourceId.equals(slotID.resourceId); @@ -69,13 +73,13 @@ public class SlotID implements ResourceIDRetrievable, Serializable { @Override public int hashCode() { int result = resourceId.hashCode(); - result = 31 * result + slotId; + result = 31 * result + slotNumber; return result; } @Override public String toString() { - return resourceId + "_" + slotId; + return resourceId + "_" + slotNumber; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java new file mode 100644 index 0000000..be7cf96 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The description of slots, TaskManagers offer one or more task slots, which define a slice of + * their resources. This description will contain some static information about the slot, such + * as the location and numeric id of the slot, rpc gateway to communicate with the TaskManager which + * owns the slot. + */ +public class SlotDescriptor { + + /** The ID of the job this slice belongs to. 
*/ + private final JobID jobID; + + /** The location information of the TaskManager to which this slot belongs */ + private final TaskManagerLocation taskManagerLocation; + + /** The number of the slot on which the task is deployed */ + private final int slotNumber; + + /** The resource profile of the slot provides */ + private final ResourceProfile resourceProfile; + + /** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */ + private final ActorGateway taskManagerActorGateway; + + public SlotDescriptor( + final JobID jobID, + final TaskManagerLocation location, + final int slotNumber, + final ResourceProfile resourceProfile, + final ActorGateway actorGateway) + { + this.jobID = checkNotNull(jobID); + this.taskManagerLocation = checkNotNull(location); + this.slotNumber = slotNumber; + this.resourceProfile = checkNotNull(resourceProfile); + this.taskManagerActorGateway = checkNotNull(actorGateway); + } + + public SlotDescriptor(final SlotDescriptor other) { + this.jobID = other.jobID; + this.taskManagerLocation = other.taskManagerLocation; + this.slotNumber = other.slotNumber; + this.resourceProfile = other.resourceProfile; + this.taskManagerActorGateway = other.taskManagerActorGateway; + } + + // TODO - temporary workaround until we have the SlotDesriptor in the Slot + public SlotDescriptor(final Slot slot) { + this.jobID = slot.getJobID(); + this.taskManagerLocation = slot.getTaskManagerLocation(); + this.slotNumber = slot.getRootSlotNumber(); + this.resourceProfile = new ResourceProfile(0, 0); + this.taskManagerActorGateway = slot.getTaskManagerActorGateway(); + } + + /** + * Returns the ID of the job this allocated slot belongs to. + * + * @return the ID of the job this allocated slot belongs to + */ + public JobID getJobID() { + return jobID; + } + + /** + * Gets the number of the slot. + * + * @return The number of the slot on the TaskManager. 
+ */ + public int getSlotNumber() { + return slotNumber; + } + + /** + * Gets the resource profile of the slot. + * + * @return The resource profile of the slot. + */ + public ResourceProfile getResourceProfile() { + return resourceProfile; + } + + /** + * Gets the location info of the TaskManager that offers this slot. + * + * @return The location info of the TaskManager that offers this slot + */ + public TaskManagerLocation getTaskManagerLocation() { + return taskManagerLocation; + } + + /** + * Gets the actor gateway that can be used to send messages to the TaskManager. + * <p> + * This method should be removed once the new interface-based RPC abstraction is in place + * + * @return The actor gateway that can be used to send messages to the TaskManager. + */ + public ActorGateway getTaskManagerActorGateway() { + return taskManagerActorGateway; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SlotDescriptor that = (SlotDescriptor) o; + + if (slotNumber != that.slotNumber) { + return false; + } + if (!jobID.equals(that.jobID)) { + return false; + } + return taskManagerLocation.equals(that.taskManagerLocation); + + } + + @Override + public int hashCode() { + int result = jobID.hashCode(); + result = 31 * result + taskManagerLocation.hashCode(); + result = 31 * result + slotNumber; + return result; + } + + @Override + public String toString() { + return taskManagerLocation + " - " + slotNumber; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java new file mode 100644 index 0000000..e7857c1 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -0,0 +1,675 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ApplyFunction; +import org.apache.flink.runtime.concurrent.BiFunction; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected; +import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The slot pool serves slot request issued by Scheduler or ExecutionGraph. It will will attempt to acquire new slots + * from the ResourceManager when it cannot serve a slot request. If no ResourceManager is currently available, + * or it gets a decline from the ResourceManager, or a request times out, it fails the slot request. The slot pool also + * holds all the slots that were offered to it and accepted, and can thus provides registered free slots even if the + * ResourceManager is down. The slots will only be released when they are useless, e.g. when the job is fully running + * but we still have some free slots. + * <p> + * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to + * eliminate ambiguities. 
+ */ +public class SlotPool implements SlotOwner { + + private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class); + + private final Object lock = new Object(); + + /** The executor which is used to execute futures */ + private final Executor executor; + + /** All registered resources, slots will be accepted and used only if the resource is registered */ + private final Set<ResourceID> registeredResources; + + /** The book-keeping of all allocated slots */ + private final AllocatedSlots allocatedSlots; + + /** The book-keeping of all available slots */ + private final AvailableSlots availableSlots; + + /** All pending requests waiting for slots */ + private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests; + + /** Timeout of slot allocation */ + private final Time timeout; + + /** the leader id of job manager */ + private UUID jobManagerLeaderId; + + /** The leader id of resource manager */ + private UUID resourceManagerLeaderId; + + /** The gateway to communicate with resource manager */ + private ResourceManagerGateway resourceManagerGateway; + + public SlotPool(final Executor executor) { + this.executor = executor; + this.registeredResources = new HashSet<>(); + this.allocatedSlots = new AllocatedSlots(); + this.availableSlots = new AvailableSlots(); + this.pendingRequests = new HashMap<>(); + this.timeout = Time.of(5, TimeUnit.SECONDS); + } + + public void setJobManagerLeaderId(final UUID jobManagerLeaderId) { + this.jobManagerLeaderId = jobManagerLeaderId; + } + + // ------------------------------------------------------------------------ + // Slot Allocation + // ------------------------------------------------------------------------ + + /** + * Try to allocate a simple slot with specified resource profile. 
+ * + * @param jobID The job id which the slot allocated for + * @param resourceProfile The needed resource profile + * @return The future of allocated simple slot + */ + public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) { + return allocateSimpleSlot(jobID, resourceProfile, new AllocationID()); + } + + + /** + * Try to allocate a simple slot with specified resource profile and specified allocation id. It's mainly + * for testing purpose since we need to specify whatever allocation id we want. + */ + @VisibleForTesting + Future<SimpleSlot> allocateSimpleSlot( + final JobID jobID, + final ResourceProfile resourceProfile, + final AllocationID allocationID) + { + final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>(); + + internalAllocateSlot(jobID, allocationID, resourceProfile, future); + + final SlotOwner owner = this; + return future.thenApplyAsync( + new ApplyFunction<SlotDescriptor, SimpleSlot>() { + @Override + public SimpleSlot apply(SlotDescriptor descriptor) { + SimpleSlot slot = new SimpleSlot( + descriptor.getJobID(), SlotPool.this, + descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(), + descriptor.getTaskManagerActorGateway()); + synchronized (lock) { + // double validation since we are out of the lock protection after the slot is granted + if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) { + LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID); + allocatedSlots.add(allocationID, descriptor, slot); + } + else { + throw new RuntimeException("Resource was marked dead asynchronously."); + } + } + return slot; + } + }, + executor + ); + } + + + /** + * Try to allocate a shared slot with specified resource profile. 
+ * + * @param jobID The job id which the slot allocated for + * @param resourceProfile The needed resource profile + * @param sharingGroupAssignment The slot sharing group of the vertex + * @return The future of allocated shared slot + */ + public Future<SharedSlot> allocateSharedSlot( + final JobID jobID, + final ResourceProfile resourceProfile, + final SlotSharingGroupAssignment sharingGroupAssignment) + { + return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID()); + } + + /** + * Try to allocate a shared slot with specified resource profile and specified allocation id. It's mainly + * for testing purpose since we need to specify whatever allocation id we want. + */ + @VisibleForTesting + Future<SharedSlot> allocateSharedSlot( + final JobID jobID, + final ResourceProfile resourceProfile, + final SlotSharingGroupAssignment sharingGroupAssignment, + final AllocationID allocationID) + { + final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>(); + + internalAllocateSlot(jobID, allocationID, resourceProfile, future); + + return future.thenApplyAsync( + new ApplyFunction<SlotDescriptor, SharedSlot>() { + @Override + public SharedSlot apply(SlotDescriptor descriptor) { + SharedSlot slot = new SharedSlot( + descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(), + descriptor.getSlotNumber(), descriptor.getTaskManagerActorGateway(), + sharingGroupAssignment); + + synchronized (lock) { + // double validation since we are out of the lock protection after the slot is granted + if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) { + LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID); + allocatedSlots.add(allocationID, descriptor, slot); + } + else { + throw new RuntimeException("Resource was marked dead asynchronously."); + } + } + return slot; + } + }, + executor + ); + } + + /** + * Internally allocate the slot 
with specified resource profile. We will first check whether we have some + * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to + * allocation the slot from resource manager. + */ + private void internalAllocateSlot( + final JobID jobID, + final AllocationID allocationID, + final ResourceProfile resourceProfile, + final FlinkCompletableFuture<SlotDescriptor> future) + { + LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID); + + synchronized (lock) { + // check whether we have any free slot which can match the required resource profile + SlotDescriptor freeSlot = availableSlots.poll(resourceProfile); + if (freeSlot != null) { + future.complete(freeSlot); + } + else { + if (resourceManagerGateway != null) { + LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.", + allocationID); + SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); + pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future)); + resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout) + .handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() { + @Override + public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) { + if (throwable != null) { + future.completeExceptionally( + new Exception("Slot allocation from resource manager failed", throwable)); + } else if (slotRequestReply instanceof RMSlotRequestRejected) { + future.completeExceptionally( + new Exception("Slot allocation rejected by resource manager")); + } + return null; + } + }, executor); + } + else { + LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID); + future.completeExceptionally(new Exception("Resource manager not available right now.")); + } + } + } + } + + // ------------------------------------------------------------------------ + // Slot 
De-allocation + // ------------------------------------------------------------------------ + + /** + * Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the + * slot can be reused by other pending requests if the resource profile matches.n + * + * @param slot The slot needs to be returned + * @return True if the returning slot been accepted + */ + @Override + public boolean returnAllocatedSlot(Slot slot) { + checkNotNull(slot); + checkArgument(!slot.isAlive(), "slot is still alive"); + checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool."); + + if (slot.markReleased()) { + synchronized (lock) { + final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot); + if (slotDescriptor != null) { + // check if this TaskManager is valid + if (!registeredResources.contains(slot.getTaskManagerID())) { + return false; + } + + final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor); + if (pendingRequest != null) { + pendingRequest.complete(slotDescriptor); + } + else { + availableSlots.add(slotDescriptor); + } + + return true; + } + else { + throw new IllegalArgumentException("Slot was not allocated from this pool."); + } + } + } + else { + return false; + } + } + + private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) { + for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) { + final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue(); + if (slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile())) { + pendingRequests.remove(entry.getKey()); + return pendingRequest.f1; + } + } + return null; + } + + // ------------------------------------------------------------------------ + // Slot Releasing + // 
------------------------------------------------------------------------ + + /** + * Release slot to TaskManager, called for finished tasks or canceled jobs. + * + * @param slot The slot needs to be released. + */ + public void releaseSlot(final Slot slot) { + synchronized (lock) { + allocatedSlots.remove(slot); + availableSlots.remove(new SlotDescriptor(slot)); + // TODO: send release request to task manager + } + } + + // ------------------------------------------------------------------------ + // Slot Offering + // ------------------------------------------------------------------------ + + /** + * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and + * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation + * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending + * request waiting for this slot (maybe fulfilled by some other returned slot). 
+ * + * @param allocationID The allocation id of the lo + * @param slotDescriptor The offered slot descriptor + * @return True if we accept the offering + */ + public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) { + synchronized (lock) { + // check if this TaskManager is valid + final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID(); + if (!registeredResources.contains(resourceID)) { + LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}", + allocationID, slotDescriptor); + return false; + } + + // check whether we have already using this slot + final Slot allocatedSlot = allocatedSlots.get(allocationID); + if (allocatedSlot != null) { + final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot); + + if (allocatedSlotDescriptor.equals(slotDescriptor)) { + LOG.debug("Allocation[{}] Duplicated slot offering: {}", + allocationID, slotDescriptor); + return true; + } + else { + LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}", + allocationID, allocatedSlotDescriptor, slotDescriptor); + return false; + } + } + + // check whether we already have this slot in free pool + if (availableSlots.contains(slotDescriptor)) { + LOG.debug("Allocation[{}] Duplicated slot offering: {}", + allocationID, slotDescriptor); + return true; + } + + // check whether we have request waiting for this slot + if (pendingRequests.containsKey(allocationID)) { + FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1; + future.complete(slotDescriptor); + return true; + } + + // unwanted slot, rejecting this offer + return false; + } + } + + // ------------------------------------------------------------------------ + // Resource + // ------------------------------------------------------------------------ + + /** + * Register TaskManager to this pool, only those slots come from registered TaskManager will be 
considered valid. + * Also it provides a way for us to keep "dead" or "abnormal" TaskManagers out of this pool. + * + * @param resourceID The id of the TaskManager + */ + public void registerResource(final ResourceID resourceID) { + synchronized (lock) { + registeredResources.add(resourceID); + } + } + + /** + * Unregister TaskManager from this pool, all the related slots will be released and tasks be canceled. Called + * when we find some TaskManager becomes "dead" or "abnormal", and we decide to not using slots from it anymore. + * + * @param resourceID The id of the TaskManager + */ + public void releaseResource(final ResourceID resourceID) { + synchronized (lock) { + registeredResources.remove(resourceID); + availableSlots.removeByResource(resourceID); + + final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID); + for (Slot slot : allocatedSlotsForResource) { + slot.releaseSlot(); + } + } + } + + // ------------------------------------------------------------------------ + // ResourceManager + // ------------------------------------------------------------------------ + + public void setResourceManager( + final UUID resourceManagerLeaderId, + final ResourceManagerGateway resourceManagerGateway) + { + synchronized (lock) { + this.resourceManagerLeaderId = resourceManagerLeaderId; + this.resourceManagerGateway = resourceManagerGateway; + } + } + + public void disconnectResourceManager() { + synchronized (lock) { + this.resourceManagerLeaderId = null; + this.resourceManagerGateway = null; + } + } + + // ------------------------------------------------------------------------ + // Helper classes + // ------------------------------------------------------------------------ + + /** + * Organize allocated slots from different points of view. 
+ */ + static class AllocatedSlots { + + /** All allocated slots organized by TaskManager */ + private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource; + + /** All allocated slots organized by Slot object */ + private final Map<Slot, AllocationID> allocatedSlots; + + private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor; + + /** All allocated slots organized by AllocationID */ + private final Map<AllocationID, Slot> allocatedSlotsById; + + AllocatedSlots() { + this.allocatedSlotsByResource = new HashMap<>(); + this.allocatedSlots = new HashMap<>(); + this.allocatedSlotsWithDescriptor = new HashMap<>(); + this.allocatedSlotsById = new HashMap<>(); + } + + /** + * Add a new allocation + * + * @param allocationID The allocation id + * @param slot The allocated slot + */ + void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) { + allocatedSlots.put(slot, allocationID); + allocatedSlotsById.put(allocationID, slot); + allocatedSlotsWithDescriptor.put(slot, descriptor); + + final ResourceID resourceID = slot.getTaskManagerID(); + Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); + if (slotsForResource == null) { + slotsForResource = new HashSet<>(); + allocatedSlotsByResource.put(resourceID, slotsForResource); + } + slotsForResource.add(slot); + } + + /** + * Get allocated slot with allocation id + * + * @param allocationID The allocation id + * @return The allocated slot, null if we can't find a match + */ + Slot get(final AllocationID allocationID) { + return allocatedSlotsById.get(allocationID); + } + + /** + * Check whether we have allocated this slot + * + * @param slot The slot needs to checked + * @return True if we contains this slot + */ + boolean contains(final Slot slot) { + return allocatedSlots.containsKey(slot); + } + + /** + * Remove an allocation with slot. 
+ * + * @param slot The slot needs to be removed + */ + SlotDescriptor remove(final Slot slot) { + final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot); + if (descriptor != null) { + final AllocationID allocationID = allocatedSlots.remove(slot); + if (allocationID != null) { + allocatedSlotsById.remove(allocationID); + } else { + throw new IllegalStateException("Bug: maps are inconsistent"); + } + + final ResourceID resourceID = slot.getTaskManagerID(); + final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); + slotsForResource.remove(slot); + if (slotsForResource.isEmpty()) { + allocatedSlotsByResource.remove(resourceID); + } + + return descriptor; + } else { + return null; + } + } + + /** + * Get all allocated slot from same TaskManager. + * + * @param resourceID The id of the TaskManager + * @return Set of slots which are allocated from the same TaskManager + */ + Set<Slot> getSlotsByResource(final ResourceID resourceID) { + Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID); + if (slotsForResource != null) { + return new HashSet<>(slotsForResource); + } + else { + return new HashSet<>(); + } + } + + @VisibleForTesting + boolean containResource(final ResourceID resourceID) { + return allocatedSlotsByResource.containsKey(resourceID); + } + + @VisibleForTesting + int size() { + return allocatedSlots.size(); + } + } + + /** + * Organize all available slots from different points of view. + */ + static class AvailableSlots { + + /** All available slots organized by TaskManager */ + private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource; + + /** All available slots */ + private final Set<SlotDescriptor> availableSlots; + + AvailableSlots() { + this.availableSlotsByResource = new HashMap<>(); + this.availableSlots = new HashSet<>(); + } + + /** + * Add an available slot. 
+		 *
+		 * @param descriptor The descriptor of the slot
+		 */
+		void add(final SlotDescriptor descriptor) {
+			availableSlots.add(descriptor);
+
+			final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
+			Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
+			if (slotsForResource == null) {
+				slotsForResource = new HashSet<>();
+				availableSlotsByResource.put(resourceID, slotsForResource);
+			}
+			slotsForResource.add(descriptor);
+		}
+
+		/**
+		 * Check whether the given slot is currently available.
+		 *
+		 * @param slotDescriptor The descriptor of the slot
+		 * @return True if the slot is available, false otherwise
+		 */
+		boolean contains(final SlotDescriptor slotDescriptor) {
+			return availableSlots.contains(slotDescriptor);
+		}
+
+		/**
+		 * Poll (find and remove) an available slot which matches the required resource profile.
+		 * The returned slot is also removed from the per-TaskManager index.
+		 *
+		 * @param resourceProfile The required resource profile
+		 * @return Slot which matches the resource profile, null if we can't find a match
+		 */
+		SlotDescriptor poll(final ResourceProfile resourceProfile) {
+			for (SlotDescriptor slotDescriptor : availableSlots) {
+				if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
+					// removing from the iterated collection is only safe because we return immediately
+					remove(slotDescriptor);
+					return slotDescriptor;
+				}
+			}
+			return null;
+		}
+
+		/**
+		 * Remove all available slots that come from the specified TaskManager.
+		 *
+		 * @param resourceID The id of the TaskManager
+		 */
+		void removeByResource(final ResourceID resourceID) {
+			final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
+			if (slotsForResource != null) {
+				for (SlotDescriptor slotDescriptor : slotsForResource) {
+					availableSlots.remove(slotDescriptor);
+				}
+			}
+		}
+
+		/**
+		 * Remove a single slot from both indexes; the per-TaskManager entry is dropped once it
+		 * holds no more slots.
+		 */
+		private void remove(final SlotDescriptor slotDescriptor) {
+			availableSlots.remove(slotDescriptor);
+
+			final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
+			final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
+			slotsForResource.remove(slotDescriptor);
+			if (slotsForResource.isEmpty()) {
+				availableSlotsByResource.remove(resourceID);
+			}
+		}
+
+		@VisibleForTesting
+		boolean containResource(final ResourceID resourceID) {
+			return availableSlotsByResource.containsKey(resourceID);
+		}
+
+		@VisibleForTesting
+		int size() {
+			return availableSlots.size();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 9209d15..2461340 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -583,8 +583,8 @@ object AkkaUtils {
   }
 
   def formatDurationParingErrorMessage: String = {
-    "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " + 
-      "(d|day)|(h|hour)|(min|minute)|s|sec|second)|(ms|milli|millisecond)|"+
+    "Duration format must be \"val unit\", where 'val' is a number and 'unit' is " +
+      "(d|day)|(h|hour)|(min|minute)|(s|sec|second)|(ms|milli|millisecond)|" +
       "(µs|micro|microsecond)|(ns|nano|nanosecond)"
   }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
new file mode 100644
index 0000000..655a3ea
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool.AllocatedSlots}.
+ */
+public class AllocatedSlotsTest {
+
+	@Test
+	public void testOperations() throws Exception {
+		SlotPool.AllocatedSlots allocatedSlots = new SlotPool.AllocatedSlots();
+
+		final AllocationID allocation1 = new AllocationID();
+		final ResourceID resource1 = new ResourceID("resource1");
+		final Slot slot1 = createSlot(resource1);
+
+		allocatedSlots.add(allocation1, new SlotDescriptor(slot1), slot1);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.size());
+
+		final AllocationID allocation2 = new AllocationID();
+		final Slot slot2 = createSlot(resource1);
+
+		allocatedSlots.add(allocation2, new SlotDescriptor(slot2), slot2);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.containResource(resource1));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(2, allocatedSlots.size());
+
+		final AllocationID allocation3 = new AllocationID();
+		final ResourceID resource2 = new ResourceID("resource2");
+		final Slot slot3 = createSlot(resource2);
+
+		// the descriptor has to be derived from the slot that is actually being added
+		allocatedSlots.add(allocation3, new SlotDescriptor(slot3), slot3);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertTrue(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertEquals(slot2, allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(2, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(3, allocatedSlots.size());
+
+		allocatedSlots.remove(slot2);
+
+		assertTrue(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertTrue(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertEquals(slot1, allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(2, allocatedSlots.size());
+
+		allocatedSlots.remove(slot1);
+
+		assertFalse(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertTrue(allocatedSlots.contains(slot3));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertTrue(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertEquals(slot3, allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(1, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(1, allocatedSlots.size());
+
+		allocatedSlots.remove(slot3);
+
+		assertFalse(allocatedSlots.contains(slot1));
+		assertFalse(allocatedSlots.contains(slot2));
+		assertFalse(allocatedSlots.contains(slot3));
+		assertFalse(allocatedSlots.containResource(resource1));
+		assertFalse(allocatedSlots.containResource(resource2));
+
+		assertNull(allocatedSlots.get(allocation1));
+		assertNull(allocatedSlots.get(allocation2));
+		assertNull(allocatedSlots.get(allocation3));
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource1).size());
+		assertEquals(0, allocatedSlots.getSlotsByResource(resource2).size());
+		assertEquals(0, allocatedSlots.size());
+	}
+
+	/**
+	 * Creates a mocked {@link Slot} that reports the given resource id as its TaskManager id.
+	 */
+	private Slot createSlot(final ResourceID resourceId) {
+		Slot slot = mock(Slot.class);
+		when(slot.getTaskManagerID()).thenReturn(resourceId);
+		return slot;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
new file mode 100644
index 0000000..872810f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool.AvailableSlots}, plus shared helpers that build
+ * {@link SlotDescriptor} instances for the slot pool tests.
+ */
+public class AvailableSlotsTest {
+
+	/** Profile given to helper-created slots unless another one is requested. */
+	static final ResourceProfile DEFAULT_TESTING_PROFILE = new ResourceProfile(1.0, 512);
+
+	/** A profile bigger than {@link #DEFAULT_TESTING_PROFILE} in both dimensions. */
+	static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(2.0, 1024);
+
+	@Test
+	public void testAddAndRemove() throws Exception {
+		final SlotPool.AvailableSlots slots = new SlotPool.AvailableSlots();
+
+		final ResourceID firstTaskManager = new ResourceID("resource1");
+		final ResourceID secondTaskManager = new ResourceID("resource2");
+
+		// two slots live on the first TaskManager, one on the second
+		final SlotDescriptor firstSlot = createSlotDescriptor(firstTaskManager);
+		final SlotDescriptor secondSlot = createSlotDescriptor(firstTaskManager);
+		final SlotDescriptor thirdSlot = createSlotDescriptor(secondTaskManager);
+
+		slots.add(firstSlot);
+		slots.add(secondSlot);
+		slots.add(thirdSlot);
+
+		assertEquals(3, slots.size());
+		assertTrue(slots.contains(firstSlot));
+		assertTrue(slots.contains(secondSlot));
+		assertTrue(slots.contains(thirdSlot));
+		assertTrue(slots.containResource(firstTaskManager));
+		assertTrue(slots.containResource(secondTaskManager));
+
+		// dropping the first TaskManager takes both of its slots with it
+		slots.removeByResource(firstTaskManager);
+
+		assertEquals(1, slots.size());
+		assertFalse(slots.contains(firstSlot));
+		assertFalse(slots.contains(secondSlot));
+		assertTrue(slots.contains(thirdSlot));
+		assertFalse(slots.containResource(firstTaskManager));
+		assertTrue(slots.containResource(secondTaskManager));
+
+		slots.removeByResource(secondTaskManager);
+
+		assertEquals(0, slots.size());
+		assertFalse(slots.contains(firstSlot));
+		assertFalse(slots.contains(secondSlot));
+		assertFalse(slots.contains(thirdSlot));
+		assertFalse(slots.containResource(firstTaskManager));
+		assertFalse(slots.containResource(secondTaskManager));
+	}
+
+	@Test
+	public void testPollFreeSlot() {
+		final SlotPool.AvailableSlots slots = new SlotPool.AvailableSlots();
+
+		final ResourceID taskManager = new ResourceID("resource1");
+		final SlotDescriptor onlySlot = createSlotDescriptor(taskManager);
+
+		slots.add(onlySlot);
+
+		assertEquals(1, slots.size());
+		assertTrue(slots.contains(onlySlot));
+		assertTrue(slots.containResource(taskManager));
+
+		// the single slot cannot satisfy the bigger profile
+		assertNull(slots.poll(DEFAULT_TESTING_BIG_PROFILE));
+
+		final SlotDescriptor polled = slots.poll(DEFAULT_TESTING_PROFILE);
+		assertEquals(onlySlot, polled);
+		assertEquals(0, slots.size());
+		assertFalse(slots.contains(onlySlot));
+		assertFalse(slots.containResource(taskManager));
+	}
+
+	/** Creates a descriptor for a slot on the given TaskManager, owned by a fresh job. */
+	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) {
+		final JobID freshJob = new JobID();
+		return createSlotDescriptor(resourceID, freshJob);
+	}
+
+	/** Creates a descriptor that uses {@link #DEFAULT_TESTING_PROFILE}. */
+	static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, final JobID jobID) {
+		return createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+	}
+
+	/** Creates a descriptor for slot number 0. */
+	static SlotDescriptor createSlotDescriptor(
+			final ResourceID resourceID,
+			final JobID jobID,
+			final ResourceProfile resourceProfile) {
+		return createSlotDescriptor(resourceID, jobID, resourceProfile, 0);
+	}
+
+	/**
+	 * Creates a descriptor whose TaskManager location is a mock reporting the given
+	 * resource id, and whose actor gateway is a plain mock.
+	 */
+	static SlotDescriptor createSlotDescriptor(
+			final ResourceID resourceID,
+			final JobID jobID,
+			final ResourceProfile resourceProfile,
+			final int slotNumber) {
+		final TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+		final ActorGateway gateway = mock(ActorGateway.class);
+		return new SlotDescriptor(jobID, taskManagerLocation, slotNumber, resourceProfile, gateway);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7aca811d/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
new file mode 100644
index 0000000..30cdbd6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link SlotPool}.
+ */
+public class SlotPoolTest extends TestLogger {
+
+	/** Executor handed to the slot pool; created per test and shut down afterwards. */
+	private ExecutorService executor;
+
+	private SlotPool slotPool;
+
+	private ResourceManagerGateway resourceManagerGateway;
+
+	@Before
+	public void setUp() throws Exception {
+		this.executor = Executors.newFixedThreadPool(1);
+		this.slotPool = new SlotPool(executor);
+		this.resourceManagerGateway = mock(ResourceManagerGateway.class);
+		when(resourceManagerGateway
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class)))
+			.thenReturn(mock(Future.class));
+		slotPool.setResourceManager(UUID.randomUUID(), resourceManagerGateway);
+		slotPool.setJobManagerLeaderId(UUID.randomUUID());
+	}
+
+	@After
+	public void tearDown() throws Exception {
+		// newFixedThreadPool threads are non-daemon; without this every test leaks a thread
+		if (executor != null) {
+			executor.shutdownNow();
+		}
+	}
+
+	@Test
+	public void testAllocateSimpleSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+		assertEquals(resourceID, slot.getTaskManagerID());
+		assertEquals(jobID, slot.getJobID());
+		assertEquals(slotPool, slot.getOwner());
+	}
+
+	@Test
+	public void testAllocateSharedSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobVertexID vid = new JobVertexID();
+		SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
+		SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, DEFAULT_TESTING_PROFILE, assignment, allocationID);
+
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class),
+			any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+
+		SharedSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+		assertEquals(resourceID, slot.getTaskManagerID());
+		assertEquals(jobID, slot.getJobID());
+		assertEquals(slotPool, slot.getOwner());
+
+		SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
+		assertNotNull(simpleSlot);
+		assertTrue(simpleSlot.isAlive());
+	}
+
+	@Test
+	public void testAllocateSlotWithoutResourceManager() throws Exception {
+		slotPool.disconnectResourceManager();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new JobID(), DEFAULT_TESTING_PROFILE);
+		// the callback's assertions run on the executor thread; keep its future so that an
+		// AssertionError raised there fails this test instead of being silently dropped
+		Future<Void> callbackFuture = future.handleAsync(
+			new BiFunction<SimpleSlot, Throwable, Void>() {
+				@Override
+				public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
+					assertNull(simpleSlot);
+					assertNotNull(throwable);
+					return null;
+				}
+			},
+			executor);
+		try {
+			future.get(1, TimeUnit.SECONDS);
+			fail("We expected a ExecutionException.");
+		} catch (ExecutionException ex) {
+			// we expect the exception
+		}
+		callbackFuture.get(1, TimeUnit.SECONDS);
+	}
+
+	@Test
+	public void testAllocationFulfilledByReturnedSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		assertFalse(future1.isDone());
+		assertFalse(future2.isDone());
+		verify(resourceManagerGateway, times(2))
+			.requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	@Test
+	public void testAllocateWithFreeSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+		assertFalse(future1.isDone());
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+
+		// return this slot to pool
+		slot1.releaseSlot();
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		// second allocation fulfilled by previous slot returning
+		SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+		assertTrue(future2.isDone());
+
+		assertNotEquals(slot1, slot2);
+		assertTrue(slot1.isReleased());
+		assertTrue(slot2.isAlive());
+		assertEquals(slot1.getTaskManagerID(), slot2.getTaskManagerID());
+		assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+	}
+
+	@Test
+	public void testOfferSlot() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+		AllocationID allocationID = new AllocationID();
+		Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID);
+		assertFalse(future.isDone());
+		verify(resourceManagerGateway).requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class));
+
+		// slot from unregistered resource
+		SlotDescriptor invalid = createSlotDescriptor(new ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
+		assertFalse(slotPool.offerSlot(allocationID, invalid));
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+
+		// reject offering with mismatch allocation id
+		assertFalse(slotPool.offerSlot(new AllocationID(), slotDescriptor));
+
+		// accepted slot
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+		SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// conflict offer with using slot
+		SlotDescriptor conflict = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertFalse(slotPool.offerSlot(allocationID, conflict));
+
+		// duplicated offer with using slot
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+		assertTrue(future.isDone());
+		assertTrue(slot.isAlive());
+
+		// duplicated offer with free slot
+		slot.releaseSlot();
+		assertTrue(slot.isReleased());
+		assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
+	}
+
+	@Test
+	public void testReleaseResource() throws Exception {
+		ResourceID resourceID = new ResourceID("resource");
+		slotPool.registerResource(resourceID);
+
+		JobID jobID = new JobID();
+
+		AllocationID allocationID1 = new AllocationID();
+		Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID1);
+
+		AllocationID allocationID2 = new AllocationID();
+		Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, DEFAULT_TESTING_PROFILE, allocationID2);
+
+		SlotDescriptor slotDescriptor = createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
+		assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
+
+		SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+		assertTrue(future1.isDone());
+		assertFalse(future2.isDone());
+
+		slotPool.releaseResource(resourceID);
+		assertTrue(slot1.isReleased());
+
+		// slot released and not usable, second allocation still not fulfilled
+		Thread.sleep(10);
+		assertFalse(future2.isDone());
+	}
+}