[FLINK-4538][FLINK-4348] ResourceManager slot allocation protocol

- associates JobMasters with JobID instead of InstanceID
- adds TaskExecutorGateway to slot
- adds SlotManager as RM constructor parameter
- adds LeaderRetrievalListener to SlotManager to keep track of the leader id

- tests the interaction JM->RM requestSlot
- tests the interaction RM->TM requestSlot

This closes #2463


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8a48fa50
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8a48fa50
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8a48fa50

Branch: refs/heads/flip-6
Commit: 8a48fa503d808cd588daee06f90d9e010ee47e89
Parents: 8e9db6b
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Sep 1 16:53:31 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Oct 6 13:38:41 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java |   2 +-
 .../clusterframework/types/ResourceSlot.java    |  14 +-
 .../resourcemanager/JobMasterRegistration.java  |  10 +-
 .../resourcemanager/RegistrationResponse.java   |   9 +-
 .../resourcemanager/ResourceManager.java        | 167 +++---
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../runtime/resourcemanager/SlotAssignment.java |  25 -
 .../runtime/resourcemanager/SlotManager.java    | 523 -----------------
 .../resourcemanager/SlotRequestRegistered.java  |  33 ++
 .../resourcemanager/SlotRequestRejected.java    |  34 ++
 .../resourcemanager/SlotRequestReply.java       |  41 ++
 .../slotmanager/SimpleSlotManager.java          |  59 ++
 .../slotmanager/SlotManager.java                | 579 +++++++++++++++++++
 .../flink/runtime/taskexecutor/SlotStatus.java  |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |  17 +
 .../resourcemanager/ResourceManagerHATest.java  |   4 +-
 .../resourcemanager/SlotManagerTest.java        | 538 -----------------
 .../slotmanager/SlotManagerTest.java            | 554 ++++++++++++++++++
 .../slotmanager/SlotProtocolTest.java           | 225 +++++++
 .../flink/runtime/rpc/TestingRpcService.java    |   6 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   4 +
 21 files changed, 1677 insertions(+), 1174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index ff1c4bf..fa3aabc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -68,6 +68,6 @@ public class ResourceProfile implements Serializable {
         * @return true if the requirement is matched, otherwise false
         */
        public boolean isMatching(ResourceProfile required) {
-               return Double.compare(cpuCores, required.getCpuCores()) >= 0 && 
memoryInMB >= required.getMemoryInMB();
+               return cpuCores >= required.getCpuCores() && memoryInMB >= 
required.getMemoryInMB();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 8a6db5f..5fb8aee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
 import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -26,7 +28,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A ResourceSlot represents a slot located in TaskManager from 
ResourceManager's view. It has a unique
  * identification and resource profile which we can compare to the resource 
request.
  */
-public class ResourceSlot implements ResourceIDRetrievable, Serializable {
+public class ResourceSlot implements ResourceIDRetrievable {
 
        private static final long serialVersionUID = -5853720153136840674L;
 
@@ -36,9 +38,13 @@ public class ResourceSlot implements ResourceIDRetrievable, 
Serializable {
        /** The resource profile of this slot */
        private final ResourceProfile resourceProfile;
 
-       public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile) {
+       /** Gateway to the TaskExecutor which owns the slot */
+       private final TaskExecutorGateway taskExecutorGateway;
+
+       public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, 
TaskExecutorGateway taskExecutorGateway) {
                this.slotId = checkNotNull(slotId);
                this.resourceProfile = checkNotNull(resourceProfile);
+               this.taskExecutorGateway = taskExecutorGateway;
        }
 
        @Override
@@ -54,6 +60,10 @@ public class ResourceSlot implements ResourceIDRetrievable, 
Serializable {
                return resourceProfile;
        }
 
+       public TaskExecutorGateway getTaskExecutorGateway() {
+               return taskExecutorGateway;
+       }
+
        /**
         * Check whether required resource profile can be matched by this slot.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 309dcc1..439e56b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,18 +18,26 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import org.apache.flink.api.common.JobID;
+
 import java.io.Serializable;
 
 public class JobMasterRegistration implements Serializable {
        private static final long serialVersionUID = 8411214999193765202L;
 
        private final String address;
+       private final JobID jobID;
 
-       public JobMasterRegistration(String address) {
+       public JobMasterRegistration(String address, JobID jobID) {
                this.address = address;
+               this.jobID = jobID;
        }
 
        public String getAddress() {
                return address;
        }
+
+       public JobID getJobID() {
+               return jobID;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
index fb6c401..796e634 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/RegistrationResponse.java
@@ -18,26 +18,19 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.runtime.instance.InstanceID;
-
 import java.io.Serializable;
 
 public class RegistrationResponse implements Serializable {
        private static final long serialVersionUID = -2379003255993119993L;
 
        private final boolean isSuccess;
-       private final InstanceID instanceID;
 
-       public RegistrationResponse(boolean isSuccess, InstanceID instanceID) {
+       public RegistrationResponse(boolean isSuccess) {
                this.isSuccess = isSuccess;
-               this.instanceID = instanceID;
        }
 
        public boolean isSuccess() {
                return isSuccess;
        }
 
-       public InstanceID getInstanceID() {
-               return instanceID;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 44c022b..29aba1a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -21,11 +21,13 @@ package org.apache.flink.runtime.resourcemanager;
 import akka.dispatch.Mapper;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -33,6 +35,8 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
@@ -51,16 +55,28 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-       private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
+
+       private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+       private final Map<JobID, JobMasterGateway> jobMasterGateways;
+
        private final HighAvailabilityServices highAvailabilityServices;
-       private LeaderElectionService leaderElectionService = null;
-       private UUID leaderSessionID = null;
 
-       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
+       private LeaderElectionService leaderElectionService;
+
+       private final SlotManager slotManager;
+
+       private UUID leaderSessionID;
+
+       public ResourceManager(
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.jobMasterGateways = new HashMap<>();
+               this.slotManager = slotManager;
        }
 
        @Override
@@ -69,7 +85,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                try {
                        super.start();
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
-                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
+                       leaderElectionService.start(this);
                } catch (Throwable e) {
                        log.error("A fatal error happened when starting the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
@@ -94,7 +110,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         */
        @VisibleForTesting
        UUID getLeaderSessionID() {
-               return leaderSessionID;
+               return this.leaderSessionID;
        }
 
        /**
@@ -105,21 +121,20 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         */
        @RpcMethod
        public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
+               final Future<JobMasterGateway> jobMasterFuture =
+                       
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
+               final JobID jobID = jobMasterRegistration.getJobID();
 
                return jobMasterFuture.map(new Mapper<JobMasterGateway, 
RegistrationResponse>() {
                        @Override
                        public RegistrationResponse apply(final 
JobMasterGateway jobMasterGateway) {
-                               InstanceID instanceID;
 
-                               if 
(jobMasterGateways.containsKey(jobMasterGateway)) {
-                                       instanceID = 
jobMasterGateways.get(jobMasterGateway);
-                               } else {
-                                       instanceID = new InstanceID();
-                                       jobMasterGateways.put(jobMasterGateway, 
instanceID);
+                               final JobMasterGateway existingGateway = 
jobMasterGateways.put(jobID, jobMasterGateway);
+                               if (existingGateway != null) {
+                                       LOG.info("Replacing existing gateway {} 
for JobID {} with  {}.",
+                                               existingGateway, jobID, 
jobMasterGateway);
                                }
-
-                               return new RegistrationResponse(true, 
instanceID);
+                               return new RegistrationResponse(true);
                        }
                }, getMainThreadExecutionContext());
        }
@@ -131,9 +146,16 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         * @return Slot assignment
         */
        @RpcMethod
-       public SlotAssignment requestSlot(SlotRequest slotRequest) {
-               System.out.println("SlotRequest: " + slotRequest);
-               return new SlotAssignment();
+       public SlotRequestReply requestSlot(SlotRequest slotRequest) {
+               final JobID jobId = slotRequest.getJobId();
+               final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+
+               if (jobMasterGateway != null) {
+                       return slotManager.requestSlot(slotRequest);
+               } else {
+                       LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+                       return new 
SlotRequestRejected(slotRequest.getAllocationId());
+               }
        }
 
 
@@ -154,61 +176,62 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
        }
 
-       private class ResourceManagerLeaderContender implements LeaderContender 
{
-
-               /**
-                * Callback method when current resourceManager is granted 
leadership
-                *
-                * @param leaderSessionID unique leadershipID
-                */
-               @Override
-               public void grantLeadership(final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
-                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
-                                       // confirming the leader session ID 
might be blocking,
-                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               }
-                       });
-               }
 
-               /**
-                * Callback method when current resourceManager lose leadership.
-                */
-               @Override
-               public void revokeLeadership() {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
-                                       jobMasterGateways.clear();
-                                       leaderSessionID = null;
-                               }
-                       });
-               }
+       // 
------------------------------------------------------------------------
+       //  Leader Contender
+       // 
------------------------------------------------------------------------
 
