Repository: flink
Updated Branches:
  refs/heads/flip-6 48c936eed -> 7aca811df (forced update)


[FLINK-4347][cluster management] Implement SlotManager core

This closes #2388


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e22c64d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e22c64d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e22c64d

Branch: refs/heads/flip-6
Commit: 6e22c64dc6f5c2c252df46a5a32adf20da7ef162
Parents: ce6114d
Author: Kurt Young <ykt...@gmail.com>
Authored: Thu Aug 18 15:48:30 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:39 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/SlotManager.java   | 525 ++++++++++++++++++
 .../clusterframework/types/ResourceID.java      |   4 +-
 .../clusterframework/types/ResourceSlot.java    |  66 +++
 .../runtime/clusterframework/types/SlotID.java  |  14 +-
 .../rpc/resourcemanager/SlotRequest.java        |  51 +-
 .../runtime/rpc/taskexecutor/SlotReport.java    |  56 ++
 .../runtime/rpc/taskexecutor/SlotStatus.java    | 129 +++++
 .../clusterframework/SlotManagerTest.java       | 540 +++++++++++++++++++
 8 files changed, 1377 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
new file mode 100644
index 0000000..cc140a1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/SlotManager.java
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotReport;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and performing slot allocations. It allows
+ * requesting slots from registered TaskManagers and issues container allocation requests when there
+ * are not enough available slots. In addition, it should keep its slot allocations in sync with the
+ * TaskManagers' heartbeats.
+ * <p>
+ * The main operation principle of SlotManager is:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with TaskManager, which 
is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by free pool 
or allocated pool.</li>
+ * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
+ * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
+ * be handled outside SlotManager. SlotManager will make each decision based 
on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ */
+public abstract class SlotManager {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
+
+       /** Gateway to communicate with ResourceManager */
+       private final ResourceManagerGateway resourceManagerGateway;
+
+       /** All registered slots, including free and allocated slots */
+       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
+
+       /** All pending slot requests, waiting for available slots to fulfil them */
+       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+       /** All free slots that can be used to be allocated */
+       private final Map<SlotID, ResourceSlot> freeSlots;
+
+       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
+       private final AllocationMap allocationMap;
+
+       /**
+        * Creates a SlotManager with empty bookkeeping: no registered slots, no free slots,
+        * no pending requests and no allocations.
+        *
+        * @param resourceManagerGateway Gateway to communicate with the ResourceManager, must not be null
+        */
+       public SlotManager(ResourceManagerGateway resourceManagerGateway) {
+               this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+
+               // the pending requests keep their insertion order, the other pools are plain hash maps
+               pendingSlotRequests = new LinkedHashMap<>();
+               registeredSlots = new HashMap<>();
+               freeSlots = new HashMap<>();
+               allocationMap = new AllocationMap();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  slot managements
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Handles a slot request. A request whose AllocationID is already known (pending or allocated)
+        * is ignored. Otherwise the request is either served from the free pool, in which case the
+        * allocation is recorded and the slot leaves the free pool, or it is queued as pending after a
+        * container allocation has been triggered for its resource profile.
+        *
+        * @param request The detailed request of the slot
+        */
+       public void requestSlot(final SlotRequest request) {
+               if (isRequestDuplicated(request)) {
+                       LOG.warn("Duplicated slot request, AllocationID:{}", request.getAllocationId());
+                       return;
+               }
+
+               // let the framework specific strategy pick a matching free slot
+               final ResourceSlot candidate = chooseSlotToUse(request, freeSlots);
+
+               if (candidate == null) {
+                       // nothing suitable is free: ask for a new container and park the request
+                       LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+                               "AllocationID:{}, JobID:{}", request.getAllocationId(), request.getJobId());
+                       allocateContainer(request.getResourceProfile());
+                       pendingSlotRequests.put(request.getAllocationId(), request);
+                       return;
+               }
+
+               LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", candidate.getSlotId(),
+                       request.getAllocationId(), request.getJobId());
+
+               // bookkeeping first, then take the slot out of the free pool
+               allocationMap.addAllocation(candidate.getSlotId(), request.getAllocationId());
+               freeSlots.remove(candidate.getSlotId());
+
+               // TODO: send slot request to TaskManager
+       }
+
+       /**
+        * Sync slot status with TaskManager's SlotReport. Every SlotStatus contained in the report is
+        * applied individually, in the order in which the report yields them.
+        *
+        * @param slotReport The report carrying the slot statuses to apply
+        */
+       public void updateSlotStatus(final SlotReport slotReport) {
+               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+                       updateSlotStatus(slotStatus);
+               }
+       }
+
+       /**
+        * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
+        * or really rejected by TaskManager. We shall retry this request by:
+        * <ul>
+        * <li>1. verify and clear all the previous allocate information for 
this request
+        * <li>2. try to request slot again
+        * </ul>
+        * <p>
+        * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
+        * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
+        * but it can be taken care of by rejecting registration at JobManager.
+        *
+        * @param originalRequest The original slot request
+        * @param slotId          The target SlotID
+        */
+       public void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
+               final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
+               LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
+                       slotId, originalAllocationId, 
originalRequest.getJobId());
+
+               // verify the allocation info before we do anything
+               if (freeSlots.containsKey(slotId)) {
+                       // this slot is currently empty, no need to de-allocate 
it from our allocations
+                       LOG.info("Original slot is somehow empty, retrying this 
request");
+
+                       // before retry, we should double check whether this 
request was allocated by some other ways
+                       if (!allocationMap.isAllocated(originalAllocationId)) {
+                               requestSlot(originalRequest);
+                       } else {
+                               LOG.info("The failed request has somehow been 
allocated, SlotID:{}",
+                                       
allocationMap.getSlotID(originalAllocationId));
+                       }
+               } else if (allocationMap.isAllocated(slotId)) {
+                       final AllocationID currentAllocationId = 
allocationMap.getAllocationID(slotId);
+
+                       // check whether we have an agreement on whom this slot 
belongs to
+                       if (originalAllocationId.equals(currentAllocationId)) {
+                               LOG.info("De-allocate this request and retry");
+                               
allocationMap.removeAllocation(currentAllocationId);
+
+                               // put this slot back to free pool
+                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
+                               freeSlots.put(slotId, slot);
+
+                               // retry the request
+                               requestSlot(originalRequest);
+                       } else {
+                               // the slot is taken by someone else, no need 
to de-allocate it from our allocations
+                               LOG.info("Original slot is taken by someone 
else, current AllocationID:{}", currentAllocationId);
+
+                               // before retry, we should double check whether 
this request was allocated by some other ways
+                               if 
(!allocationMap.isAllocated(originalAllocationId)) {
+                                       requestSlot(originalRequest);
+                               } else {
+                                       LOG.info("The failed request is somehow 
been allocated, SlotID:{}",
+                                               
allocationMap.getSlotID(originalAllocationId));
+                               }
+                       }
+               } else {
+                       LOG.error("BUG! {} is neither in free pool nor in 
allocated pool", slotId);
+               }
+       }
+
+       /**
+        * Callback for TaskManager failures. In case that a TaskManager fails, 
we have to clean up all its slots.
+        *
+        * @param resourceId The ResourceID of the TaskManager
+        */
+       public void notifyTaskManagerFailure(final ResourceID resourceId) {
+               LOG.info("Resource:{} been notified failure", resourceId);
+               final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
+               if (slotIdsToRemove != null) {
+                       for (SlotID slotId : slotIdsToRemove.keySet()) {
+                               LOG.info("Removing Slot:{} upon resource 
failure", slotId);
+                               if (freeSlots.containsKey(slotId)) {
+                                       freeSlots.remove(slotId);
+                               } else if (allocationMap.isAllocated(slotId)) {
+                                       allocationMap.removeAllocation(slotId);
+                               } else {
+                                       LOG.error("BUG! {} is neither in free 
pool nor in allocated pool", slotId);
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  internal behaviors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Update slot status based on TaskManager's report. There are mainly two situations when we
+        * receive the report:
+        * <ul>
+        * <li>1. The slot is newly registered.</li>
+        * <li>2. The slot is already registered and the report contains its current status.</li>
+        * </ul>
+        * <p>
+        * Regarding 1: we just record this slot's status, and try to schedule a pending request onto
+        * it if the slot is empty.
+        * <p>
+        * Regarding 2: our view may lag behind the real status of the slot, because our own allocation
+        * updates may not be reflected in the TaskManager's heartbeat yet. The principle here is: we
+        * always trust the TaskManager's heartbeat, correct our bookkeeping accordingly and take the
+        * next action based on the difference between our information and the reported status.
+        * <p>
+        * The ResourceSlot built from the report replaces the registered description of the slot; the
+        * same instance is used for the free pool so both views stay consistent.
+        *
+        * @param reportedStatus Reported slot status
+        */
+       void updateSlotStatus(final SlotStatus reportedStatus) {
+               final SlotID slotId = reportedStatus.getSlotID();
+               final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+               final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler());
+
+               if (registerNewSlot(slot)) {
+                       // we have a newly registered slot
+                       LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedAllocationId);
+
+                       if (reportedAllocationId != null) {
+                               // slot in use, record this in bookkeeping
+                               allocationMap.addAllocation(slotId, reportedAllocationId);
+                       } else {
+                               handleFreeSlot(slot);
+                       }
+                       return;
+               }
+
+               // slot exists, update current information
+               if (reportedAllocationId != null) {
+                       // slot is reported in use, check whether we also thought this slot is in use
+                       if (allocationMap.isAllocated(slotId)) {
+                               // we also think that slot is in use, check whether the AllocationID matches
+                               final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+                               if (!reportedAllocationId.equals(currentAllocationId)) {
+                                       LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+                                               slotId, currentAllocationId, reportedAllocationId);
+
+                                       // seems we have a disagreement about the slot assignments, need to correct it
+                                       allocationMap.removeAllocation(slotId);
+                                       allocationMap.addAllocation(slotId, reportedAllocationId);
+                               }
+                       } else {
+                               LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+                                       slotId, reportedAllocationId);
+
+                               // we thought the slot is free, should correct this information
+                               allocationMap.addAllocation(slotId, reportedAllocationId);
+
+                               // remove this slot from free slots pool
+                               freeSlots.remove(slotId);
+                       }
+               } else if (allocationMap.isAllocated(slotId)) {
+                       // slot is reported empty, but we thought it is in use
+                       LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+                               slotId, allocationMap.getAllocationID(slotId));
+
+                       // correct our bookkeeping
+                       allocationMap.removeAllocation(slotId);
+
+                       // we have a free slot!
+                       handleFreeSlot(slot);
+               } else if (freeSlots.containsKey(slotId)) {
+                       // both sides agree that the slot is free: registerNewSlot() has just replaced the
+                       // registered description, so refresh the free pool entry to reference the same one
+                       freeSlots.put(slotId, slot);
+               }
+       }
+
+       /**
+        * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
+        * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
+        * to the free pool.
+        *
+        * @param freeSlot The free slot
+        */
+       private void handleFreeSlot(final ResourceSlot freeSlot) {
+               SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, 
pendingSlotRequests);
+
+               if (chosenRequest != null) {
+                       
pendingSlotRequests.remove(chosenRequest.getAllocationId());
+
+                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
+                               chosenRequest.getAllocationId(), 
chosenRequest.getJobId());
+                       allocationMap.addAllocation(freeSlot.getSlotId(), 
chosenRequest.getAllocationId());
+
+                       // TODO: send slot request to TaskManager
+               } else {
+                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
+               }
+       }
+
+       /**
+        * Tells whether a request with the same AllocationID has been seen before. A formerly received
+        * request is either still waiting in the pending queue or already recorded as an allocation.
+        *
+        * @param request The slot request
+        * @return <tt>true</tt> if the request is duplicated
+        */
+       private boolean isRequestDuplicated(final SlotRequest request) {
+               final AllocationID requestedId = request.getAllocationId();
+
+               if (pendingSlotRequests.containsKey(requestedId)) {
+                       return true;
+               }
+               return allocationMap.isAllocated(requestedId);
+       }
+
+       /**
+        * Stores the given slot in the registry of its owning resource, replacing any slot previously
+        * registered under the same SlotID, and reports whether the SlotID was unknown so far.
+        *
+        * @param slot The ResourceSlot which will be checked and registered
+        * @return <tt>true</tt> if we meet a new slot
+        */
+       private boolean registerNewSlot(final ResourceSlot slot) {
+               final SlotID slotId = slot.getSlotId();
+               final ResourceID owner = slotId.getResourceID();
+
+               // lazily create the per-resource registry
+               Map<SlotID, ResourceSlot> slotsOfOwner = registeredSlots.get(owner);
+               if (slotsOfOwner == null) {
+                       slotsOfOwner = new HashMap<>();
+                       registeredSlots.put(owner, slotsOfOwner);
+               }
+
+               final ResourceSlot replaced = slotsOfOwner.put(slotId, slot);
+               return replaced == null;
+       }
+
+       /**
+        * Looks up a registered slot.
+        *
+        * @param slotId The id of the slot to look up
+        * @return The registered slot, <tt>null</tt> if the owning resource or the slot is unknown
+        */
+       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+               final Map<SlotID, ResourceSlot> slotsOfOwner = registeredSlots.get(slotId.getResourceID());
+               return slotsOfOwner == null ? null : slotsOfOwner.get(slotId);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Framework specific behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Choose a slot to use among all free slots, the behavior is framework specific.
+        * <p>
+        * Called while handling a slot request with the manager's current free pool. A returned slot
+        * is recorded as allocated to the request and removed from the free pool; a <tt>null</tt>
+        * result makes the manager trigger a container allocation and queue the request as pending.
+        *
+        * @param request   The slot request
+        * @param freeSlots All slots which can be used
+        * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+        */
+       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
+               final Map<SlotID, ResourceSlot> freeSlots);
+
+       /**
+        * Choose a pending request to fulfill when we have a free slot, the behavior is framework
+        * specific.
+        * <p>
+        * Called whenever a slot becomes free. A returned request is removed from the pending requests
+        * and recorded as allocated to the offered slot; a <tt>null</tt> result puts the offered slot
+        * into the free pool.
+        *
+        * @param offeredSlot     The free slot
+        * @param pendingRequests All the pending slot requests
+        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+        */
+       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
+               final Map<AllocationID, SlotRequest> pendingRequests);
+
+       /**
+        * The framework specific code for allocating a container for the specified resource profile.
+        * <p>
+        * Called while handling a slot request for which no free slot was chosen, right before the
+        * request is queued as pending.
+        *
+        * @param resourceProfile The resource profile
+        */
+       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
+
+
+       // 
------------------------------------------------------------------------
+       //  Helper classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Bookkeeping of all allocations, indexed by SlotID and by AllocationID. Allocation info can be
+        * looked up or removed through either identifier.
+        * <p>
+        * The AllocationID index is kept free of stale entries: an entry is only dropped together with
+        * the slot it actually points to. If the same AllocationID is recorded for several slots (a
+        * duplicate allocation), every slot stays allocated and the AllocationID index points to the
+        * slot that was added last.
+        */
+       private static class AllocationMap {
+
+               /** All allocated slots (by SlotID) */
+               private final Map<SlotID, AllocationID> allocatedSlots;
+
+               /** All allocated slots (by AllocationID), it's an inverse view of allocatedSlots */
+               private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+               AllocationMap() {
+                       this.allocatedSlots = new HashMap<>(16);
+                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
+               }
+
+               /**
+                * Add an allocation. If the slot was bound to a different allocation before, the inverse
+                * entry of that previous allocation is dropped as well, provided it still points to this
+                * slot.
+                *
+                * @param slotId       The slot id
+                * @param allocationId The allocation id
+                */
+               void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+                       final AllocationID previous = allocatedSlots.put(slotId, allocationId);
+                       if (previous != null
+                               && !previous.equals(allocationId)
+                               && slotId.equals(allocatedSlotsByAllocationId.get(previous))) {
+                               // the slot has been re-bound, do not leave a stale inverse entry behind
+                               allocatedSlotsByAllocationId.remove(previous);
+                       }
+                       allocatedSlotsByAllocationId.put(allocationId, slotId);
+               }
+
+               /**
+                * De-allocation with slot id. The inverse entry is only removed if it refers to this very
+                * slot; in case of a duplicate allocation it may belong to another slot that is still in
+                * use.
+                *
+                * @param slotId The slot id
+                */
+               void removeAllocation(final SlotID slotId) {
+                       if (!allocatedSlots.containsKey(slotId)) {
+                               return;
+                       }
+                       final AllocationID allocationId = allocatedSlots.remove(slotId);
+                       if (slotId.equals(allocatedSlotsByAllocationId.get(allocationId))) {
+                               allocatedSlotsByAllocationId.remove(allocationId);
+                       }
+               }
+
+               /**
+                * De-allocation with allocation id. Removes the inverse entry and the slot it points to.
+                *
+                * @param allocationId The allocation id
+                */
+               void removeAllocation(final AllocationID allocationId) {
+                       if (allocatedSlotsByAllocationId.containsKey(allocationId)) {
+                               final SlotID slotId = allocatedSlotsByAllocationId.remove(allocationId);
+                               allocatedSlots.remove(slotId);
+                       }
+               }
+
+               /**
+                * Check whether allocation exists by slot id
+                *
+                * @param slotId The slot id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final SlotID slotId) {
+                       return allocatedSlots.containsKey(slotId);
+               }
+
+               /**
+                * Check whether allocation exists by allocation id
+                *
+                * @param allocationId The allocation id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final AllocationID allocationId) {
+                       return allocatedSlotsByAllocationId.containsKey(allocationId);
+               }
+
+               /**
+                * Look up the allocation a slot is bound to.
+                *
+                * @param slotId The slot id
+                * @return The allocation id, <tt>null</tt> if the slot is not allocated
+                */
+               AllocationID getAllocationID(final SlotID slotId) {
+                       return allocatedSlots.get(slotId);
+               }
+
+               /**
+                * Look up the slot an allocation is bound to.
+                *
+                * @param allocationId The allocation id
+                * @return The slot id, <tt>null</tt> if the allocation is unknown
+                */
+               SlotID getSlotID(final AllocationID allocationId) {
+                       return allocatedSlotsByAllocationId.get(allocationId);
+               }
+
+               /**
+                * @return The number of allocated slots
+                */
+               public int size() {
+                       return allocatedSlots.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Testing utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Test hook: tells whether the allocation bookkeeping holds an entry for the given slot.
+        *
+        * @param slotId The slot id
+        * @return true if the slot is recorded as allocated
+        */
+       @VisibleForTesting
+       boolean isAllocated(final SlotID slotId) {
+               return allocationMap.isAllocated(slotId);
+       }
+
+       /**
+        * Test hook: tells whether the allocation bookkeeping holds an entry for the given allocation.
+        *
+        * @param allocationId The allocation id
+        * @return true if the allocation is recorded
+        */
+       @VisibleForTesting
+       boolean isAllocated(final AllocationID allocationId) {
+               return allocationMap.isAllocated(allocationId);
+       }
+
+       /**
+        * Test hook: registers the slot and puts it straight into the free pool. Unlike the regular
+        * path for free slots, pending requests are not considered.
+        *
+        * @param slot The resource slot
+        */
+       @VisibleForTesting
+       void addFreeSlot(final ResourceSlot slot) {
+               final ResourceID owner = slot.getResourceID();
+               final SlotID slotId = slot.getSlotId();
+
+               // lazily create the per-resource registry
+               Map<SlotID, ResourceSlot> slotsOfOwner = registeredSlots.get(owner);
+               if (slotsOfOwner == null) {
+                       slotsOfOwner = new HashMap<>();
+                       registeredSlots.put(owner, slotsOfOwner);
+               }
+
+               slotsOfOwner.put(slot.getSlotId(), slot);
+               freeSlots.put(slotId, slot);
+       }
+
+       /**
+        * Test hook.
+        *
+        * @return The size reported by the allocation bookkeeping
+        */
+       @VisibleForTesting
+       int getAllocatedSlotCount() {
+               return allocationMap.size();
+       }
+
+       /**
+        * Test hook.
+        *
+        * @return The number of slots currently in the free pool
+        */
+       @VisibleForTesting
+       int getFreeSlotCount() {
+               return freeSlots.size();
+       }
+
+       /**
+        * Test hook.
+        *
+        * @return The number of slot requests currently pending
+        */
+       @VisibleForTesting
+       int getPendingRequestCount() {
+               return pendingSlotRequests.size();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index 8cf9ccb..6b8a037 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -63,9 +63,7 @@ public final class ResourceID implements 
ResourceIDRetrievable, Serializable {
 
        @Override
        public String toString() {
-               return "ResourceID{" +
-                       "resourceId='" + resourceId + '\'' +
-                       '}';
+               return resourceId;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
new file mode 100644
index 0000000..8a6db5f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework.types;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A ResourceSlot represents a slot located in a TaskManager from the ResourceManager's view. It
+ * has a unique identification and a resource profile which can be compared against the resource
+ * profile of a request.
+ */
+public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+
+       private static final long serialVersionUID = -5853720153136840674L;
+
+       /** The unique identification of this slot */
+       private final SlotID slotId;
+
+       /** The resource profile of this slot */
+       private final ResourceProfile resourceProfile;
+
+       /**
+        * Creates a slot description.
+        *
+        * @param slotId          The unique identification of the slot, must not be null
+        * @param resourceProfile The resource profile of the slot, must not be null
+        */
+       public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+               this.slotId = checkNotNull(slotId);
+               this.resourceProfile = checkNotNull(resourceProfile);
+       }
+
+       /**
+        * @return The ResourceID obtained from this slot's SlotID
+        */
+       @Override
+       public ResourceID getResourceID() {
+               return slotId.getResourceID();
+       }
+
+       /**
+        * @return The unique identification of this slot
+        */
+       public SlotID getSlotId() {
+               return slotId;
+       }
+
+       /**
+        * @return The resource profile of this slot
+        */
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       /**
+        * Check whether required resource profile can be matched by this slot. The decision is
+        * delegated to this slot's own resource profile.
+        *
+        * @param required The required resource profile
+        * @return true if requirement can be matched
+        */
+       public boolean isMatchingRequirement(ResourceProfile required) {
+               return resourceProfile.isMatching(required);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index d1b072d..e831a5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -75,9 +75,15 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
 
        @Override
        public String toString() {
-               return "SlotID{" +
-                       "resourceId=" + resourceId +
-                       ", slotId=" + slotId +
-                       '}';
+               return resourceId + "_" + slotId;
+       }
+
+       /**
+        * Generate a random slot id.
+        *
+        * @return A random slot id.
+        */
+       public static SlotID generate() {
+               return new SlotID(ResourceID.generate(), 0);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
index d8fe268..74c7c39 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/SlotRequest.java
@@ -18,8 +18,57 @@
 
 package org.apache.flink.runtime.rpc.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
 import java.io.Serializable;
 
-public class SlotRequest implements Serializable{
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This describes the requirements of a slot, mainly used by the JobManager 
when requesting a slot from the ResourceManager.
+ */
+public class SlotRequest implements Serializable {
+
        private static final long serialVersionUID = -6586877187990445986L;
+
+       /** The ID of the job the slot is requested for */
+       private final JobID jobId;
+
+       /** The unique identification of this request */
+       private final AllocationID allocationId;
+
+       /** The resource profile of the required slot */
+       private final ResourceProfile resourceProfile;
+
+       public SlotRequest(JobID jobId, AllocationID allocationId, 
ResourceProfile resourceProfile) {
+               this.jobId = checkNotNull(jobId);
+               this.allocationId = checkNotNull(allocationId);
+               this.resourceProfile = checkNotNull(resourceProfile);
+       }
+
+       /**
+        * Get the ID of the job the slot is requested for.
+        * @return The job id
+        */
+       public JobID getJobId() {
+               return jobId;
+       }
+
+       /**
+        * Get the unique identification of this request
+        * @return the allocation id
+        */
+       public AllocationID getAllocationId() {
+               return allocationId;
+       }
+
+       /**
+        * Get the resource profile of the desired slot
+        * @return The resource profile
+        */
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
new file mode 100644
index 0000000..c372ecb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotReport.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A report about the current status of all slots of the TaskExecutor, 
describing
+ * which slots are available and allocated, and what jobs (JobManagers) the 
allocated slots
+ * have been allocated to.
+ */
+public class SlotReport implements Serializable {
+
+       private static final long serialVersionUID = -3150175198722481689L;
+
+       /** The status of each slot of the TaskManager */
+       private final List<SlotStatus> slotsStatus;
+
+       /** The resource id which identifies the TaskManager */
+       private final ResourceID resourceID;
+
+       public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID 
resourceID) {
+               this.slotsStatus = checkNotNull(slotsStatus);
+               this.resourceID = checkNotNull(resourceID);
+       }
+
+       public List<SlotStatus> getSlotsStatus() {
+               return slotsStatus;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
new file mode 100644
index 0000000..e8e2084
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/SlotStatus.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This describes the current status of a slot which is located in a TaskManager.
+ */
+public class SlotStatus implements Serializable {
+
+       private static final long serialVersionUID = 5099191707339664493L;
+
+       /** slotID to identify a slot */
+       private final SlotID slotID;
+
+       /** the resource profile of the slot */
+       private final ResourceProfile profiler;
+
+       /** if the slot is allocated, allocationID identifies its allocation; 
else, allocationID is null */
+       private final AllocationID allocationID;
+
+       /** if the slot is allocated, jobID identifies which job this slot is 
allocated to; else, jobID is null */
+       private final JobID jobID;
+
+       public SlotStatus(SlotID slotID, ResourceProfile profiler) {
+               this(slotID, profiler, null, null);
+       }
+
+       public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID 
allocationID, JobID jobID) {
+               this.slotID = checkNotNull(slotID, "slotID cannot be null");
+               this.profiler = checkNotNull(profiler, "profile cannot be 
null");
+               this.allocationID = allocationID;
+               this.jobID = jobID;
+       }
+
+       /**
+        * Get the unique identification of this slot
+        *
+        * @return The slot id
+        */
+       public SlotID getSlotID() {
+               return slotID;
+       }
+
+       /**
+        * Get the resource profile of this slot
+        *
+        * @return The resource profile
+        */
+       public ResourceProfile getProfiler() {
+               return profiler;
+       }
+
+       /**
+        * Get the allocation id of this slot
+        *
+        * @return The allocation id if this slot is allocated, otherwise null
+        */
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+
+       /**
+        * Get the id of the job this slot is allocated for
+        *
+        * @return The job id if this slot is allocated, otherwise null
+        */
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               SlotStatus that = (SlotStatus) o;
+
+               if (!slotID.equals(that.slotID)) {
+                       return false;
+               }
+               if (!profiler.equals(that.profiler)) {
+                       return false;
+               }
+               if (allocationID != null ? 
!allocationID.equals(that.allocationID) : that.allocationID != null) {
+                       return false;
+               }
+               return jobID != null ? jobID.equals(that.jobID) : that.jobID == 
null;
+
+       }
+
+       @Override
+       public int hashCode() {
+               int result = slotID.hashCode();
+               result = 31 * result + profiler.hashCode();
+               result = 31 * result + (allocationID != null ? 
allocationID.hashCode() : 0);
+               result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
+               return result;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e22c64d/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
new file mode 100644
index 0000000..2ee280f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.clusterframework;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+       private static final long DEFAULT_TESTING_MEMORY = 512;
+
+       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
+
+       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
+
+       private ResourceManagerGateway resourceManagerGateway;
+
+       @Before
+       public void setUp() {
+               resourceManagerGateway = mock(ResourceManagerGateway.class);
+       }
+
+       /**
+        * Tests that when there are no free slots at request time, we need to allocate 
from the cluster manager master
+        */
+       @Test
+       public void testRequestSlotWithoutFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
+        */
+       @Test
+       public void testRequestSlotWithFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+               assertEquals(1, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertEquals(0, slotManager.getAllocatedContainers().size());
+       }
+
+       /**
+        * Tests that there are some free slots when we request, but none of 
them are suitable
+        */
+       @Test
+       public void testRequestSlotWithoutSuitableSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
+               assertEquals(2, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(2, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that duplicated slot requests are ignored
+        */
+       @Test
+       public void testDuplicatedSlotRequest() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+               slotManager.requestSlot(request1);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request1);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that we send multiple slot requests
+        */
+       @Test
+       public void testRequestMultipleSlots() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
+
+               // request 3 normal slots
+               for (int i = 0; i < 3; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
+               }
+
+               // request 2 big slots
+               for (int i = 0; i < 2; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               }
+
+               // request 1 normal slot again
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(4, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(2, slotManager.getPendingRequestCount());
+               assertEquals(2, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but it's not suitable 
for any of the pending requests
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and it's been reported 
as in use by some job
+        */
+       @Test
+       public void testNewlyAppearedInUseSlot() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotUpdateStatus() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+
+               // slot status is confirmed
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
+                       request.getAllocationId(), request.getJobId());
+               slotManager.updateSlotStatus(slotStatus2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotAdjustedToEmpty() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request1);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request pending
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       /**
+        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
+        * information didn't match.
+        */
+       @Test
+       public void testExistingInUseSlotWithDifferentAllocationInfo() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
+               // update slot status with different allocation info
+               slotManager.updateSlotStatus(slotStatus2);
+
+               // original request is missing and won't be allocated
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+       }
+
+       /**
+        * Tests that we had a free slot, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingEmptySlotUpdateStatus() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+       /**
+        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
+        */
+       @Test
+       public void testExistingEmptySlotAdjustedToInUse() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
+                       new AllocationID(), new JobID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+       }
+
+       /**
+        * Tests that we did some allocation but it failed / was rejected by 
TaskManager, the request will be retried
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManager() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+
+       /**
+        * Tests that we did some allocation but it failed / was rejected by 
TaskManager, and the slot is occupied by another request
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // slot is set empty by heartbeat
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request took this slot
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+               // original request should become pending again
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       /**
+        * Tests that reporting a failed TaskManager removes both its allocated and its free slots
+        * from the slot counts, and that reporting an unknown resource leaves the counts untouched.
+        */
+       @Test
+       public void testNotifyTaskManagerFailure() {
+               final TestingSlotManager manager = new TestingSlotManager(resourceManagerGateway);
+
+               final ResourceID survivingResource = ResourceID.generate();
+               final ResourceID failingResource = ResourceID.generate();
+
+               final ResourceSlot survivorSlotOne =
+                       new ResourceSlot(new SlotID(survivingResource, 1), DEFAULT_TESTING_PROFILE);
+               final ResourceSlot survivorSlotTwo =
+                       new ResourceSlot(new SlotID(survivingResource, 2), DEFAULT_TESTING_PROFILE);
+               final ResourceSlot failingSlotOne =
+                       new ResourceSlot(new SlotID(failingResource, 1), DEFAULT_TESTING_PROFILE);
+               final ResourceSlot failingSlotTwo =
+                       new ResourceSlot(new SlotID(failingResource, 2), DEFAULT_TESTING_PROFILE);
+
+               // one slot per resource, both consumed by the two requests below
+               manager.addFreeSlot(survivorSlotOne);
+               manager.addFreeSlot(failingSlotOne);
+
+               manager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+               manager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(2, manager.getAllocatedSlotCount());
+               assertEquals(0, manager.getFreeSlotCount());
+               assertEquals(0, manager.getPendingRequestCount());
+
+               // a second slot per resource, these stay free
+               manager.addFreeSlot(survivorSlotTwo);
+               manager.addFreeSlot(failingSlotTwo);
+
+               assertEquals(2, manager.getAllocatedSlotCount());
+               assertEquals(2, manager.getFreeSlotCount());
+               assertEquals(0, manager.getPendingRequestCount());
+
+               manager.notifyTaskManagerFailure(failingResource);
+
+               assertEquals(1, manager.getAllocatedSlotCount());
+               assertEquals(1, manager.getFreeSlotCount());
+               assertEquals(0, manager.getPendingRequestCount());
+
+               // report the failure of a resource that was never registered
+               manager.notifyTaskManagerFailure(ResourceID.generate());
+
+               assertEquals(1, manager.getAllocatedSlotCount());
+               assertEquals(1, manager.getFreeSlotCount());
+               assertEquals(0, manager.getPendingRequestCount());
+       }
+
+       // ------------------------------------------------------------------------
+       //  testing utilities
+       // ------------------------------------------------------------------------
+
+       private void directlyProvideFreeSlots(
+               final SlotManager slotManager,
+               final ResourceProfile resourceProfile,
+               final int freeSlotNum)
+       {
+               for (int i = 0; i < freeSlotNum; ++i) {
+                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  testing classes
+       // ------------------------------------------------------------------------
+
+       /**
+        * {@link SlotManager} used by the tests. Slots and requests are matched with a plain
+        * first-match scan, and container requests are only recorded, never executed.
+        */
+       private static class TestingSlotManager extends SlotManager {
+
+               /** Resource profiles handed to {@link #allocateContainer(ResourceProfile)}, in call order. */
+               private final List<ResourceProfile> allocatedContainers;
+
+               TestingSlotManager(ResourceManagerGateway resourceManagerGateway) {
+                       super(resourceManagerGateway);
+                       this.allocatedContainers = new LinkedList<>();
+               }
+
+               /**
+                * Picks the first free slot, in the iteration order of the given map, that matches the
+                * resource profile of the request. The choice is not randomized.
+                *
+                * @param request   The slot request
+                * @param freeSlots All slots which can be used
+                * @return The chosen slot or null if no slot matches
+                */
+               @Override
+               protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
+                       for (ResourceSlot candidate : freeSlots.values()) {
+                               if (candidate.isMatchingRequirement(request.getResourceProfile())) {
+                                       return candidate;
+                               }
+                       }
+                       return null;
+               }
+
+               /**
+                * Picks the first pending request, in the iteration order of the given map, whose
+                * resource profile can be satisfied by the offered slot. The choice is not randomized.
+                *
+                * @param offeredSlot     The free slot
+                * @param pendingRequests All the pending slot requests
+                * @return The chosen request or null if no request matches
+                */
+               @Override
+               protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
+                       Map<AllocationID, SlotRequest> pendingRequests)
+               {
+                       // only the requests are needed, so iterate the values instead of the entries
+                       for (SlotRequest candidate : pendingRequests.values()) {
+                               if (offeredSlot.isMatchingRequirement(candidate.getResourceProfile())) {
+                                       return candidate;
+                               }
+                       }
+                       return null;
+               }
+
+               /**
+                * Records the requested profile instead of allocating anything.
+                *
+                * @param resourceProfile The profile of the container that was asked for
+                */
+               @Override
+               protected void allocateContainer(ResourceProfile resourceProfile) {
+                       allocatedContainers.add(resourceProfile);
+               }
+
+               /**
+                * @return The live list of recorded container requests (not a copy)
+                */
+               List<ResourceProfile> getAllocatedContainers() {
+                       return allocatedContainers;
+               }
+       }
+}

Reply via email to