http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index f055971..31edbf3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -21,519 +21,893 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
- * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests 
in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with 
TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which 
is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool 
or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based 
on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
+ * The slot manager is responsible for maintaining a view on all registered 
task manager slots,
+ * their allocation and all pending slot requests. Whenever a new slot is 
registered or an
+ * allocated slot is freed, then it tries to fulfill another pending slot 
request. Whenever there
+ * are not enough slots available the slot manager will notify the resource 
manager about it via
+ * {@link ResourceManagerActions#allocateResource(ResourceProfile)}.
+ *
+ * In order to free resources and avoid resource leaks, idling task managers 
(task managers whose
+ * slots are currently not used) and not fulfilled pending slot requests time 
out triggering their
+ * release and failure, respectively.
  */
-public abstract class SlotManager {
+public class SlotManager implements AutoCloseable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
+
+       /** Scheduled executor for timeouts */
+       private final ScheduledExecutor scheduledExecutor;
+
+       /** Timeout for slot requests to the task manager */
+       private final Time taskManagerRequestTimeout;
+
+       /** Timeout after which an allocation is discarded */
+       private final Time slotRequestTimeout;
+
+       /** Timeout after which an unused TaskManager is released */
+       private final Time taskManagerTimeout;
+
+       /** Map for all registered slots */
+       private final HashMap<SlotID, TaskManagerSlot> slots;
 
-       protected final Logger LOG = LoggerFactory.getLogger(getClass());
+       /** Index of all currently free slots */
+       private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots;
 
-       /** The Resource allocation provider */
-       protected final ResourceManagerServices rmServices;
+       /** All currently registered task managers */
+       private final HashMap<InstanceID, TaskManagerRegistration> 
taskManagerRegistrations;
 
-       /** All registered task managers with ResourceID and gateway. */
-       private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
+       /** Map of fulfilled and active allocations for request deduplication 
purposes */
+       private final HashMap<AllocationID, SlotID> fulfilledSlotRequests;
 
-       /** All registered slots, including free and allocated slots */
-       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
+       /** Map of pending/unfulfilled slot allocation requests */
+       private final HashMap<AllocationID, PendingSlotRequest> 
pendingSlotRequests;
 
-       /** All pending slot requests, waiting available slots to fulfil */
-       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+       /** Leader id of the containing component */
+       private UUID leaderId;
 
-       /** All free slots that can be used to be allocated */
-       private final Map<SlotID, ResourceSlot> freeSlots;
+       /** Executor for future callbacks which have to be "synchronized" */
+       private Executor mainThreadExecutor;
 
-       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
-       private final AllocationMap allocationMap;
+       /** Callbacks for resource (de-)allocations */
+       private ResourceManagerActions resourceManagerActions;
 
-       private final Time timeout;
+       /** True iff the component has been started */
+       private boolean started;
 
-       public SlotManager(ResourceManagerServices rmServices) {
-               this.rmServices = checkNotNull(rmServices);
-               this.registeredSlots = new HashMap<>(16);
-               this.pendingSlotRequests = new LinkedHashMap<>(16);
-               this.freeSlots = new HashMap<>(16);
-               this.allocationMap = new AllocationMap();
-               this.taskManagers = new HashMap<>();
-               this.timeout = Time.seconds(10);
+       public SlotManager(
+                       ScheduledExecutor scheduledExecutor,
+                       Time taskManagerRequestTimeout,
+                       Time slotRequestTimeout,
+                       Time taskManagerTimeout) {
+               this.scheduledExecutor = 
Preconditions.checkNotNull(scheduledExecutor);
+               this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
+               this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
+               this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
+
+               slots = new HashMap<>(16);
+               freeSlots = new LinkedHashMap<>(16);
+               taskManagerRegistrations = new HashMap<>(4);
+               fulfilledSlotRequests = new HashMap<>(16);
+               pendingSlotRequests = new HashMap<>(16);
+
+               leaderId = null;
+               resourceManagerActions = null;
+               started = false;
        }
 
-       // 
------------------------------------------------------------------------
-       //  slot managements
-       // 
------------------------------------------------------------------------
+       // 
---------------------------------------------------------------------------------------------
+       // Component lifecycle methods
+       // 
---------------------------------------------------------------------------------------------
 
        /**
-        * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
-        * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
-        * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
-        * RPC's main thread to avoid race condition).
+        * Starts the slot manager with the given leader id and resource 
manager actions.
         *
-        * @param request The detailed request of the slot
-        * @return RMSlotRequestRegistered The confirmation message to be send 
to the caller
+        * @param newLeaderId to use for communication with the task managers
+        * @param newMainThreadExecutor to execute future callbacks which have to be "synchronized"
+        * @param newResourceManagerActions to use for resource (de-)allocations
+        */
+       public void start(UUID newLeaderId, Executor newMainThreadExecutor, 
ResourceManagerActions newResourceManagerActions) {
+               leaderId = Preconditions.checkNotNull(newLeaderId);
+               mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
+               resourceManagerActions = 
Preconditions.checkNotNull(newResourceManagerActions);
+
+               started = true;
+       }
+
+       /**
+        * Suspends the component. This clears the internal state of the slot 
manager.
         */
-       public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
-               final AllocationID allocationId = request.getAllocationId();
-               if (isRequestDuplicated(request)) {
-                       LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
-                       return new RMSlotRequestRegistered(allocationId);
+       public void suspend() {
+               for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+                       cancelPendingSlotRequest(pendingSlotRequest);
                }
 
-               // try to fulfil the request with current free slots
-               final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-               if (slot != null) {
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-                               allocationId, request.getJobId());
+               pendingSlotRequests.clear();
+
+               ArrayList<InstanceID> registeredTaskManagers = new 
ArrayList<>(taskManagerRegistrations.keySet());
+
+               for (InstanceID registeredTaskManager : registeredTaskManagers) 
{
+                       unregisterTaskManager(registeredTaskManager);
+               }
 
-                       // record this allocation in bookkeeping
-                       allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
-                       // remove selected slot from free pool
-                       freeSlots.remove(slot.getSlotId());
+               leaderId = null;
+               resourceManagerActions = null;
+               started = false;
+       }
+
+       /**
+        * Closes the slot manager.
+        *
+        * @throws Exception if the close operation fails
+        */
+       @Override
+       public void close() throws Exception {
+               suspend();
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Public API
+       // 
---------------------------------------------------------------------------------------------
+
+       /**
+        * Requests a slot with the respective resource profile.
+        *
+        * @param slotRequest specifying the requested slot specs
+        * @return true if the slot request was registered; false if the 
request is a duplicate
+        * @throws SlotManagerException if the slot request failed (e.g. not 
enough resources left)
+        */
+       public boolean registerSlotRequest(SlotRequest slotRequest) throws 
SlotManagerException {
+               checkInit();
 
-                       sendSlotRequest(slot, request);
+               if (checkDuplicateRequest(slotRequest.getAllocationId())) {
+                       LOG.debug("Ignoring a duplicate slot request with 
allocation id {}.", slotRequest.getAllocationId());
+
+                       return false;
                } else {
-                       LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
-                               "AllocationID:{}, JobID:{}", allocationId, 
request.getJobId());
-                       Preconditions.checkState(rmServices != null,
-                               "Attempted to allocate resources but no 
ResourceManagerServices set.");
-                       
rmServices.allocateResource(request.getResourceProfile());
-                       pendingSlotRequests.put(allocationId, request);
+                       PendingSlotRequest pendingSlotRequest = new 
PendingSlotRequest(slotRequest);
+
+                       pendingSlotRequests.put(slotRequest.getAllocationId(), 
pendingSlotRequest);
+
+                       try {
+                               internalRequestSlot(pendingSlotRequest);
+                       } catch (ResourceManagerException e) {
+                               // requesting the slot failed --> remove 
pending slot request
+                               
pendingSlotRequests.remove(slotRequest.getAllocationId());
+
+                               throw new SlotManagerException("Could not 
fulfill slot request " + slotRequest.getAllocationId() + '.', e);
+                       }
+
+                       return true;
                }
+       }
+
+       /**
+        * Cancels and removes a pending slot request with the given allocation 
id. If there is no such
+        * pending request, then nothing is done.
+        *
+        * @param allocationId identifying the pending slot request
+        * @return True if a pending slot request was found; otherwise false
+        */
+       public boolean unregisterSlotRequest(AllocationID allocationId) {
+               checkInit();
+
+               PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.remove(allocationId);
+
+               if (null != pendingSlotRequest) {
+                       cancelPendingSlotRequest(pendingSlotRequest);
+
+                       return true;
+               } else {
+                       LOG.debug("No pending slot request with allocation id 
{} found.", allocationId);
 
-               return new RMSlotRequestRegistered(allocationId);
+                       return false;
+               }
        }
 
        /**
-        * Notifies the SlotManager that a slot is available again after being 
allocated.
-        * @param slotID slot id of available slot
+        * Registers a new task manager at the slot manager. This will make the 
task manager's slots
+        * known and, thus, available for allocation.
+        *
+        * @param taskExecutorConnection for the new task manager
+        * @param initialSlotReport for the new task manager
         */
-       public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
-               if (!allocationMap.isAllocated(slotID)) {
-                       throw new IllegalStateException("Slot was not 
previously allocated but " +
-                               "TaskManager reports it as available again");
+       public void registerTaskManager(final TaskExecutorConnection 
taskExecutorConnection, SlotReport initialSlotReport) {
+               checkInit();
+
+               // we identify task managers by their instance id
+               if 
(!taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) 
{
+                       TaskManagerRegistration taskManagerRegistration = new 
TaskManagerRegistration(taskExecutorConnection);
+                       
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), 
taskManagerRegistration);
                }
-               allocationMap.removeAllocation(slotID);
-               final Map<SlotID, ResourceSlot> slots = 
registeredSlots.get(resourceID);
-               ResourceSlot freeSlot = slots.get(slotID);
-               if (freeSlot == null) {
-                       throw new IllegalStateException("Slot was not 
registered with SlotManager but " +
-                               "TaskManager reported it to be available.");
+
+               reportSlotStatus(taskExecutorConnection.getInstanceID(), 
initialSlotReport);
+       }
+
+       /**
+        * Unregisters the task manager identified by the given instance id and 
its associated slots
+        * from the slot manager.
+        *
+        * @param instanceId identifying the task manager to unregister
+        * @return True if there existed a registered task manager with the 
given instance id
+        */
+       public boolean unregisterTaskManager(InstanceID instanceId) {
+               checkInit();
+
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
+
+               if (null != taskManagerRegistration) {
+                       removeSlots(taskManagerRegistration.getSlots());
+
+                       taskManagerRegistration.cancelTimeout();
+
+                       return true;
+               } else {
+                       LOG.debug("There is no task manager registered with 
instance ID {}. Ignoring this message.", instanceId);
+
+                       return false;
                }
-               handleFreeSlot(freeSlot);
        }
 
        /**
-        * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
-        * or really rejected by TaskManager. We shall retry this request by:
-        * <ul>
-        * <li>1. verify and clear all the previous allocate information for 
this request
-        * <li>2. try to request slot again
-        * </ul>
-        * <p>
-        * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
-        * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
-        * but it can be taken care of by rejecting registration at JobManager.
+        * Reports the current slot allocations for a task manager identified 
by the given instance id.
         *
-        * @param originalRequest The original slot request
-        * @param slotId          The target SlotID
+        * @param instanceId identifying the task manager for which to report 
the slot status
+        * @param slotReport containing the status for all of its slots
+        * @return true if the slot status has been updated successfully, 
otherwise false
         */
-       void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
-               final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
-               LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
-                       slotId, originalAllocationId, 
originalRequest.getJobId());
-
-               if (allocationMap.isAllocated(slotId)) {
-                       final AllocationID expectedAllocationId = 
allocationMap.getAllocationID(slotId);
-
-                       // check whether we have an agreement on whom this slot 
belongs to
-                       if (originalAllocationId.equals(expectedAllocationId)) {
-                               LOG.info("De-allocate this request and retry");
-                               
allocationMap.removeAllocation(expectedAllocationId);
-                               
pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
-                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
-                               // treat this slot as empty and retry with a 
different request
-                               handleFreeSlot(slot);
-                       } else {
-                               LOG.error("Slot request failed for slot {} with 
allocation id {}:" +
-                                               " Allocation id did not match 
the expected allocation id {}.",
-                                       slotId, originalAllocationId, 
expectedAllocationId);
+       public boolean reportSlotStatus(InstanceID instanceId, SlotReport 
slotReport) {
+               checkInit();
+
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
+
+               if (null != taskManagerRegistration) {
+                       ArrayList<SlotID> slotsToRemove = new 
ArrayList<>(taskManagerRegistration.getSlots());
+                       boolean idle = true;
+
+                       for (SlotStatus slotStatus : slotReport) {
+                               if 
(slotsToRemove.remove(slotStatus.getSlotID())) {
+                                       // slot which was already registered
+                                       updateSlot(slotStatus.getSlotID(), 
slotStatus.getAllocationID());
+                               } else {
+                                       // new slot
+                                       registerSlot(
+                                               taskManagerRegistration,
+                                               slotStatus.getSlotID(),
+                                               slotStatus.getAllocationID(),
+                                               slotStatus.getResourceProfile(),
+                                               
taskManagerRegistration.getTaskManagerConnection());
+                               }
+
+                               TaskManagerSlot slot = 
slots.get(slotStatus.getSlotID());
+
+                               idle &= slot.isFree();
+                       }
+
+                       // remove the slots for which we haven't received a 
slot status message
+                       removeSlots(slotsToRemove);
+
+                       if (idle) {
+                               // no slot of this task manager is being used 
--> register timer to free this resource
+                               
registerTaskManagerTimeout(taskManagerRegistration);
                        }
+
+                       return true;
                } else {
-                       LOG.error("Slot request failed for slot {} with 
allocation id {}: " +
-                                       "Slot was not previously registered.",
-                               slotId, originalAllocationId);
+                       LOG.debug("Received slot report for unknown task 
manager with instance id {}. Ignoring this report.", instanceId);
+
+                       return false;
                }
        }
 
        /**
-        * Registers a TaskExecutor
-        * @param resourceID TaskExecutor's ResourceID
-        * @param registration TaskExecutor's registration
-        * @param slotReport TaskExecutor's free and allocated slots
+        * Free the given slot from the given allocation. If the slot is still 
allocated by the given
+        * allocation id, then the slot will be marked as free and will be 
subject to new slot requests.
+        *
+        * @param slotId identifying the slot to free
+        * @param allocationId with which the slot is presumably allocated
         */
-       public void registerTaskExecutor(
-                       ResourceID resourceID,
-                       TaskExecutorRegistration registration,
-                       SlotReport slotReport) {
+       public void freeSlot(SlotID slotId, AllocationID allocationId) {
+               checkInit();
 
-               if (taskManagers.get(resourceID) != null) {
-                       notifyTaskManagerFailure(resourceID);
-               }
+               TaskManagerSlot slot = slots.get(slotId);
+
+               if (null != slot) {
+                       if (slot.isAllocated()) {
+                               if (Objects.equals(allocationId, 
slot.getAllocationId())) {
+                                       // free the slot
+                                       slot.setAllocationId(null);
+                                       
fulfilledSlotRequests.remove(allocationId);
 
-               this.taskManagers.put(resourceID, registration);
+                                       if (slot.isFree()) {
+                                               handleFreeSlot(slot);
+                                       }
 
-               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-                       final SlotID slotId = slotStatus.getSlotID();
+                                       TaskManagerRegistration 
taskManagerRegistration = taskManagerRegistrations.get(slot.getInstanceId());
 
-                       final TaskExecutorRegistration taskExecutorRegistration 
= taskManagers.get(slotId.getResourceID());
-                       if (taskExecutorRegistration == null) {
-                               LOG.info("Received SlotStatus but ResourceID {} 
is unknown to the SlotManager",
-                                       slotId.getResourceID());
-                               return;
+                                       if (null != taskManagerRegistration && 
!anySlotUsed(taskManagerRegistration.getSlots())) {
+                                               
registerTaskManagerTimeout(taskManagerRegistration);
+                                       }
+                               } else {
+                                       LOG.debug("Received request to free 
slot {} with expected allocation id {}, " +
+                                               "but actual allocation id {} 
differs. Ignoring the request.", slotId, allocationId, slot.getAllocationId());
+                               }
+                       } else {
+                               LOG.debug("Slot {} has not been allocated.", 
slotId);
                        }
+               } else {
+                       LOG.debug("Trying to free a slot {} which has not been 
registered. Ignoring this message.", slotId);
+               }
+       }
 
-                       final ResourceSlot slot = new ResourceSlot(slotId, 
slotStatus.getProfiler(), taskExecutorRegistration);
+       // 
---------------------------------------------------------------------------------------------
+       // Behaviour methods
+       // 
---------------------------------------------------------------------------------------------
 
-                       registerNewSlot(slot);
-                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, slotStatus.getAllocationID());
+       /**
+        * Finds a matching slot request for a given resource profile. If there 
is no such request,
+        * the method returns null.
+        *
+        * Note: If you want to change the behaviour of the slot manager wrt 
slot allocation and
+        * request fulfillment, then you should override this method.
+        *
+        * @param slotResourceProfile defining the resources of an available 
slot
+        * @return A matching slot request which can be deployed in a slot with 
the given resource
+        * profile. Null if there is no such slot request pending.
+        */
+       protected PendingSlotRequest findMatchingRequest(ResourceProfile 
slotResourceProfile) {
 
-                       if (slotStatus.getAllocationID() != null) {
-                               // slot in use, record this in bookkeeping
-                               allocationMap.addAllocation(slotId, 
slotStatus.getAllocationID());
-                       } else {
-                               handleFreeSlot(slot);
+               for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+                       if (!pendingSlotRequest.isAssigned() && 
slotResourceProfile.isMatching(pendingSlotRequest.getResourceProfile())) {
+                               return pendingSlotRequest;
                        }
                }
+
+               return null;
        }
 
        /**
-        * Callback for TaskManager failures. In case that a TaskManager fails, 
we have to clean up all its slots.
+        * Finds a matching slot for a given resource profile. A matching slot 
has at least as many
+        * resources available as the given resource profile. If there is no 
such slot available, then
+        * the method returns null.
         *
-        * @param resourceId The ResourceID of the TaskManager
+        * Note: If you want to change the behaviour of the slot manager wrt 
slot allocation and
+        * request fulfillment, then you should override this method.
+        *
+        * @param requestResourceProfile specifying the resource requirements 
for a slot request
+        * @return A matching slot which fulfills the given resource profile. 
Null if there is no such
+        * slot available.
         */
-       public void notifyTaskManagerFailure(final ResourceID resourceId) {
-               LOG.info("Resource:{} been notified failure", resourceId);
-               taskManagers.remove(resourceId);
-               final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
-               if (slotIdsToRemove != null) {
-                       for (SlotID slotId : slotIdsToRemove.keySet()) {
-                               LOG.info("Removing Slot: {} upon resource 
failure", slotId);
-                               if (freeSlots.containsKey(slotId)) {
-                                       freeSlots.remove(slotId);
-                               } else if (allocationMap.isAllocated(slotId)) {
-                                       allocationMap.removeAllocation(slotId);
-                               } else {
-                                       LOG.error("BUG! {} is neither in free 
pool nor in allocated pool", slotId);
-                               }
+       protected TaskManagerSlot findMatchingSlot(ResourceProfile 
requestResourceProfile) {
+               Iterator<Map.Entry<SlotID, TaskManagerSlot>> iterator = 
freeSlots.entrySet().iterator();
+
+               while (iterator.hasNext()) {
+                       TaskManagerSlot taskManagerSlot = 
iterator.next().getValue();
+
+                       // sanity check
+                       Preconditions.checkState(taskManagerSlot.isFree());
+
+                       if 
(taskManagerSlot.getResourceProfile().isMatching(requestResourceProfile)) {
+                               iterator.remove();
+                               return taskManagerSlot;
                        }
                }
+
+               return null;
        }
 
-       // 
------------------------------------------------------------------------
-       //  internal behaviors
-       // 
------------------------------------------------------------------------
+       // 
---------------------------------------------------------------------------------------------
+       // Internal slot operations
+       // 
---------------------------------------------------------------------------------------------
 
        /**
-        * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
-        * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
-        * to the free pool.
+        * Registers a slot for the given task manager at the slot manager. The 
slot is identified by
+        * the given slot id. The given resource profile defines the available 
resources for the slot.
+        * The task manager connection can be used to communicate with the task 
manager.
         *
-        * @param freeSlot The free slot
+        * @param taskManagerRegistration Task manager for which to register 
the given slot
+        * @param slotId identifying the slot on the task manager
+        * @param allocationId which is currently deployed in the slot
+        * @param resourceProfile of the slot
+        * @param taskManagerConnection to communicate with the remote task 
manager
         */
-       private void handleFreeSlot(final ResourceSlot freeSlot) {
-               SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, 
pendingSlotRequests);
+       private void registerSlot(
+                       TaskManagerRegistration taskManagerRegistration,
+                       SlotID slotId,
+                       AllocationID allocationId,
+                       ResourceProfile resourceProfile,
+                       TaskExecutorConnection taskManagerConnection) {
+
+               Preconditions.checkNotNull(taskManagerRegistration);
+
+               TaskManagerSlot slot = new TaskManagerSlot(
+                       slotId,
+                       resourceProfile,
+                       taskManagerConnection,
+                       allocationId);
+
+               slots.put(slotId, slot);
+
+               taskManagerRegistration.addSlot(slotId);
+
+               if (slot.isFree()) {
+                       handleFreeSlot(slot);
+               }
 
-               if (chosenRequest != null) {
-                       final AllocationID allocationId = 
chosenRequest.getAllocationId();
-                       final SlotRequest slotRequest = 
pendingSlotRequests.remove(allocationId);
+               if (slot.isAllocated()) {
+                       fulfilledSlotRequests.put(slot.getAllocationId(), 
slotId);
+               }
+       }
+
+       /**
+        * Updates a slot with the given allocation id.
+        *
+        * @param slotId to update
+        * @param allocationId specifying the current allocation of the slot
+        */
+       private void updateSlot(SlotID slotId, AllocationID allocationId) {
+               TaskManagerSlot slot = slots.get(slotId);
+
+               if (null != slot) {
+                       // we assume the given allocation id to be the ground 
truth (coming from the TM)
+                       slot.setAllocationId(allocationId);
+
+                       if (null != allocationId) {
+                               if (slot.hasPendingSlotRequest()){
+                                       // we have a pending slot request --> 
check whether we have to reject it
+                                       PendingSlotRequest pendingSlotRequest = 
slot.getAssignedSlotRequest();
+
+                                       if 
(Objects.equals(pendingSlotRequest.getAllocationId(), allocationId)) {
+                                               // we can cancel the slot 
request because it has been fulfilled
+                                               
cancelPendingSlotRequest(pendingSlotRequest);
+
+                                               // remove the pending slot 
request, since it has been completed
+                                               
pendingSlotRequests.remove(pendingSlotRequest.getAllocationId());
+                                       } else {
+                                               // this will try to find a new 
slot for the request
+                                               rejectPendingSlotRequest(
+                                                       pendingSlotRequest,
+                                                       new Exception("Task 
manager reported slot " + slotId + " being already allocated."));
+                                       }
+
+                                       slot.setAssignedSlotRequest(null);
+                               }
 
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
-                               allocationId, chosenRequest.getJobId());
-                       allocationMap.addAllocation(freeSlot.getSlotId(), 
allocationId);
+                               fulfilledSlotRequests.put(allocationId, slotId);
 
-                       sendSlotRequest(freeSlot, slotRequest);
+                               TaskManagerRegistration taskManagerRegistration 
= taskManagerRegistrations.get(slot.getInstanceId());
+
+                               if (null != taskManagerRegistration) {
+                                       // disable any registered time out for 
the task manager
+                                       taskManagerRegistration.cancelTimeout();
+                               }
+                       }
                } else {
-                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
+                       LOG.debug("Trying to update unknown slot with slot id 
{}.", slotId);
                }
        }
 
-       private void sendSlotRequest(final ResourceSlot freeSlot, final 
SlotRequest slotRequest) {
+       /**
+        * Tries to allocate a slot for the given slot request. If there is no 
slot available, the
+        * resource manager is informed to allocate more resources and a 
timeout for the request is
+        * registered.
+        *
+        * @param pendingSlotRequest to allocate a slot for
+        * @throws ResourceManagerException if the resource manager cannot 
allocate more resources
+        */
+       private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) 
throws ResourceManagerException {
+               TaskManagerSlot taskManagerSlot = 
findMatchingSlot(pendingSlotRequest.getResourceProfile());
 
-               final AllocationID allocationID = slotRequest.getAllocationId();
-               final TaskExecutorRegistration registration = 
freeSlot.getTaskExecutorRegistration();
-               final Future<TMSlotRequestReply> slotRequestReplyFuture =
-                       registration.getTaskExecutorGateway()
-                               .requestSlot(
-                                       freeSlot.getSlotId(),
-                                       slotRequest.getJobId(),
-                                       allocationID,
-                                       "foobar", // TODO: set proper JM address
-                                       rmServices.getLeaderID(),
-                                       timeout);
+               if (taskManagerSlot != null) {
+                       allocateSlot(taskManagerSlot, pendingSlotRequest);
+               } else {
+                       final UUID timeoutIdentifier = UUID.randomUUID();
+                       final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
+
+                       // register timeout for slot request
+                       ScheduledFuture<?> timeoutFuture = 
scheduledExecutor.schedule(new Runnable() {
+                               @Override
+                               public void run() {
+                                       mainThreadExecutor.execute(new 
Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       
timeoutSlotRequest(allocationId, timeoutIdentifier);
+                                               }
+                                       });
+                               }
+                       }, slotRequestTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+
+                       pendingSlotRequest.registerTimeout(timeoutFuture, 
timeoutIdentifier);
 
-               slotRequestReplyFuture.handleAsync(new 
BiFunction<TMSlotRequestReply, Throwable, Void>() {
+                       
resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
+               }
+       }
+
+       /**
+        * Allocates the given slot for the given slot request. This entails 
sending a slot request
+        * RPC to the task manager and handling failures.
+        *
+        * @param taskManagerSlot to allocate for the given slot request
+        * @param pendingSlotRequest to allocate the given slot for
+        */
+       private void allocateSlot(TaskManagerSlot taskManagerSlot, 
PendingSlotRequest pendingSlotRequest) {
+               TaskExecutorConnection taskExecutorConnection = 
taskManagerSlot.getTaskManagerConnection();
+               TaskExecutorGateway gateway = 
taskExecutorConnection.getTaskExecutorGateway();
+
+               final CompletableFuture<Acknowledge> completableFuture = new 
FlinkCompletableFuture<>();
+               final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
+               final SlotID slotId = taskManagerSlot.getSlotId();
+
+               taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
+               pendingSlotRequest.setRequestFuture(completableFuture);
+
+               // RPC call to the task manager
+               Future<Acknowledge> requestFuture = gateway.requestSlot(
+                       slotId,
+                       pendingSlotRequest.getJobId(),
+                       allocationId,
+                       pendingSlotRequest.getTargetAddress(),
+                       leaderId,
+                       taskManagerRequestTimeout);
+
+               requestFuture.handle(new BiFunction<Acknowledge, Throwable, 
Void>() {
+                       @Override
+                       public Void apply(Acknowledge acknowledge, Throwable 
throwable) {
+                               if (acknowledge != null) {
+                                       completableFuture.complete(acknowledge);
+                               } else {
+                                       
completableFuture.completeExceptionally(throwable);
+                               }
+
+                               return null;
+                       }
+               });
+
+               completableFuture.handleAsync(new BiFunction<Acknowledge, 
Throwable, Void>() {
                        @Override
-                       public Void apply(TMSlotRequestReply slotRequestReply, 
Throwable throwable) {
-                               TaskExecutorRegistration current = 
taskManagers.get(slotRequestReply.getResourceID());
-                               if (current != null && 
current.getInstanceID().equals(slotRequestReply.getInstanceID())) {
-                                       if (throwable != null || 
slotRequestReply instanceof TMSlotRequestRejected) {
-                                               
handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId());
+                       public Void apply(Acknowledge acknowledge, Throwable 
throwable) {
+                               if (acknowledge != null) {
+                                       updateSlot(slotId, allocationId);
+                               } else {
+                                       if (throwable instanceof 
SlotOccupiedException) {
+                                               SlotOccupiedException exception 
= (SlotOccupiedException) throwable;
+                                               updateSlot(slotId, 
exception.getAllocationId());
                                        } else {
-                                               LOG.debug("Successfully 
registered slot {} ", freeSlot.getSlotId());
+                                               
removeSlotRequestFromSlot(slotId, allocationId);
+                                       }
+
+                                       if (!(throwable instanceof 
CancellationException)) {
+                                               handleFailedSlotRequest(slotId, 
allocationId, throwable);
+                                       } else {
+                                               LOG.debug("Slot allocation 
request {} has been cancelled.", allocationId, throwable);
                                        }
-                               } else {
-                                       LOG.debug("Discarding message from 
obsolete TaskExecutor with InstanceID {}",
-                                               
slotRequestReply.getInstanceID());
                                }
+
                                return null;
                        }
-               }, rmServices.getMainThreadExecutor());
+               }, mainThreadExecutor);
        }
 
        /**
-        * Check whether the request is duplicated. We use AllocationID to 
identify slot request, for each
-        * formerly received slot request, it is either in pending list or 
already been allocated.
+        * Handles a free slot. It first tries to find a pending slot request 
which can be fulfilled.
+        * If there is no such request, then it will add the slot to the set of 
free slots.
         *
-        * @param request The slot request
-        * @return <tt>true</tt> if the request is duplicated
+        * @param freeSlot to find a new slot request for
         */
-       private boolean isRequestDuplicated(final SlotRequest request) {
-               final AllocationID allocationId = request.getAllocationId();
-               return pendingSlotRequests.containsKey(allocationId)
-                       || allocationMap.isAllocated(allocationId);
+       private void handleFreeSlot(TaskManagerSlot freeSlot) {
+               PendingSlotRequest pendingSlotRequest = 
findMatchingRequest(freeSlot.getResourceProfile());
+
+               if (null != pendingSlotRequest) {
+                       allocateSlot(freeSlot, pendingSlotRequest);
+               } else {
+                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
+               }
        }
 
        /**
-        * Registers a new slot with the SlotManager.
+        * Removes the given set of slots from the slot manager.
         *
-        * @param slot The ResourceSlot which will be registered
+        * @param slotsToRemove identifying the slots to remove from the slot 
manager
         */
-       private void registerNewSlot(final ResourceSlot slot) {
-               final SlotID slotId = slot.getSlotId();
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
+       private void removeSlots(Iterable<SlotID> slotsToRemove) {
+               for (SlotID slotId : slotsToRemove) {
+                       removeSlot(slotId);
                }
-               registeredSlots.get(resourceId).put(slotId, slot);
        }
 
-       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       return null;
+       /**
+        * Removes the given slot from the slot manager.
+        *
+        * @param slotId identifying the slot to remove
+        */
+       private void removeSlot(SlotID slotId) {
+               TaskManagerSlot slot = slots.remove(slotId);
+
+               if (null != slot) {
+                       freeSlots.remove(slotId);
+
+                       if (slot.hasPendingSlotRequest()) {
+                               // reject the pending slot request --> 
triggering a new allocation attempt
+                               rejectPendingSlotRequest(
+                                       slot.getAssignedSlotRequest(),
+                                       new Exception("The assigned slot " + 
slot.getSlotId() + " was removed."));
+                       }
+
+                       AllocationID oldAllocationId = slot.getAllocationId();
+
+                       fulfilledSlotRequests.remove(oldAllocationId);
+
+                       TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(slot.getInstanceId());
+
+                       if (null != taskManagerRegistration) {
+                               taskManagerRegistration.removeSlot(slotId);
+                       }
+               } else {
+                       LOG.debug("There was no slot registered with slot id 
{}.", slotId);
                }
-               return registeredSlots.get(resourceId).get(slotId);
        }
 
-       // 
------------------------------------------------------------------------
-       //  Framework specific behavior
-       // 
------------------------------------------------------------------------
+       // 
---------------------------------------------------------------------------------------------
+       // Internal request handling methods
+       // 
---------------------------------------------------------------------------------------------
 
        /**
-        * Choose a slot to use among all free slots, the behavior is framework 
specified.
+        * Removes a pending slot request identified by the given allocation id 
from a slot identified
+        * by the given slot id.
         *
-        * @param request   The slot request
-        * @param freeSlots All slots which can be used
-        * @return The slot we choose to use, <tt>null</tt> if we did not find 
a match
+        * @param slotId identifying the slot
+        * @param allocationId identifying the presumably assigned pending slot 
request
         */
-       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
-               final Map<SlotID, ResourceSlot> freeSlots);
+       private void removeSlotRequestFromSlot(SlotID slotId, AllocationID 
allocationId) {
+               TaskManagerSlot taskManagerSlot = slots.get(slotId);
+
+               if (null != taskManagerSlot) {
+                       if (taskManagerSlot.hasPendingSlotRequest() && 
Objects.equals(allocationId, 
taskManagerSlot.getAssignedSlotRequest().getAllocationId())) {
+                               taskManagerSlot.setAssignedSlotRequest(null);
+                       }
+
+                       if (taskManagerSlot.isFree()) {
+                               handleFreeSlot(taskManagerSlot);
+                       }
+
+                       TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
+
+                       if (null != taskManagerRegistration && 
!anySlotUsed(taskManagerRegistration.getSlots())) {
+                               
registerTaskManagerTimeout(taskManagerRegistration);
+                       }
+               } else {
+                       LOG.debug("There was no slot with {} registered. 
Probably this slot has been already freed.", slotId);
+               }
+       }
 
        /**
-        * Choose a pending request to fulfill when we have a free slot, the 
behavior is framework specified.
+        * Handles a failed slot request. The slot manager tries to find a new 
slot fulfilling
+        * the resource requirements for the failed slot request.
         *
-        * @param offeredSlot     The free slot
-        * @param pendingRequests All the pending slot requests
-        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a 
match
+        * @param slotId identifying the slot which was assigned to the slot 
request before
+        * @param allocationId identifying the failed slot request
+        * @param cause of the failure
         */
-       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
-               final Map<AllocationID, SlotRequest> pendingRequests);
+       private void handleFailedSlotRequest(SlotID slotId, AllocationID 
allocationId, Throwable cause) {
+               PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.get(allocationId);
+
+               LOG.debug("Slot request with allocation id {} failed for slot 
{}.", allocationId, slotId, cause);
+
+               if (null != pendingSlotRequest) {
+                       pendingSlotRequest.setRequestFuture(null);
+
+                       try {
+                               internalRequestSlot(pendingSlotRequest);
+                       } catch (ResourceManagerException e) {
+                               pendingSlotRequests.remove(allocationId);
 
-       // 
------------------------------------------------------------------------
-       //  Helper classes
-       // 
------------------------------------------------------------------------
+                               resourceManagerActions.notifyAllocationFailure(
+                                       pendingSlotRequest.getJobId(),
+                                       allocationId,
+                                       e);
+                       }
+               } else {
+                       LOG.debug("There was not pending slot request with 
allocation id {}. Probably the request has been fulfilled or cancelled.", 
allocationId);
+               }
+       }
 
        /**
-        * We maintain all the allocations with SlotID and AllocationID. We are 
able to get or remove the allocation info
-        * either by SlotID or AllocationID.
+        * Rejects the pending slot request by failing the request future with a
+        * {@link SlotAllocationException}.
+        *
+        * @param pendingSlotRequest to reject
+        * @param cause of the rejection
         */
-       private static class AllocationMap {
+       private void rejectPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest, Exception cause) {
+               CompletableFuture<Acknowledge> request = 
pendingSlotRequest.getRequestFuture();
 
-               /** All allocated slots (by SlotID) */
-               private final Map<SlotID, AllocationID> allocatedSlots;
+               if (null != request) {
+                       request.completeExceptionally(new 
SlotAllocationException(cause));
+               } else {
+                       LOG.debug("Cannot reject pending slot request {}, since 
no request has been sent.", pendingSlotRequest.getAllocationId());
+               }
+       }
 
-               /** All allocated slots (by AllocationID), it'a a inverse view 
of allocatedSlots */
-               private final Map<AllocationID, SlotID> 
allocatedSlotsByAllocationId;
+       /**
+        * Cancels the given slot request.
+        *
+        * @param pendingSlotRequest to cancel
+        */
+       private void cancelPendingSlotRequest(PendingSlotRequest 
pendingSlotRequest) {
+               pendingSlotRequest.cancelTimeout();
 
-               AllocationMap() {
-                       this.allocatedSlots = new HashMap<>(16);
-                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
-               }
+               CompletableFuture<Acknowledge> request = 
pendingSlotRequest.getRequestFuture();
 
-               /**
-                * Add a allocation
-                *
-                * @param slotId       The slot id
-                * @param allocationId The allocation id
-                */
-               void addAllocation(final SlotID slotId, final AllocationID 
allocationId) {
-                       allocatedSlots.put(slotId, allocationId);
-                       allocatedSlotsByAllocationId.put(allocationId, slotId);
+               if (null != request) {
+                       request.cancel(false);
                }
+       }
+
+       // 
---------------------------------------------------------------------------------------------
+       // Internal timeout methods
+       // 
---------------------------------------------------------------------------------------------
+
+       private void timeoutTaskManager(InstanceID instanceId, UUID 
timeoutIdentifier) {
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.remove(instanceId);
 
-               /**
-                * De-allocation with slot id
-                *
-                * @param slotId The slot id
-                */
-               void removeAllocation(final SlotID slotId) {
-                       if (allocatedSlots.containsKey(slotId)) {
-                               final AllocationID allocationId = 
allocatedSlots.get(slotId);
-                               allocatedSlots.remove(slotId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
+               if (null != taskManagerRegistration) {
+                       if (Objects.equals(timeoutIdentifier, 
taskManagerRegistration.getTimeoutIdentifier())) {
+                               if 
(anySlotUsed(taskManagerRegistration.getSlots())) {
+                                       LOG.debug("Cannot release the task 
manager with instance id {}, because some " +
+                                               "of its slots are still being 
used.", instanceId);
+                               } else {
+                                       unregisterTaskManager(instanceId);
+
+                                       
resourceManagerActions.releaseResource(instanceId);
+                               }
+                       } else {
+                               taskManagerRegistrations.put(instanceId, 
taskManagerRegistration);
+
+                               LOG.debug("Expected timeout identifier {} 
differs from the task manager's " +
+                                       "timeout identifier {}. Ignoring the 
task manager timeout call.",
+                                       timeoutIdentifier, 
taskManagerRegistration.getTimeoutIdentifier());
                        }
+               } else {
+                       LOG.debug("Could not find a registered task manager 
with instance id {}. Ignoring the task manager timeout call.", instanceId);
                }
+       }
 
-               /**
-                * De-allocation with allocation id
-                *
-                * @param allocationId The allocation id
-                */
-               void removeAllocation(final AllocationID allocationId) {
-                       if 
(allocatedSlotsByAllocationId.containsKey(allocationId)) {
-                               SlotID slotId = 
allocatedSlotsByAllocationId.get(allocationId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
-                               allocatedSlots.remove(slotId);
+       private void timeoutSlotRequest(AllocationID allocationId, UUID 
timeoutIdentifier) {
+               PendingSlotRequest pendingSlotRequest = 
pendingSlotRequests.remove(allocationId);
+
+               if (null != pendingSlotRequest) {
+                       if (Objects.equals(timeoutIdentifier, 
pendingSlotRequest.getTimeoutIdentifier())) {
+                               if (!pendingSlotRequest.isAssigned()) {
+
+                                       
resourceManagerActions.notifyAllocationFailure(
+                                               pendingSlotRequest.getJobId(),
+                                               allocationId,
+                                               new TimeoutException("The 
allocation could not be fulfilled in time."));
+                               } else {
+                                       LOG.debug("Cannot fail pending slot 
request {} because it has been assigned.", allocationId);
+                               }
+                       } else {
+                               pendingSlotRequests.put(allocationId, 
pendingSlotRequest);
+
+                               LOG.debug("Expected timeout identifier {} 
differs from the pending slot request's " +
+                                       "timeout identifier {}. Ignoring the 
slot request timeout call.",
+                                       timeoutIdentifier, 
pendingSlotRequest.getTimeoutIdentifier());
                        }
+               } else {
+                       LOG.debug("Could not find pending slot request with 
allocation id {}. Ignoring the slot request timeout call.", allocationId);
                }
+       }
 
-               /**
-                * Check whether allocation exists by slot id
-                *
-                * @param slotId The slot id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final SlotID slotId) {
-                       return allocatedSlots.containsKey(slotId);
-               }
+       // 
---------------------------------------------------------------------------------------------
+       // Internal utility methods
+       // 
---------------------------------------------------------------------------------------------
 
-               /**
-                * Check whether allocation exists by allocation id
-                *
-                * @param allocationId The allocation id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final AllocationID allocationId) {
-                       return 
allocatedSlotsByAllocationId.containsKey(allocationId);
-               }
+       private boolean checkDuplicateRequest(AllocationID allocationId) {
+               return pendingSlotRequests.containsKey(allocationId) || 
fulfilledSlotRequests.containsKey(allocationId);
+       }
 
-               AllocationID getAllocationID(final SlotID slotId) {
-                       return allocatedSlots.get(slotId);
-               }
+       /**
+        * Checks whether at least one of the given slots is registered in {@code slots} and is
+        * not free. Slot ids for which {@code slots} has no entry are ignored.
+        *
+        * @param slotsToCheck ids of the slots to inspect; may be null
+        * @return true if any registered slot among the given ids is not free; false if all of
+        *         them are free, none is registered, or {@code slotsToCheck} is null
+        */
+       private boolean anySlotUsed(Iterable<SlotID> slotsToCheck) {
 
-               SlotID getSlotID(final AllocationID allocationId) {
-                       return allocatedSlotsByAllocationId.get(allocationId);
-               }
+               if (null != slotsToCheck) {
+                       boolean idle = true;
 
-               public int size() {
-                       return allocatedSlots.size();
-               }
+                       for (SlotID slotId : slotsToCheck) {
+                               TaskManagerSlot taskManagerSlot = 
slots.get(slotId);
 
-               public void clear() {
-                       allocatedSlots.clear();
-                       allocatedSlotsByAllocationId.clear();
+                               if (null != taskManagerSlot) {
+                                       idle &= taskManagerSlot.isFree();
+                               }
+                       }
+
+                       return !idle;
+               } else {
+                       return false;
                }
        }
 
-       /**
-        * Clears the state of the SlotManager after leadership revokal
-        */
-       public void clearState() {
-               taskManagers.clear();
-               registeredSlots.clear();
-               pendingSlotRequests.clear();
-               freeSlots.clear();
-               allocationMap.clear();
-       }
+       /**
+        * Schedules a timeout for the given task manager registration.
+        *
+        * <p>A fresh random UUID identifies this particular timeout. Once {@code taskManagerTimeout}
+        * has elapsed, the scheduled task hands over to {@code mainThreadExecutor}, which calls
+        * {@code timeoutTaskManager} with the registration's instance id and that identifier. The
+        * resulting future and the identifier are stored on the registration via
+        * {@code registerTimeout}.
+        *
+        * @param taskManagerRegistration registration for which the timeout is scheduled
+        */
+       private void registerTaskManagerTimeout(final TaskManagerRegistration 
taskManagerRegistration) {
+               final UUID timeoutIdentifier = UUID.randomUUID();
 
-       // 
------------------------------------------------------------------------
-       //  Testing utilities
-       // 
------------------------------------------------------------------------
+               ScheduledFuture<?> timeoutFuture = 
scheduledExecutor.schedule(new Runnable() {
+                       @Override
+                       public void run() {
+                               mainThreadExecutor.execute(new Runnable() {
+                                       @Override
+                                       public void run() {
+                                               
timeoutTaskManager(taskManagerRegistration.getInstanceId(), timeoutIdentifier);
+                                       }
+                               });
+                       }
+               }, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
-       @VisibleForTesting
-       boolean isAllocated(final SlotID slotId) {
-               return allocationMap.isAllocated(slotId);
+               taskManagerRegistration.registerTimeout(timeoutFuture, 
timeoutIdentifier);
        }
 
-       @VisibleForTesting
-       boolean isAllocated(final AllocationID allocationId) {
-               return allocationMap.isAllocated(allocationId);
+       /**
+        * Verifies that the slot manager has been started: passes the {@code started} flag to
+        * {@code Preconditions.checkState} together with the error message used if it is false.
+        */
+       private void checkInit() {
+               Preconditions.checkState(started, "The slot manager has not 
been started.");
        }
 
-       /**
-        * Add free slots directly to the free pool, this will not trigger 
pending requests allocation
-        *
-        * @param slot The resource slot
-        */
-       @VisibleForTesting
-       void addFreeSlot(final ResourceSlot slot) {
-               final ResourceID resourceId = slot.getResourceID();
-               final SlotID slotId = slot.getSlotId();
+       // 
---------------------------------------------------------------------------------------------
+       // Testing methods
+       // 
---------------------------------------------------------------------------------------------
 
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
-               }
-               registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-               freeSlots.put(slotId, slot);
+       /**
+        * Testing accessor: returns the entry stored in {@code slots} for the given slot id.
+        *
+        * @param slotId id of the slot to look up
+        * @return result of {@code slots.get(slotId)}
+        */
+       @VisibleForTesting
+       TaskManagerSlot getSlot(SlotID slotId) {
+               return slots.get(slotId);
        }
 
+       /**
+        * Testing accessor: returns the number of entries currently held in {@code slots}.
+        *
+        * @return {@code slots.size()}
+        */
        @VisibleForTesting
-       int getAllocatedSlotCount() {
-               return allocationMap.size();
+       int getNumberRegisteredSlots() {
+               return slots.size();
        }
 
+       /**
+        * Testing accessor: returns the pending slot request stored under the given allocation id.
+        * Fulfilled requests are not consulted.
+        *
+        * @param allocationId allocation id of the request to look up
+        * @return result of {@code pendingSlotRequests.get(allocationId)}
+        */
        @VisibleForTesting
-       int getFreeSlotCount() {
-               return freeSlots.size();
+       PendingSlotRequest getSlotRequest(AllocationID allocationId) {
+               return pendingSlotRequests.get(allocationId);
        }
 
+       /**
+        * Testing accessor: tells whether a timeout is currently registered for the task manager
+        * with the given instance id.
+        *
+        * @param instanceId instance id of the task manager to check
+        * @return true if a registration exists for the instance id and its timeout identifier is
+        *         non-null; false if there is no registration or no timeout identifier
+        */
        @VisibleForTesting
-       int getPendingRequestCount() {
-               return pendingSlotRequests.size();
+       boolean hasTimeoutRegistered(InstanceID instanceId) {
+               TaskManagerRegistration taskManagerRegistration = 
taskManagerRegistrations.get(instanceId);
+
+               if (null != taskManagerRegistration) {
+                       return taskManagerRegistration.getTimeoutIdentifier() 
!= null;
+               } else {
+                       return false;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
new file mode 100644
index 0000000..d21c251
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Immutable holder for the three timeout values of the slot manager configuration:
+ * the task manager request timeout, the slot request timeout and the task manager timeout.
+ */
+public class SlotManagerConfiguration {
+
+       private final Time taskManagerRequestTimeout;
+       private final Time slotRequestTimeout;
+       private final Time taskManagerTimeout;
+
+       /**
+        * Creates a configuration from the given timeouts.
+        *
+        * @param taskManagerRequestTimeout value returned by {@link #getTaskManagerRequestTimeout()}
+        * @param slotRequestTimeout value returned by {@link #getSlotRequestTimeout()}
+        * @param taskManagerTimeout value returned by {@link #getTaskManagerTimeout()}
+        * @throws NullPointerException if any argument is null (via {@code Preconditions.checkNotNull})
+        */
+       public SlotManagerConfiguration(
+                       Time taskManagerRequestTimeout,
+                       Time slotRequestTimeout,
+                       Time taskManagerTimeout) {
+               this.taskManagerRequestTimeout = 
Preconditions.checkNotNull(taskManagerRequestTimeout);
+               this.slotRequestTimeout = 
Preconditions.checkNotNull(slotRequestTimeout);
+               this.taskManagerTimeout = 
Preconditions.checkNotNull(taskManagerTimeout);
+       }
+
+       public Time getTaskManagerRequestTimeout() {
+               return taskManagerRequestTimeout;
+       }
+
+       public Time getSlotRequestTimeout() {
+               return slotRequestTimeout;
+       }
+
+       public Time getTaskManagerTimeout() {
+               return taskManagerTimeout;
+       }
+
+       /**
+        * Builds a configuration from the given Flink {@link Configuration}.
+        *
+        * <p>The value stored under {@code ConfigConstants.AKKA_ASK_TIMEOUT} (falling back to
+        * {@code ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT}) is parsed as a Scala {@code Duration}
+        * and converted to milliseconds; that single value is used for all three timeouts.
+        *
+        * @param configuration configuration to read the timeout from
+        * @return slot manager configuration whose three timeouts all equal the parsed value
+        * @throws ConfigurationException if parsing the configured string raises a
+        *         {@link NumberFormatException}
+        */
+       public static SlotManagerConfiguration fromConfiguration(Configuration 
configuration) throws ConfigurationException {
+               ConfigOption<String> timeoutOption = ConfigOptions
+                       .key(ConfigConstants.AKKA_ASK_TIMEOUT)
+                       .defaultValue(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+
+               final String strTimeout = 
configuration.getString(timeoutOption);
+               final Time timeout;
+
+               try {
+                       timeout = 
Time.milliseconds(Duration.apply(strTimeout).toMillis());
+               } catch (NumberFormatException e) {
+                       // Report the string that failed to parse, not the ConfigOption object.
+                       throw new ConfigurationException("Could not parse the 
resource manager's timeout " +
+                               "value " + strTimeout + '.', e);
+               }
+
+               return new SlotManagerConfiguration(timeout, timeout, timeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
new file mode 100644
index 0000000..c322c81
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+/**
+ * Exception type of the slot manager package; a {@link ResourceManagerException} subclass that
+ * adds no state of its own.
+ */
+public class SlotManagerException extends ResourceManagerException {
+
+       private static final long serialVersionUID = -3723028616920379071L;
+
+       /**
+        * @param message detail message passed on to the superclass
+        */
+       public SlotManagerException(String message) {
+               super(message);
+       }
+
+       /**
+        * @param message detail message passed on to the superclass
+        * @param cause underlying cause passed on to the superclass
+        */
+       public SlotManagerException(String message, Throwable cause) {
+               super(message, cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
deleted file mode 100644
index b4e9c99..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
-
-/**
- * Factory to create a SlotManager and provide it with dependencies.
- */
-public interface SlotManagerFactory {
-
-       /**
-        * Creates a SlotManager and provides it with ResourceManager services.
-        */
-       SlotManager create(ResourceManagerServices rmServices);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
new file mode 100644
index 0000000..8e23dbb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerRegistration.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Bookkeeping entry for a registered task manager: holds its {@link TaskExecutorConnection},
+ * the ids of its slots and, optionally, a registered timeout (future plus identifier).
+ */
+public class TaskManagerRegistration {
+
+       /** Connection to the task manager; never null (checked in the constructor). */
+       private final TaskExecutorConnection taskManagerConnection;
+
+       /** Ids of the slots added via {@link #addSlot(SlotID)} and not yet removed. */
+       private final HashSet<SlotID> slots;
+
+       /** Identifier of the registered timeout, or null if none is registered. */
+       private UUID timeoutIdentifier;
+
+       /** Future of the registered timeout, or null if none is registered. */
+       private ScheduledFuture<?> timeoutFuture;
+
+       /**
+        * Creates a registration with an empty slot set and no timeout.
+        *
+        * @param taskManagerConnection connection to the task manager; must not be null
+        */
+       public TaskManagerRegistration(TaskExecutorConnection 
taskManagerConnection) {
+               this.taskManagerConnection = 
Preconditions.checkNotNull(taskManagerConnection);
+
+               slots = new HashSet<>(4);
+
+               timeoutIdentifier = null;
+               timeoutFuture = null;
+       }
+
+       public TaskExecutorConnection getTaskManagerConnection() {
+               return taskManagerConnection;
+       }
+
+       /** Returns the instance id reported by the task manager connection. */
+       public InstanceID getInstanceId() {
+               return taskManagerConnection.getInstanceID();
+       }
+
+       /** Returns the identifier of the registered timeout, or null if there is none. */
+       public UUID getTimeoutIdentifier() {
+               return timeoutIdentifier;
+       }
+
+       /** Returns the internal slot id set itself (not a copy). */
+       public Set<SlotID> getSlots() {
+               return slots;
+       }
+
+       /**
+        * Removes the given slot id from this registration.
+        *
+        * @return true if the slot id was contained and has been removed
+        */
+       public boolean removeSlot(SlotID slotId) {
+               return slots.remove(slotId);
+       }
+
+       /** Adds the given slot id to this registration. */
+       public void addSlot(SlotID slotId) {
+               slots.add(slotId);
+       }
+
+       /**
+        * Cancels the registered timeout, if any, and clears both the future and the identifier.
+        * Does nothing when no timeout future is set.
+        */
+       public void cancelTimeout() {
+               if (null != timeoutFuture) {
+                       // false: do not interrupt the timeout task if it is already running
+                       timeoutFuture.cancel(false);
+
+                       timeoutFuture = null;
+                       timeoutIdentifier = null;
+               }
+       }
+
+       /**
+        * Registers a new timeout, cancelling a previously registered one first.
+        *
+        * @param newTimeoutFuture future of the newly scheduled timeout
+        * @param newTimeoutIdentifier identifier of the newly scheduled timeout
+        */
+       public void registerTimeout(ScheduledFuture<?> newTimeoutFuture, UUID 
newTimeoutIdentifier) {
+               cancelTimeout();
+
+               timeoutFuture = newTimeoutFuture;
+               timeoutIdentifier = newTimeoutIdentifier;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index 54adce6..7a9da28 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -19,8 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -29,12 +30,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * which slots are available and allocated, and what jobs (JobManagers) the 
allocated slots
  * have been allocated to.
  */
-public class SlotReport implements Serializable {
+public class SlotReport implements Serializable, Iterable<SlotStatus> {
 
        private static final long serialVersionUID = -3150175198722481689L;
 
        /** The slots status of the TaskManager */
-       private final List<SlotStatus> slotsStatus;
+       private final Collection<SlotStatus> slotsStatus;
 
        public SlotReport() {
                this(Collections.<SlotStatus>emptyList());
@@ -44,12 +45,16 @@ public class SlotReport implements Serializable {
                this(Collections.singletonList(slotStatus));
        }
 
-       public SlotReport(final List<SlotStatus> slotsStatus) {
+       public SlotReport(final Collection<SlotStatus> slotsStatus) {
                this.slotsStatus = checkNotNull(slotsStatus);
        }
 
-       public List<SlotStatus> getSlotsStatus() {
+       public Collection<SlotStatus> getSlotsStatus() {
                return slotsStatus;
        }
 
+       @Override
+       public Iterator<SlotStatus> iterator() {
+               return slotsStatus.iterator();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 0f57bb1..a3bc4a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -38,7 +38,7 @@ public class SlotStatus implements Serializable {
        private final SlotID slotID;
 
        /** the resource profile of the slot */
-       private final ResourceProfile profiler;
+       private final ResourceProfile resourceProfile;
 
        /** if the slot is allocated, allocationId identify its allocation; 
else, allocationId is null */
        private final AllocationID allocationID;
@@ -46,16 +46,17 @@ public class SlotStatus implements Serializable {
        /** if the slot is allocated, jobId identify which job this slot is 
allocated to; else, jobId is null */
        private final JobID jobID;
 
-       public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-               this(slotID, profiler, null, null);
+       public SlotStatus(SlotID slotID, ResourceProfile resourceProfile) {
+               this(slotID, resourceProfile, null, null);
        }
 
        public SlotStatus(
-                       SlotID slotID, ResourceProfile profiler,
+                       SlotID slotID,
+                       ResourceProfile resourceProfile,
                        JobID jobID,
                        AllocationID allocationID) {
                this.slotID = checkNotNull(slotID, "slotID cannot be null");
-               this.profiler = checkNotNull(profiler, "profile cannot be 
null");
+               this.resourceProfile = checkNotNull(resourceProfile, "profile 
cannot be null");
                this.allocationID = allocationID;
                this.jobID = jobID;
        }
@@ -74,8 +75,8 @@ public class SlotStatus implements Serializable {
         *
         * @return The resource profile
         */
-       public ResourceProfile getProfiler() {
-               return profiler;
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
        }
 
        /**
@@ -110,7 +111,7 @@ public class SlotStatus implements Serializable {
                if (!slotID.equals(that.slotID)) {
                        return false;
                }
-               if (!profiler.equals(that.profiler)) {
+               if (!resourceProfile.equals(that.resourceProfile)) {
                        return false;
                }
                if (allocationID != null ? 
!allocationID.equals(that.allocationID) : that.allocationID != null) {
@@ -123,7 +124,7 @@ public class SlotStatus implements Serializable {
        @Override
        public int hashCode() {
                int result = slotID.hashCode();
-               result = 31 * result + profiler.hashCode();
+               result = 31 * result + resourceProfile.hashCode();
                result = 31 * result + (allocationID != null ? 
allocationID.hashCode() : 0);
                result = 31 * result + (jobID != null ? jobID.hashCode() : 0);
                return result;

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d04cabb..5b8c8ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -61,8 +61,6 @@ import 
org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
@@ -71,6 +69,7 @@ import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
@@ -99,6 +98,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
@@ -315,7 +315,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        throw new TaskSubmissionException(message);
                }
 
-               if 
(!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+               if (!Objects.equals(jobManagerConnection.getLeaderId(), 
jobManagerLeaderId)) {
                        final String message = "Rejecting the task submission 
because the job manager leader id " +
                                jobManagerLeaderId + " does not match the 
expected job manager leader id " +
                                jobManagerConnection.getLeaderId() + '.';
@@ -355,30 +355,30 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                PartitionProducerStateChecker partitionStateChecker = 
jobManagerConnection.getPartitionStateChecker();
 
                Task task = new Task(
-                               jobInformation,
-                               taskInformation,
-                               tdd.getExecutionAttemptId(),
-                               tdd.getAllocationId(),
-                               tdd.getSubtaskIndex(),
-                               tdd.getAttemptNumber(),
-                               tdd.getProducedPartitions(),
-                               tdd.getInputGates(),
-                               tdd.getTargetSlotNumber(),
-                               tdd.getTaskStateHandles(),
-                               memoryManager,
-                               ioManager,
-                               networkEnvironment,
-                               broadcastVariableManager,
-                               taskManagerActions,
-                               inputSplitProvider,
-                               checkpointResponder,
-                               libraryCache,
-                               fileCache,
-                               taskManagerConfiguration,
-                               taskMetricGroup,
-                               resultPartitionConsumableNotifier,
-                               partitionStateChecker,
-                               getRpcService().getExecutor());
+                       jobInformation,
+                       taskInformation,
+                       tdd.getExecutionAttemptId(),
+                       tdd.getAllocationId(),
+                       tdd.getSubtaskIndex(),
+                       tdd.getAttemptNumber(),
+                       tdd.getProducedPartitions(),
+                       tdd.getInputGates(),
+                       tdd.getTargetSlotNumber(),
+                       tdd.getTaskStateHandles(),
+                       memoryManager,
+                       ioManager,
+                       networkEnvironment,
+                       broadcastVariableManager,
+                       taskManagerActions,
+                       inputSplitProvider,
+                       checkpointResponder,
+                       libraryCache,
+                       fileCache,
+                       taskManagerConfiguration,
+                       taskMetricGroup,
+                       resultPartitionConsumableNotifier,
+                       partitionStateChecker,
+                       getRpcService().getExecutor());
 
                log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
 
@@ -561,7 +561,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        // 
----------------------------------------------------------------------
 
        /**
-        * /**
         * Requests a slot from the TaskManager
         *
         * @param slotId identifying the requested slot
@@ -573,12 +572,14 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
         * @return answer to the slot request
         */
        @RpcMethod
-       public TMSlotRequestReply requestSlot(
+       public Acknowledge requestSlot(
                final SlotID slotId,
                final JobID jobId,
                final AllocationID allocationId,
                final String targetAddress,
                final UUID rmLeaderId) throws SlotAllocationException {
+               // TODO: Filter invalid requests from the resource manager by 
using the instance/registration Id
+
                log.info("Receive slot request {} for job {} from resource 
manager with leader id {}.",
                        allocationId, jobId, rmLeaderId);
 
@@ -608,7 +609,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        final String message = "The slot " + slotId + " has 
already been allocated for a different job.";
 
                        log.info(message);
-                       throw new SlotAllocationException(message);
+
+                       throw new SlotOccupiedException(message, 
taskSlotTable.getCurrentAllocation(slotId.getSlotNumber()));
                }
 
                if (jobManagerTable.contains(jobId)) {
@@ -635,7 +637,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        }
                }
 
-               return new 
TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationId);
+               return Acknowledge.get();
        }
 
        // 
----------------------------------------------------------------------
@@ -1046,7 +1048,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                                resourceManagerGateway.notifySlotAvailable(
                                        
resourceManagerConnection.getTargetLeaderId(),
                                        
resourceManagerConnection.getRegistrationId(),
-                                       new SlotID(getResourceID(), 
freedSlotIndex));
+                                       new SlotID(getResourceID(), 
freedSlotIndex),
+                                       allocationId);
                        }
                } catch (SlotNotFoundException e) {
                        log.debug("Could not free slot for allocation id {}.", 
allocationId, e);
@@ -1079,7 +1082,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) 
{
                JobManagerConnection jmConnection = jobManagerTable.get(jobId);
 
-               return jmConnection != null && 
jmConnection.getLeaderId().equals(leaderId);
+               return jmConnection != null && 
Objects.equals(jmConnection.getLeaderId(), leaderId);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 2bbf0e6..bedf8ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
@@ -51,7 +50,7 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @throws SlotAllocationException if the slot allocation fails
         * @return answer to the slot request
         */
-       Future<TMSlotRequestReply> requestSlot(
+       Future<Acknowledge> requestSlot(
                SlotID slotId,
                JobID jobId,
                AllocationID allocationId,

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
new file mode 100644
index 0000000..93e67a8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotOccupiedException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.util.Preconditions;
+
+public class SlotOccupiedException extends SlotAllocationException {
+       private static final long serialVersionUID = -3986333914244338888L;
+
+       private final AllocationID allocationId;
+
+       public SlotOccupiedException(String message, AllocationID allocationId) {
+               super(message);
+               this.allocationId = Preconditions.checkNotNull(allocationId);
+       }
+
+       public SlotOccupiedException(String message, Throwable cause, AllocationID allocationId) {
+               super(message, cause);
+               this.allocationId = Preconditions.checkNotNull(allocationId);
+       }
+
+       public SlotOccupiedException(Throwable cause, AllocationID allocationId) {
+               super(cause);
+               this.allocationId = Preconditions.checkNotNull(allocationId);
+       }
+
+       public AllocationID getAllocationId() {
+               return allocationId;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 081d8f8..5c51c7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -509,6 +509,16 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                return new TaskIterator(jobId);
        }
 
+       /**
+        * Get the current allocation for the task slot with the given index.
+        *
+        * @param index identifying the slot for which the allocation id shall be retrieved
+        * @return Allocation id of the specified slot if allocated; otherwise null
+        */
+       public AllocationID getCurrentAllocation(int index) {
+               return taskSlots.get(index).getAllocationId();
+       }
+
        // ---------------------------------------------------------------------
        // TimeoutListener methods
        // ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index e4e20b9..41c2e16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -49,7 +49,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.TestingSlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
@@ -391,10 +391,14 @@ public class ResourceManagerTest extends TestLogger {
                final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
                final HeartbeatServices heartbeatServices = new 
TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, 
scheduledExecutor);
 
-               final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
                final MetricRegistry metricRegistry = 
mock(MetricRegistry.class);
                final JobLeaderIdService jobLeaderIdService = 
mock(JobLeaderIdService.class);
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+               final SlotManager slotManager = new SlotManager(
+                       rpcService.getScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
 
                try {
                        final StandaloneResourceManager resourceManager = new 
StandaloneResourceManager(
@@ -404,7 +408,7 @@ public class ResourceManagerTest extends TestLogger {
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
-                               slotManagerFactory,
+                               slotManager,
                                metricRegistry,
                                jobLeaderIdService,
                                testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 89fd22f..b444640 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -80,7 +80,7 @@ public class SlotPoolRpcTest {
                                Time.days(1), Time.days(1),
                                Time.milliseconds(100) // this is the timeout 
for the request tested here
                );
-               pool.start(UUID.randomUUID());
+               pool.start(UUID.randomUUID(), "foobar");
 
                Future<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index b4149b2..cf95461 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -78,7 +78,9 @@ public class SlotPoolTest extends TestLogger {
 
                mainThreadValidatorUtil.enterMainThread();
 
-               slotPool.start(UUID.randomUUID());
+               final String jobManagerAddress = "foobar";
+
+               slotPool.start(UUID.randomUUID(), jobManagerAddress);
 
                this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 49bc570..986f848 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -25,9 +25,10 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -57,13 +58,17 @@ public class ResourceManagerHATest extends TestLogger {
                        Time.seconds(5L),
                        Time.seconds(5L));
 
-               ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(Time.seconds(5L));
+               ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(
+                       Time.seconds(5L),
+                       new SlotManagerConfiguration(
+                               TestingUtils.infiniteTime(),
+                               TestingUtils.infiniteTime(),
+                               TestingUtils.infiniteTime()));
                ResourceManagerRuntimeServices resourceManagerRuntimeServices = 
ResourceManagerRuntimeServices.fromConfiguration(
                        resourceManagerRuntimeServicesConfiguration,
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor());
 
-               SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
 
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
@@ -76,7 +81,7 @@ public class ResourceManagerHATest extends TestLogger {
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
-                               slotManagerFactory,
+                               resourceManagerRuntimeServices.getSlotManager(),
                                metricRegistry,
                                
resourceManagerRuntimeServices.getJobLeaderIdService(),
                                testingFatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 6a151ac..4836f74 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -31,10 +31,11 @@ import 
org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
@@ -240,12 +241,17 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
                        Time.seconds(5L));
-               SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                        highAvailabilityServices,
                        rpcService.getScheduledExecutor(),
                        Time.minutes(5L));
+                       
+               final SlotManager slotManager = new SlotManager(
+                       rpcService.getScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
 
                ResourceManager resourceManager = new StandaloneResourceManager(
                        rpcService,
@@ -254,7 +260,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                        resourceManagerConfiguration,
                        highAvailabilityServices,
                        heartbeatServices,
-                       slotManagerFactory,
+                       slotManager,
                        metricRegistry,
                        jobLeaderIdService,
                        fatalErrorHandler);

Reply via email to