-               @Override
-               public String getAddress() {
-                       return ResourceManager.this.getAddress();
-               }
+       /**
+        * Callback method when current resourceManager is granted leadership
+        *
+        * @param leaderSessionID unique leadershipID
+        */
+       @Override
+       public void grantLeadership(final UUID leaderSessionID) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), leaderSessionID);
+                               // confirming the leader session ID might be 
blocking,
+                               
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                               // notify SlotManager
+                               slotManager.notifyLeaderAddress(getAddress(), 
leaderSessionID);
+                               ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                       }
+               });
+       }
 
-               /**
-                * Handles error occurring in the leader election service
-                *
-                * @param exception Exception being thrown in the leader 
election service
-                */
-               @Override
-               public void handleError(final Exception exception) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
-                                       // terminate ResourceManager in case of 
an error
-                                       shutDown();
-                               }
-                       });
-               }
+       /**
+        * Callback method when current resourceManager lose leadership.
+        */
+       @Override
+       public void revokeLeadership() {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("ResourceManager {} was revoked 
leadership.", getAddress());
+                               jobMasterGateways.clear();
+                               ResourceManager.this.leaderSessionID = null;
+                       }
+               });
+       }
+
+       /**
+        * Handles error occurring in the leader election service
+        *
+        * @param exception Exception being thrown in the leader election 
service
+        */
+       @Override
+       public void handleError(final Exception exception) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.error("ResourceManager received an error 
from the LeaderElectionService.", exception);
+                               // notify SlotManager
+                               slotManager.handleError(exception);
+                               // terminate ResourceManager in case of an error
+                               shutDown();
+                       }
+               });
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index b5782b0..e5c8b64 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -58,7 +58,7 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param slotRequest Slot request
         * @return Future slot assignment
         */
-       Future<SlotAssignment> requestSlot(SlotRequest slotRequest);
+       Future<SlotRequestRegistered> requestSlot(SlotRequest slotRequest);
 
        /**
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
deleted file mode 100644
index 695204d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotAssignment.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import java.io.Serializable;
-
-public class SlotAssignment implements Serializable{
-       private static final long serialVersionUID = -6990813455942742322L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
deleted file mode 100644
index 5c06648..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotManager.java
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * SlotManager is responsible for receiving slot requests and do slot 
allocations. It allows to request
- * slots from registered TaskManagers and issues container allocation requests 
in case of there are not
- * enough available slots. Besides, it should sync its slot allocation with 
TaskManager's heartbeat.
- * <p>
- * The main operation principle of SlotManager is:
- * <ul>
- * <li>1. All slot allocation status should be synced with TaskManager, which 
is the ground truth.</li>
- * <li>2. All slots that have registered must be tracked, either by free pool 
or allocated pool.</li>
- * <li>3. All slot requests will be handled by best efforts, there is no 
guarantee that one request will be
- * fulfilled in time or correctly allocated. Conflicts or timeout or some 
special error will happen, it should
- * be handled outside SlotManager. SlotManager will make each decision based 
on the information it currently
- * holds.</li>
- * </ul>
- * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
- */
-public abstract class SlotManager {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
-
-       /** Gateway to communicate with ResourceManager */
-       private final ResourceManagerGateway resourceManagerGateway;
-
-       /** All registered slots, including free and allocated slots */
-       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
-
-       /** All pending slot requests, waiting available slots to fulfil */
-       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
-
-       /** All free slots that can be used to be allocated */
-       private final Map<SlotID, ResourceSlot> freeSlots;
-
-       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
-       private final AllocationMap allocationMap;
-
-       public SlotManager(ResourceManagerGateway resourceManagerGateway) {
-               this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
-               this.registeredSlots = new HashMap<>(16);
-               this.pendingSlotRequests = new LinkedHashMap<>(16);
-               this.freeSlots = new HashMap<>(16);
-               this.allocationMap = new AllocationMap();
-       }
-
-       // 
------------------------------------------------------------------------
-       //  slot managements
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Request a slot with requirements, we may either fulfill the request 
or pending it. Trigger container
-        * allocation if we don't have enough resource. If we have free slot 
which can match the request, record
-        * this allocation and forward the request to TaskManager through 
ResourceManager (we want this done by
-        * RPC's main thread to avoid race condition).
-        *
-        * @param request The detailed request of the slot
-        */
-       public void requestSlot(final SlotRequest request) {
-               if (isRequestDuplicated(request)) {
-                       LOG.warn("Duplicated slot request, AllocationID:{}", 
request.getAllocationId());
-                       return;
-               }
-
-               // try to fulfil the request with current free slots
-               ResourceSlot slot = chooseSlotToUse(request, freeSlots);
-               if (slot != null) {
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", slot.getSlotId(),
-                               request.getAllocationId(), request.getJobId());
-
-                       // record this allocation in bookkeeping
-                       allocationMap.addAllocation(slot.getSlotId(), 
request.getAllocationId());
-
-                       // remove selected slot from free pool
-                       freeSlots.remove(slot.getSlotId());
-
-                       // TODO: send slot request to TaskManager
-               } else {
-                       LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
-                               "AllocationID:{}, JobID:{}", 
request.getAllocationId(), request.getJobId());
-                       allocateContainer(request.getResourceProfile());
-                       pendingSlotRequests.put(request.getAllocationId(), 
request);
-               }
-       }
-
-       /**
-        * Sync slot status with TaskManager's SlotReport.
-        */
-       public void updateSlotStatus(final SlotReport slotReport) {
-               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-                       updateSlotStatus(slotStatus);
-               }
-       }
-
-       /**
-        * The slot request to TaskManager may be either failed by rpc 
communication (timeout, network error, etc.)
-        * or really rejected by TaskManager. We shall retry this request by:
-        * <ul>
-        * <li>1. verify and clear all the previous allocate information for 
this request
-        * <li>2. try to request slot again
-        * </ul>
-        * <p>
-        * This may cause some duplicate allocation, e.g. the slot request to 
TaskManager is successful but the response
-        * is lost somehow, so we may request a slot in another TaskManager, 
this causes two slots assigned to one request,
-        * but it can be taken care of by rejecting registration at JobManager.
-        *
-        * @param originalRequest The original slot request
-        * @param slotId          The target SlotID
-        */
-       public void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
-               final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
-               LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
-                       slotId, originalAllocationId, 
originalRequest.getJobId());
-
-               // verify the allocation info before we do anything
-               if (freeSlots.containsKey(slotId)) {
-                       // this slot is currently empty, no need to de-allocate 
it from our allocations
-                       LOG.info("Original slot is somehow empty, retrying this 
request");
-
-                       // before retry, we should double check whether this 
request was allocated by some other ways
-                       if (!allocationMap.isAllocated(originalAllocationId)) {
-                               requestSlot(originalRequest);
-                       } else {
-                               LOG.info("The failed request has somehow been 
allocated, SlotID:{}",
-                                       
allocationMap.getSlotID(originalAllocationId));
-                       }
-               } else if (allocationMap.isAllocated(slotId)) {
-                       final AllocationID currentAllocationId = 
allocationMap.getAllocationID(slotId);
-
-                       // check whether we have an agreement on whom this slot 
belongs to
-                       if (originalAllocationId.equals(currentAllocationId)) {
-                               LOG.info("De-allocate this request and retry");
-                               
allocationMap.removeAllocation(currentAllocationId);
-
-                               // put this slot back to free pool
-                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
-                               freeSlots.put(slotId, slot);
-
-                               // retry the request
-                               requestSlot(originalRequest);
-                       } else {
-                               // the slot is taken by someone else, no need 
to de-allocate it from our allocations
-                               LOG.info("Original slot is taken by someone 
else, current AllocationID:{}", currentAllocationId);
-
-                               // before retry, we should double check whether 
this request was allocated by some other ways
-                               if 
(!allocationMap.isAllocated(originalAllocationId)) {
-                                       requestSlot(originalRequest);
-                               } else {
-                                       LOG.info("The failed request is somehow 
been allocated, SlotID:{}",
-                                               
allocationMap.getSlotID(originalAllocationId));
-                               }
-                       }
-               } else {
-                       LOG.error("BUG! {} is neither in free pool nor in 
allocated pool", slotId);
-               }
-       }
-
-       /**
-        * Callback for TaskManager failures. In case that a TaskManager fails, 
we have to clean up all its slots.
-        *
-        * @param resourceId The ResourceID of the TaskManager
-        */
-       public void notifyTaskManagerFailure(final ResourceID resourceId) {
-               LOG.info("Resource:{} been notified failure", resourceId);
-               final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
-               if (slotIdsToRemove != null) {
-                       for (SlotID slotId : slotIdsToRemove.keySet()) {
-                               LOG.info("Removing Slot:{} upon resource 
failure", slotId);
-                               if (freeSlots.containsKey(slotId)) {
-                                       freeSlots.remove(slotId);
-                               } else if (allocationMap.isAllocated(slotId)) {
-                                       allocationMap.removeAllocation(slotId);
-                               } else {
-                                       LOG.error("BUG! {} is neither in free 
pool nor in allocated pool", slotId);
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  internal behaviors
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Update slot status based on TaskManager's report. There are mainly 
two situations when we receive the report:
-        * <ul>
-        * <li>1. The slot is newly registered.</li>
-        * <li>2. The slot has registered, it contains its current status.</li>
-        * </ul>
-        * <p>
-        * Regarding 1: It's fairly simple, we just record this slot's status, 
and trigger schedule if slot is empty.
-        * <p>
-        * Regarding 2: It will cause some weird situation since we may have 
some time-gap on how the slot's status really
-        * is. We may have some updates on the slot's allocation, but it 
doesn't reflected by TaskManager's heartbeat yet,
-        * and we may make some wrong decision if we cannot guarantee we have 
the exact status about all the slots. So
-        * the principle here is: We always trust TaskManager's heartbeat, we 
will correct our information based on that
-        * and take next action based on the diff between our information and 
heartbeat status.
-        *
-        * @param reportedStatus Reported slot status
-        */
-       void updateSlotStatus(final SlotStatus reportedStatus) {
-               final SlotID slotId = reportedStatus.getSlotID();
-               final ResourceSlot slot = new ResourceSlot(slotId, 
reportedStatus.getProfiler());
-
-               if (registerNewSlot(slot)) {
-                       // we have a newly registered slot
-                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot in use, record this in bookkeeping
-                               allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-                       } else {
-                               handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
-                       }
-               } else {
-                       // slot exists, update current information
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot is reported in use
-                               final AllocationID reportedAllocationId = 
reportedStatus.getAllocationID();
-
-                               // check whether we also thought this slot is 
in use
-                               if (allocationMap.isAllocated(slotId)) {
-                                       // we also think that slot is in use, 
check whether the AllocationID matches
-                                       final AllocationID currentAllocationId 
= allocationMap.getAllocationID(slotId);
-
-                                       if 
(!reportedAllocationId.equals(currentAllocationId)) {
-                                               LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:{}",
-                                                       slotId, 
currentAllocationId, reportedAllocationId);
-
-                                               // seems we have a disagreement 
about the slot assignments, need to correct it
-                                               
allocationMap.removeAllocation(slotId);
-                                               
allocationMap.addAllocation(slotId, reportedAllocationId);
-                                       }
-                               } else {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:null, reported:{}",
-                                               slotId, reportedAllocationId);
-
-                                       // we thought the slot is free, should 
correct this information
-                                       allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-
-                                       // remove this slot from free slots pool
-                                       freeSlots.remove(slotId);
-                               }
-                       } else {
-                               // slot is reported empty
-
-                               // check whether we also thought this slot is 
empty
-                               if (allocationMap.isAllocated(slotId)) {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:null",
-                                               slotId, 
allocationMap.getAllocationID(slotId));
-
-                                       // we thought the slot is in use, 
correct it
-                                       allocationMap.removeAllocation(slotId);
-
-                                       // we have a free slot!
-                                       handleFreeSlot(new ResourceSlot(slotId, 
reportedStatus.getProfiler()));
-                               }
-                       }
-               }
-       }
-
-       /**
-        * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
-        * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
-        * to the free pool.
-        *
-        * @param freeSlot The free slot
-        */
-       private void handleFreeSlot(final ResourceSlot freeSlot) {
-               SlotRequest chosenRequest = chooseRequestToFulfill(freeSlot, 
pendingSlotRequests);
-
-               if (chosenRequest != null) {
-                       
pendingSlotRequests.remove(chosenRequest.getAllocationId());
-
-                       LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
-                               chosenRequest.getAllocationId(), 
chosenRequest.getJobId());
-                       allocationMap.addAllocation(freeSlot.getSlotId(), 
chosenRequest.getAllocationId());
-
-                       // TODO: send slot request to TaskManager
-               } else {
-                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
-               }
-       }
-
-       /**
-        * Check whether the request is duplicated. We use AllocationID to 
identify slot request, for each
-        * formerly received slot request, it is either in pending list or 
already been allocated.
-        *
-        * @param request The slot request
-        * @return <tt>true</tt> if the request is duplicated
-        */
-       private boolean isRequestDuplicated(final SlotRequest request) {
-               final AllocationID allocationId = request.getAllocationId();
-               return pendingSlotRequests.containsKey(allocationId)
-                       || allocationMap.isAllocated(allocationId);
-       }
-
-       /**
-        * Try to register slot, and tell if this slot is newly registered.
-        *
-        * @param slot The ResourceSlot which will be checked and registered
-        * @return <tt>true</tt> if we meet a new slot
-        */
-       private boolean registerNewSlot(final ResourceSlot slot) {
-               final SlotID slotId = slot.getSlotId();
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
-               }
-               return registeredSlots.get(resourceId).put(slotId, slot) == 
null;
-       }
-
-       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
-               final ResourceID resourceId = slotId.getResourceID();
-               if (!registeredSlots.containsKey(resourceId)) {
-                       return null;
-               }
-               return registeredSlots.get(resourceId).get(slotId);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Framework specific behavior
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Choose a slot to use among all free slots, the behavior is framework 
specified.
-        *
-        * @param request   The slot request
-        * @param freeSlots All slots which can be used
-        * @return The slot we choose to use, <tt>null</tt> if we did not find 
a match
-        */
-       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
-               final Map<SlotID, ResourceSlot> freeSlots);
-
-       /**
-        * Choose a pending request to fulfill when we have a free slot, the 
behavior is framework specified.
-        *
-        * @param offeredSlot     The free slot
-        * @param pendingRequests All the pending slot requests
-        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a 
match
-        */
-       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
-               final Map<AllocationID, SlotRequest> pendingRequests);
-
-       /**
-        * The framework specific code for allocating a container for specified 
resource profile.
-        *
-        * @param resourceProfile The resource profile
-        */
-       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
-
-
-       // 
------------------------------------------------------------------------
-       //  Helper classes
-       // 
------------------------------------------------------------------------
-
-       /**
-        * We maintain all the allocations with SlotID and AllocationID. We are 
able to get or remove the allocation info
-        * either by SlotID or AllocationID.
-        */
-       private static class AllocationMap {
-
-               /** All allocated slots (by SlotID) */
-               private final Map<SlotID, AllocationID> allocatedSlots;
-
-               /** All allocated slots (by AllocationID), it'a a inverse view 
of allocatedSlots */
-               private final Map<AllocationID, SlotID> 
allocatedSlotsByAllocationId;
-
-               AllocationMap() {
-                       this.allocatedSlots = new HashMap<>(16);
-                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
-               }
-
-               /**
-                * Add a allocation
-                *
-                * @param slotId       The slot id
-                * @param allocationId The allocation id
-                */
-               void addAllocation(final SlotID slotId, final AllocationID 
allocationId) {
-                       allocatedSlots.put(slotId, allocationId);
-                       allocatedSlotsByAllocationId.put(allocationId, slotId);
-               }
-
-               /**
-                * De-allocation with slot id
-                *
-                * @param slotId The slot id
-                */
-               void removeAllocation(final SlotID slotId) {
-                       if (allocatedSlots.containsKey(slotId)) {
-                               final AllocationID allocationId = 
allocatedSlots.get(slotId);
-                               allocatedSlots.remove(slotId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
-                       }
-               }
-
-               /**
-                * De-allocation with allocation id
-                *
-                * @param allocationId The allocation id
-                */
-               void removeAllocation(final AllocationID allocationId) {
-                       if 
(allocatedSlotsByAllocationId.containsKey(allocationId)) {
-                               SlotID slotId = 
allocatedSlotsByAllocationId.get(allocationId);
-                               
allocatedSlotsByAllocationId.remove(allocationId);
-                               allocatedSlots.remove(slotId);
-                       }
-               }
-
-               /**
-                * Check whether allocation exists by slot id
-                *
-                * @param slotId The slot id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final SlotID slotId) {
-                       return allocatedSlots.containsKey(slotId);
-               }
-
-               /**
-                * Check whether allocation exists by allocation id
-                *
-                * @param allocationId The allocation id
-                * @return true if the allocation exists
-                */
-               boolean isAllocated(final AllocationID allocationId) {
-                       return 
allocatedSlotsByAllocationId.containsKey(allocationId);
-               }
-
-               AllocationID getAllocationID(final SlotID slotId) {
-                       return allocatedSlots.get(slotId);
-               }
-
-               SlotID getSlotID(final AllocationID allocationId) {
-                       return allocatedSlotsByAllocationId.get(allocationId);
-               }
-
-               public int size() {
-                       return allocatedSlots.size();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Testing utilities
-       // 
------------------------------------------------------------------------
-
-       @VisibleForTesting
-       boolean isAllocated(final SlotID slotId) {
-               return allocationMap.isAllocated(slotId);
-       }
-
-       @VisibleForTesting
-       boolean isAllocated(final AllocationID allocationId) {
-               return allocationMap.isAllocated(allocationId);
-       }
-
-       /**
-        * Add free slots directly to the free pool, this will not trigger 
pending requests allocation
-        *
-        * @param slot The resource slot
-        */
-       @VisibleForTesting
-       void addFreeSlot(final ResourceSlot slot) {
-               final ResourceID resourceId = slot.getResourceID();
-               final SlotID slotId = slot.getSlotId();
-
-               if (!registeredSlots.containsKey(resourceId)) {
-                       registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
-               }
-               registeredSlots.get(resourceId).put(slot.getSlotId(), slot);
-               freeSlots.put(slotId, slot);
-       }
-
-       @VisibleForTesting
-       int getAllocatedSlotCount() {
-               return allocationMap.size();
-       }
-
-       @VisibleForTesting
-       int getFreeSlotCount() {
-               return freeSlots.size();
-       }
-
-       @VisibleForTesting
-       int getPendingRequestCount() {
-               return pendingSlotRequests.size();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
new file mode 100644
index 0000000..6b7f6dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRegistered extends SlotRequestReply {
+
+       public SlotRequestRegistered(AllocationID allocationID) {
+               super(allocationID);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
new file mode 100644
index 0000000..cb3ec72
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class SlotRequestRejected extends SlotRequestReply {
+
+       public SlotRequestRejected(AllocationID allocationID) {
+               super(allocationID);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
new file mode 100644
index 0000000..1b85d0c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for the ResourceManager's replies to a SlotRequest from the JobManager
+ */
+public abstract class SlotRequestReply implements Serializable {
+
+       private static final long serialVersionUID = 42;
+
+       private final AllocationID allocationID;
+
+       public SlotRequestReply(AllocationID allocationID) {
+               this.allocationID = allocationID;
+       }
+
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
new file mode 100644
index 0000000..ef5ce31
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A simple SlotManager which ignores resource profiles.
+ */
+public class SimpleSlotManager extends SlotManager {
+
+       @Override
+       protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, 
ResourceSlot> freeSlots) {
+               final Iterator<ResourceSlot> slotIterator = 
freeSlots.values().iterator();
+               if (slotIterator.hasNext()) {
+                       return slotIterator.next();
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, 
Map<AllocationID, SlotRequest> pendingRequests) {
+               final Iterator<SlotRequest> requestIterator = 
pendingRequests.values().iterator();
+               if (requestIterator.hasNext()) {
+                       return requestIterator.next();
+               } else {
+                       return null;
+               }
+       }
+
+       @Override
+       protected void allocateContainer(ResourceProfile resourceProfile) {
+               // TODO
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
new file mode 100644
index 0000000..96fde7d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * SlotManager is responsible for receiving slot requests and performing slot allocations. It requests
+ * slots from registered TaskManagers and issues container allocation requests when there are not
+ * enough available slots. Besides, it should sync its slot allocation with the TaskManager's heartbeat.
+ * <p>
+ * The main operating principles of SlotManager are:
+ * <ul>
+ * <li>1. All slot allocation status should be synced with the TaskManager, which is the ground truth.</li>
+ * <li>2. All slots that have registered must be tracked, either by the free pool or by the allocated pool.</li>
+ * <li>3. All slot requests are handled on a best-effort basis; there is no guarantee that a request will be
+ * fulfilled in time or correctly allocated. Conflicts, timeouts or other errors may happen and should
+ * be handled outside SlotManager. SlotManager makes each decision based on the information it currently
+ * holds.</li>
+ * </ul>
+ * <b>IMPORTANT:</b> This class is <b>not thread-safe</b>.
+ */
+public abstract class SlotManager implements LeaderRetrievalListener {
+
+       protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+       /** All registered task managers with ResourceID and gateway. */
+       private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+
+       /** All registered slots, including free and allocated slots */
+       private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
+
+       /** All pending slot requests, waiting available slots to fulfil */
+       private final Map<AllocationID, SlotRequest> pendingSlotRequests;
+
+       /** All free slots that can be used to be allocated */
+       private final Map<SlotID, ResourceSlot> freeSlots;
+
+       /** All allocations, we can lookup allocations either by SlotID or 
AllocationID */
+       private final AllocationMap allocationMap;
+
+       private final FiniteDuration timeout;
+
+       /** The current leader id set by the ResourceManager */
+       private UUID leaderID;
+
+       /**
+        * Creates a SlotManager with empty bookkeeping and a fixed timeout of 10 seconds. The timeout
+        * is the one handed to the TaskExecutor gateway when a slot is requested.
+        */
+       public SlotManager() {
+               // RPC settings
+               this.timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+               // task managers and the slots they registered
+               this.taskManagerGateways = new HashMap<>();
+               this.registeredSlots = new HashMap<>(16);
+               this.freeSlots = new HashMap<>(16);
+
+               // requests and allocations; the LinkedHashMap keeps pending requests in arrival order
+               this.pendingSlotRequests = new LinkedHashMap<>(16);
+               this.allocationMap = new AllocationMap();
+       }
+
+
+       // 
------------------------------------------------------------------------
+       //  slot managements
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Registers a slot request and either fulfils it immediately or parks it as pending.
+        * <p>
+        * If {@link #chooseSlotToUse} finds a matching free slot, the allocation is recorded, the slot is
+        * taken out of the free pool and the request is forwarded to the TaskExecutor owning the slot.
+        * Otherwise the request is stored as pending and a container allocation is triggered for its
+        * resource profile.
+        *
+        * @param request The detailed request of the slot
+        * @return The confirmation message to be sent to the caller, or <tt>null</tt> if a request with
+        *         the same AllocationID is already pending or allocated
+        */
+       public SlotRequestRegistered requestSlot(final SlotRequest request) {
+               final AllocationID allocationId = request.getAllocationId();
+               if (isRequestDuplicated(request)) {
+                       LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
+                       return null;
+               }
+
+               // try to fulfil the request with current free slots
+               final ResourceSlot slot = chooseSlotToUse(request, freeSlots);
+               if (slot != null) {
+                       LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", slot.getSlotId(),
+                               allocationId, request.getJobId());
+
+                       // record this allocation in bookkeeping
+                       allocationMap.addAllocation(slot.getSlotId(), allocationId);
+
+                       // remove selected slot from free pool
+                       freeSlots.remove(slot.getSlotId());
+
+                       // forward the request to the TaskExecutor that owns the slot
+                       // TODO handle timeouts and response
+                       slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
+               } else {
+                       LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
+                               "AllocationID:{}, JobID:{}", allocationId, request.getJobId());
+
+                       // Store the request as pending BEFORE asking for a container: should allocateContainer()
+                       // make a slot available synchronously, handleFreeSlot() must already see this request.
+                       // Otherwise the new slot would go to the free pool while the request stays pending.
+                       pendingSlotRequests.put(allocationId, request);
+                       allocateContainer(request.getResourceProfile());
+               }
+
+               return new SlotRequestRegistered(allocationId);
+       }
+
+       /**
+        * Syncs the bookkeeping with a TaskManager's SlotReport by applying every contained
+        * SlotStatus one after another.
+        *
+        * @param slotReport The report holding the status of the TaskManager's slots
+        */
+       public void updateSlotStatus(final SlotReport slotReport) {
+               for (final SlotStatus reportedStatus : slotReport.getSlotsStatus()) {
+                       updateSlotStatus(reportedStatus);
+               }
+       }
+
+       /**
+        * Registers a TaskExecutor by storing its gateway under its ResourceID. Slot status reports
+        * are only processed for ResourceIDs registered here; an existing entry for the same
+        * ResourceID is replaced.
+        *
+        * @param resourceID TaskExecutor's ResourceID
+        * @param gateway TaskExecutor's gateway
+        */
+       public void registerTaskExecutor(ResourceID resourceID, 
TaskExecutorGateway gateway) {
+               this.taskManagerGateways.put(resourceID, gateway);
+       }
+
+       /**
+        * The slot request to TaskManager may either have failed in rpc communication (timeout, network
+        * error, etc.) or really have been rejected by the TaskManager. We retry this request by:
+        * <ul>
+        * <li>1. verifying and clearing all previous allocation information for this request
+        * <li>2. trying to request a slot again
+        * </ul>
+        * <p>
+        * This may cause some duplicate allocation, e.g. the slot request to TaskManager is successful but
+        * the response is lost somehow, so we may request a slot in another TaskManager. This causes two
+        * slots to be assigned to one request, but it can be taken care of by rejecting registration at
+        * JobManager.
+        * <p>
+        * If the target slot is no longer tracked at all (neither free nor allocated, e.g. because its
+        * TaskManager has been removed in the meantime), the request is retried as well instead of being
+        * dropped.
+        *
+        * @param originalRequest The original slot request
+        * @param slotId          The target SlotID
+        */
+       public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+               final AllocationID originalAllocationId = originalRequest.getAllocationId();
+               LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
+                       slotId, originalAllocationId, originalRequest.getJobId());
+
+               // verify the allocation info before we do anything
+               if (freeSlots.containsKey(slotId)) {
+                       // this slot is currently empty, no need to de-allocate it from our allocations
+                       LOG.info("Original slot is somehow empty, retrying this request");
+                       retryUnlessAllocated(originalRequest);
+               } else if (allocationMap.isAllocated(slotId)) {
+                       final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+
+                       // check whether we have an agreement on whom this slot belongs to
+                       if (originalAllocationId.equals(currentAllocationId)) {
+                               LOG.info("De-allocate this request and retry");
+                               allocationMap.removeAllocation(currentAllocationId);
+
+                               // put this slot back to free pool
+                               final ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+                               freeSlots.put(slotId, slot);
+
+                               // retry the request
+                               requestSlot(originalRequest);
+                       } else {
+                               // the slot is taken by someone else, no need to de-allocate it from our allocations
+                               LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+                               retryUnlessAllocated(originalRequest);
+                       }
+               } else {
+                       // The slot is not tracked (anymore). Previously the request was silently dropped here;
+                       // retry it so that it is not lost.
+                       LOG.warn("{} is neither in free pool nor in allocated pool, retrying this request", slotId);
+                       retryUnlessAllocated(originalRequest);
+               }
+       }
+
+       /**
+        * Re-issues the given request unless its AllocationID has meanwhile been allocated to some slot.
+        *
+        * @param request The request to retry
+        */
+       private void retryUnlessAllocated(final SlotRequest request) {
+               final AllocationID allocationId = request.getAllocationId();
+               if (allocationMap.isAllocated(allocationId)) {
+                       LOG.info("The failed request has somehow been allocated, SlotID:{}",
+                               allocationMap.getSlotID(allocationId));
+               } else {
+                       requestSlot(request);
+               }
+       }
+
+       /**
+        * Callback for TaskManager failures. Forgets the TaskManager's gateway and purges every slot it
+        * had registered from the free pool or the allocation bookkeeping.
+        *
+        * @param resourceId The ResourceID of the TaskManager
+        */
+       public void notifyTaskManagerFailure(final ResourceID resourceId) {
+               LOG.info("Resource:{} been notified failure", resourceId);
+               taskManagerGateways.remove(resourceId);
+
+               final Map<SlotID, ResourceSlot> lostSlots = registeredSlots.remove(resourceId);
+               if (lostSlots == null) {
+                       // this TaskManager never registered a slot, nothing to clean up
+                       return;
+               }
+
+               for (final SlotID lostSlotId : lostSlots.keySet()) {
+                       LOG.info("Removing Slot: {} upon resource failure", lostSlotId);
+
+                       final boolean wasFree = freeSlots.containsKey(lostSlotId);
+                       if (wasFree) {
+                               freeSlots.remove(lostSlotId);
+                       } else if (allocationMap.isAllocated(lostSlotId)) {
+                               allocationMap.removeAllocation(lostSlotId);
+                       } else {
+                               LOG.error("BUG! {} is neither in free pool nor in allocated pool", lostSlotId);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  internal behaviors
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Applies a single slot status reported by a TaskManager. The report is always trusted: wherever
+        * our bookkeeping disagrees with it, the bookkeeping is corrected.
+        * <ul>
+        * <li>Reports from a TaskManager without a registered gateway are ignored.</li>
+        * <li>A slot seen for the first time is recorded as allocated if the report carries an
+        * AllocationID, otherwise it is treated as a fresh free slot.</li>
+        * <li>For an already known slot, the reported AllocationID (or its absence) is compared with the
+        * recorded one and the allocation map / free pool are adjusted on mismatch. A slot that turns out
+        * to be free is offered to pending requests.</li>
+        * </ul>
+        *
+        * @param reportedStatus Reported slot status
+        */
+       void updateSlotStatus(final SlotStatus reportedStatus) {
+               final SlotID slotId = reportedStatus.getSlotID();
+               final ResourceID ownerId = slotId.getResourceID();
+
+               final TaskExecutorGateway ownerGateway = taskManagerGateways.get(ownerId);
+               if (ownerGateway == null) {
+                       LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
+                               ownerId);
+                       return;
+               }
+
+               final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), ownerGateway);
+               final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
+
+               // --- case 1: the slot was not registered before ---
+               if (registerNewSlot(slot)) {
+                       LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedAllocationId);
+
+                       if (reportedAllocationId == null) {
+                               handleFreeSlot(slot);
+                       } else {
+                               // slot in use, record this in bookkeeping
+                               allocationMap.addAllocation(slotId, reportedAllocationId);
+                       }
+                       return;
+               }
+
+               // --- case 2: the slot is known, reconcile our view with the report ---
+               final boolean recordedAsAllocated = allocationMap.isAllocated(slotId);
+
+               if (reportedAllocationId == null) {
+                       // reported empty; only act if we believed it to be in use
+                       if (recordedAsAllocated) {
+                               LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
+                                       slotId, allocationMap.getAllocationID(slotId));
+
+                               allocationMap.removeAllocation(slotId);
+
+                               // we have a free slot!
+                               handleFreeSlot(slot);
+                       }
+                       return;
+               }
+
+               if (!recordedAsAllocated) {
+                       // reported in use, but we thought the slot is free
+                       LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
+                               slotId, reportedAllocationId);
+
+                       allocationMap.addAllocation(slotId, reportedAllocationId);
+                       freeSlots.remove(slotId);
+                       return;
+               }
+
+               // both sides agree the slot is in use, make sure they agree on the AllocationID as well
+               final AllocationID recordedAllocationId = allocationMap.getAllocationID(slotId);
+               if (!reportedAllocationId.equals(recordedAllocationId)) {
+                       LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
+                               slotId, recordedAllocationId, reportedAllocationId);
+
+                       // replace our allocation with the reported one
+                       allocationMap.removeAllocation(slotId);
+                       allocationMap.addAllocation(slotId, reportedAllocationId);
+               }
+       }
+
+       /**
+        * Deals with a slot that just became free. Pending requests get the first shot at it: if
+        * {@link #chooseRequestToFulfill} picks one, that request leaves the pending list, the allocation
+        * is recorded and the slot request is sent to the TaskManager. If no request is chosen, the slot
+        * is put into the free pool.
+        *
+        * @param freeSlot The free slot
+        */
+       private void handleFreeSlot(final ResourceSlot freeSlot) {
+               final SlotRequest matchedRequest = chooseRequestToFulfill(freeSlot, pendingSlotRequests);
+
+               // nobody is waiting for this slot: park it in the free pool
+               if (matchedRequest == null) {
+                       freeSlots.put(freeSlot.getSlotId(), freeSlot);
+                       return;
+               }
+
+               final AllocationID matchedAllocationId = matchedRequest.getAllocationId();
+               pendingSlotRequests.remove(matchedAllocationId);
+
+               final SlotID assignedSlotId = freeSlot.getSlotId();
+               LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", assignedSlotId,
+                       matchedAllocationId, matchedRequest.getJobId());
+               allocationMap.addAllocation(assignedSlotId, matchedAllocationId);
+
+               final Future<SlotRequestReply> pendingReply =
+                       freeSlot.getTaskExecutorGateway().requestSlot(matchedAllocationId, leaderID, timeout);
+               // TODO handle timeouts and response
+       }
+
+       /**
+        * Tells whether a request with the same AllocationID has been seen before. A formerly received
+        * request is either still in the pending list or already recorded in the allocation map.
+        *
+        * @param request The slot request
+        * @return <tt>true</tt> if the request is duplicated
+        */
+       private boolean isRequestDuplicated(final SlotRequest request) {
+               final AllocationID requestedId = request.getAllocationId();
+               if (pendingSlotRequests.containsKey(requestedId)) {
+                       return true;
+               }
+               return allocationMap.isAllocated(requestedId);
+       }
+
+       /**
+        * Stores the slot under its owning resource (replacing any slot previously stored for the same
+        * SlotID) and reports whether the SlotID was unknown until now.
+        *
+        * @param slot The ResourceSlot which will be checked and registered
+        * @return <tt>true</tt> if we meet a new slot
+        */
+       private boolean registerNewSlot(final ResourceSlot slot) {
+               final SlotID slotId = slot.getSlotId();
+               final ResourceID ownerId = slotId.getResourceID();
+
+               // first slot of this resource: create its per-resource map
+               if (!registeredSlots.containsKey(ownerId)) {
+                       registeredSlots.put(ownerId, new HashMap<SlotID, ResourceSlot>());
+               }
+
+               final ResourceSlot previouslyRegistered = registeredSlots.get(ownerId).put(slotId, slot);
+               return previouslyRegistered == null;
+       }
+
+       /**
+        * Looks up a registered slot by its SlotID.
+        *
+        * @param slotId The id of the slot
+        * @return The registered slot, or <tt>null</tt> if the owning resource or the slot is unknown
+        */
+       private ResourceSlot getRegisteredSlot(final SlotID slotId) {
+               final ResourceID ownerId = slotId.getResourceID();
+               return registeredSlots.containsKey(ownerId)
+                       ? registeredSlots.get(ownerId).get(slotId)
+                       : null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Framework specific behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Chooses a slot to use among all free slots; the selection strategy is framework specific.
+        * Called when a new slot request arrives. The chosen slot is removed from the free pool by the
+        * caller, not by this method.
+        *
+        * @param request   The slot request
+        * @param freeSlots All slots which can be used
+        * @return The slot we choose to use, <tt>null</tt> if we did not find a match
+        */
+       protected abstract ResourceSlot chooseSlotToUse(final SlotRequest 
request,
+               final Map<SlotID, ResourceSlot> freeSlots);
+
+       /**
+        * Chooses a pending request to fulfill when we have a free slot; the selection strategy is
+        * framework specific. The chosen request is removed from the pending requests by the caller,
+        * not by this method.
+        *
+        * @param offeredSlot     The free slot
+        * @param pendingRequests All the pending slot requests
+        * @return The chosen SlotRequest, <tt>null</tt> if we did not find a match
+        */
+       protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
+               final Map<AllocationID, SlotRequest> pendingRequests);
+
+       /**
+        * The framework specific code for allocating a container for the specified resource profile.
+        * Called when a slot request cannot be fulfilled from the free slots.
+        *
+        * @param resourceProfile The resource profile
+        */
+       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
+
+       // 
------------------------------------------------------------------------
+       //  Helper classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Bidirectional bookkeeping of slot allocations: one map from SlotID to AllocationID and a second
+        * map for the reverse direction, so that an allocation can be looked up or removed by either key.
+        */
+       private static class AllocationMap {
+
+               /** Forward view: SlotID to AllocationID */
+               private final Map<SlotID, AllocationID> allocatedSlots;
+
+               /** Reverse view of allocatedSlots: AllocationID to SlotID */
+               private final Map<AllocationID, SlotID> allocatedSlotsByAllocationId;
+
+               AllocationMap() {
+                       this.allocatedSlots = new HashMap<>(16);
+                       this.allocatedSlotsByAllocationId = new HashMap<>(16);
+               }
+
+               /**
+                * Records an allocation in both views.
+                *
+                * @param slotId       The slot id
+                * @param allocationId The allocation id
+                */
+               void addAllocation(final SlotID slotId, final AllocationID allocationId) {
+                       allocatedSlotsByAllocationId.put(allocationId, slotId);
+                       allocatedSlots.put(slotId, allocationId);
+               }
+
+               /**
+                * Removes the allocation of the given slot from both views; no-op for an unknown slot.
+                *
+                * @param slotId The slot id
+                */
+               void removeAllocation(final SlotID slotId) {
+                       if (!allocatedSlots.containsKey(slotId)) {
+                               return;
+                       }
+                       // Map#remove returns the mapped AllocationID, which is the key of the reverse entry
+                       allocatedSlotsByAllocationId.remove(allocatedSlots.remove(slotId));
+               }
+
+               /**
+                * Removes the given allocation from both views; no-op for an unknown allocation.
+                *
+                * @param allocationId The allocation id
+                */
+               void removeAllocation(final AllocationID allocationId) {
+                       if (!allocatedSlotsByAllocationId.containsKey(allocationId)) {
+                               return;
+                       }
+                       // Map#remove returns the mapped SlotID, which is the key of the forward entry
+                       allocatedSlots.remove(allocatedSlotsByAllocationId.remove(allocationId));
+               }
+
+               /**
+                * Checks by slot id whether an allocation exists.
+                *
+                * @param slotId The slot id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final SlotID slotId) {
+                       return allocatedSlots.containsKey(slotId);
+               }
+
+               /**
+                * Checks by allocation id whether an allocation exists.
+                *
+                * @param allocationId The allocation id
+                * @return true if the allocation exists
+                */
+               boolean isAllocated(final AllocationID allocationId) {
+                       return allocatedSlotsByAllocationId.containsKey(allocationId);
+               }
+
+               /** Returns the AllocationID recorded for the slot, or null if the slot is not allocated. */
+               AllocationID getAllocationID(final SlotID slotId) {
+                       return allocatedSlots.get(slotId);
+               }
+
+               /** Returns the SlotID recorded for the allocation, or null if the allocation is unknown. */
+               SlotID getSlotID(final AllocationID allocationId) {
+                       return allocatedSlotsByAllocationId.get(allocationId);
+               }
+
+               /** Returns the number of entries in the forward (SlotID keyed) view. */
+               public int size() {
+                       return allocatedSlots.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  High availability
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Remembers the session id of the newly announced leader. This id is passed along with the slot
+        * requests sent to TaskExecutors. The leader address itself is not used here.
+        */
+       @Override
+       public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+               this.leaderID = leaderSessionID;
+       }
+
+       /**
+        * Logs the error at error level; no further recovery is attempted here.
+        */
+       @Override
+       public void handleError(Exception exception) {
+               LOG.error("Slot Manager received an error from the leader 
service", exception);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Testing utilities
+       // 
------------------------------------------------------------------------
+
+       /** Test hook: tells whether the allocation bookkeeping holds an allocation for the given slot. */
+       @VisibleForTesting
+       boolean isAllocated(final SlotID slotId) {
+               return allocationMap.isAllocated(slotId);
+       }
+
+       /** Test hook: tells whether the allocation bookkeeping holds the given allocation id. */
+       @VisibleForTesting
+       boolean isAllocated(final AllocationID allocationId) {
+               return allocationMap.isAllocated(allocationId);
+       }
+
+       /**
+        * Test hook: registers the slot and puts it straight into the free pool. Unlike a reported free
+        * slot, this does not try to fulfil pending requests.
+        *
+        * @param slot The resource slot
+        */
+       @VisibleForTesting
+       void addFreeSlot(final ResourceSlot slot) {
+               final SlotID slotId = slot.getSlotId();
+               final ResourceID ownerId = slot.getResourceID();
+
+               // make sure the per-resource map exists before registering the slot
+               if (!registeredSlots.containsKey(ownerId)) {
+                       registeredSlots.put(ownerId, new HashMap<SlotID, ResourceSlot>());
+               }
+               registeredSlots.get(ownerId).put(slotId, slot);
+
+               freeSlots.put(slotId, slot);
+       }
+
+       /** Test hook: number of allocations currently recorded in the allocation bookkeeping. */
+       @VisibleForTesting
+       int getAllocatedSlotCount() {
+               return allocationMap.size();
+       }
+
+       /** Test hook: number of slots currently in the free pool. */
+       @VisibleForTesting
+       int getFreeSlotCount() {
+               return freeSlots.size();
+       }
+
+       /** Test hook: number of slot requests currently waiting for a slot. */
+       @VisibleForTesting
+       int getPendingRequestCount() {
+               return pendingSlotRequests.size();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
index 744b674..0f57bb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java
@@ -50,7 +50,10 @@ public class SlotStatus implements Serializable {
                this(slotID, profiler, null, null);
        }
 
-       public SlotStatus(SlotID slotID, ResourceProfile profiler, AllocationID 
allocationID, JobID jobID) {
+       public SlotStatus(
+                       SlotID slotID, ResourceProfile profiler,
+                       JobID jobID,
+                       AllocationID allocationID) {
                this.slotID = checkNotNull(slotID, "slotID cannot be null");
                this.profiler = checkNotNull(profiler, "profile cannot be 
null");
                this.allocationID = allocationID;

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 6c99706..7257436 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,7 +18,12 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
 
@@ -32,4 +37,16 @@ public interface TaskExecutorGateway extends RpcGateway {
        // 
------------------------------------------------------------------------
 
        void notifyOfNewResourceManagerLeader(String address, UUID 
resourceManagerLeaderId);
+
+       /**
+        * Requests a slot from the TaskExecutor. Sent by the ResourceManager to the TaskExecutor.
+        *
+        * @param allocationID id for the request
+        * @param resourceManagerLeaderID current leader id of the ResourceManager
+        * @param timeout timeout of the call (the parameter annotated as the RPC timeout)
+        * @return Future of the SlotRequestReply answering the request
+        */
+
+       Future<SlotRequestReply> requestSlot(
+               AllocationID allocationID,
+               UUID resourceManagerLeaderID,
+               @RpcTimeout FiniteDuration timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8a48fa50/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 5799e62..8183c0a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -53,7 +54,8 @@ public class ResourceManagerHATest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
+               SlotManager slotManager = mock(SlotManager.class);
+               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, slotManager);
                resourceManager.start();
                // before grant leadership, resourceManager's leaderId is null
                Assert.assertNull(resourceManager.getLeaderSessionID());

Reply via email to