[FLINK-4347][FLINK-4348] simplify SlotManager and integrate it with 
ResourceManager

Instead of relying on a full synchronization of all slot information on
every heartbeat, the SlotManager is now responsible for updating its
state. It initially syncs all slots upon registration of the
TaskExecutor. After that, it only receives notifications from the
TaskExecutor when slots become available again. This simplifies the
logic of the SlotManager and makes the slot allocation more predictable
in case of message loss.

Additional changes:

- Move the slot registration and allocation report to the registration
  of the TaskExecutor

- Let the TaskExecutor immediately notify the ResourceManager once a
  slot becomes free. The ResourceManager has to confirm this
  notification. Otherwise, the slot will be blocked because the
  ResourceManager's state is not in sync.

- Integrate with handleSlotRequestFailedAtTaskManager and introduce
  fencing to protect against TaskExecutors which are not registered
  anymore.

- introduce RPC call to notify ResourceManager about free slots

- ignore out-of-date slot requests from ResourceManager at TaskExecutor

- let the ResourceManager update its state instead of relying on heartbeats

- provide ResourceManagerServices to SlotManager

- introduce factory for SlotManager

- keep task gateways and worker information in ResourceManager and
  inform SlotManager

- add TaskExecutor test to ensure that a slot which the TaskExecutor has
  reported as free is correctly blacklisted as long as the
  ResourceManager has not confirmed the allocation removal.

- adapt tests

- update javadocs

This closes #2571.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0518af03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0518af03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0518af03

Branch: refs/heads/flip-6
Commit: 0518af03a8931ed9b2da013c5ed4da7065156ee0
Parents: 85424c1
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Sep 29 15:08:32 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Fri Oct 7 15:51:14 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceSlot.java    |  12 +-
 .../resourcemanager/ResourceManager.java        | 115 +++++--
 .../resourcemanager/ResourceManagerGateway.java |  31 +-
 .../ResourceManagerServices.java                |   2 +-
 .../resourcemanager/SlotRequestRegistered.java  |  33 --
 .../resourcemanager/SlotRequestRejected.java    |  34 --
 .../resourcemanager/SlotRequestReply.java       |  41 ---
 .../StandaloneResourceManager.java              |   8 +-
 .../jobmanager/RMSlotRequestRegistered.java     |  33 ++
 .../jobmanager/RMSlotRequestRejected.java       |  34 ++
 .../messages/jobmanager/RMSlotRequestReply.java |  41 +++
 .../taskexecutor/SlotAvailableReply.java        |  47 +++
 .../taskexecutor/TMSlotRequestRegistered.java   |  35 ++
 .../taskexecutor/TMSlotRequestRejected.java     |  35 ++
 .../taskexecutor/TMSlotRequestReply.java        |  58 ++++
 .../registration/TaskExecutorRegistration.java  |  12 +-
 .../registration/WorkerRegistration.java        |  42 +++
 .../slotmanager/SimpleSlotManager.java          |  53 ---
 .../slotmanager/SlotManager.java                | 326 +++++++------------
 .../slotmanager/SlotManagerFactory.java         |  31 ++
 .../flink/runtime/taskexecutor/SlotReport.java  |  19 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  37 ++-
 .../taskexecutor/TaskExecutorGateway.java       |   7 +-
 ...TaskExecutorToResourceManagerConnection.java |   2 +-
 .../resourcemanager/ResourceManagerHATest.java  |  12 +-
 .../ResourceManagerJobMasterTest.java           |   4 +-
 .../ResourceManagerTaskExecutorTest.java        |  53 +--
 .../resourcemanager/TestingResourceManager.java |  53 +++
 .../resourcemanager/TestingSlotManager.java     |  78 +++++
 .../slotmanager/SlotManagerTest.java            | 239 ++++++--------
 .../slotmanager/SlotProtocolTest.java           |  92 ++++--
 .../runtime/taskexecutor/TaskExecutorTest.java  |  96 +++++-
 32 files changed, 1087 insertions(+), 628 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 4a91a79..0b9367d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -35,12 +35,12 @@ public class ResourceSlot implements ResourceIDRetrievable {
        private final ResourceProfile resourceProfile;
 
        /** Gateway to the TaskExecutor which owns the slot */
-       private final TaskExecutorGateway taskExecutorGateway;
+       private final TaskExecutorRegistration taskExecutorRegistration;
 
-       public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, 
TaskExecutorGateway taskExecutorGateway) {
+       public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, 
TaskExecutorRegistration taskExecutorRegistration) {
                this.slotId = checkNotNull(slotId);
                this.resourceProfile = checkNotNull(resourceProfile);
-               this.taskExecutorGateway = taskExecutorGateway;
+               this.taskExecutorRegistration = 
checkNotNull(taskExecutorRegistration);
        }
 
        @Override
@@ -56,8 +56,8 @@ public class ResourceSlot implements ResourceIDRetrievable {
                return resourceProfile;
        }
 
-       public TaskExecutorGateway getTaskExecutorGateway() {
-               return taskExecutorGateway;
+       public TaskExecutorRegistration getTaskExecutorRegistration() {
+               return taskExecutorRegistration;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f45afa3..d2d00cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -37,7 +37,12 @@ import 
org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
+import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -45,12 +50,14 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -70,41 +77,54 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from 
the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+public abstract class ResourceManager<WorkerType extends Serializable>
                extends RpcEndpoint<ResourceManagerGateway>
                implements LeaderContender {
 
-       /** The exit code with which the process is stopped in case of a fatal 
error */
+       /** The exit code with which the process is stopped in case of a fatal 
error. */
        protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
+       /** All currently registered JobMasterGateways scoped by JobID. */
        private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
+       /** LeaderListeners for all registered JobMasters. */
        private final Map<JobID, JobMasterLeaderListener> 
jobMasterLeaderRetrievalListeners;
 
-       private final Map<ResourceID, WorkerType> taskExecutorGateways;
+       /** All currently registered TaskExecutors with their framework 
specific worker information. */
+       private final Map<ResourceID, WorkerRegistration<WorkerType>> 
taskExecutors;
 
+       /** High availability services for leader retrieval and election. */
        private final HighAvailabilityServices highAvailabilityServices;
 
-       private final SlotManager slotManager;
+       /** The factory to construct the SlotManager. */
+       private final SlotManagerFactory slotManagerFactory;
 
+       /** The SlotManager created by the slotManagerFactory when the 
ResourceManager is started. */
+       private SlotManager slotManager;
+
+       /** The service to elect a ResourceManager leader. */
        private LeaderElectionService leaderElectionService;
 
+       /** ResourceManager's leader session id which is updated on leader 
election. */
        private UUID leaderSessionID;
 
+       /** All registered listeners for status updates of the ResourceManager. 
*/
        private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
+       /** Default timeout for messages */
        private final Time timeout = Time.seconds(5);
 
        public ResourceManager(
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
-                       SlotManager slotManager) {
+                       SlotManagerFactory slotManagerFactory) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
+               this.slotManagerFactory = checkNotNull(slotManagerFactory);
                this.jobMasterGateways = new HashMap<>();
-               this.slotManager = checkNotNull(slotManager);
                this.jobMasterLeaderRetrievalListeners = new HashMap<>();
-               this.taskExecutorGateways = new HashMap<>();
+               this.taskExecutors = new HashMap<>();
+               this.leaderSessionID = new UUID(0, 0);
                infoMessageListeners = new HashMap<>();
        }
 
@@ -113,9 +133,10 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                // start a leader
                try {
                        super.start();
+                       // SlotManager should start first
+                       slotManager = 
slotManagerFactory.create(createResourceManagerServices());
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
                        leaderElectionService.start(this);
-                       slotManager.setupResourceManagerServices(new 
DefaultResourceManagerServices());
                        // framework specific initialization
                        initialize();
                } catch (Throwable e) {
@@ -196,7 +217,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        .handleAsync(new BiFunction<JobMasterGateway, 
Throwable, RegistrationResponse>() {
                                @Override
                                public RegistrationResponse 
apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
-                                       
+
                                        if (throwable != null) {
                                                return new 
RegistrationResponse.Decline(throwable.getMessage());
                                        } else {
@@ -234,7 +255,8 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        public Future<RegistrationResponse> registerTaskExecutor(
                final UUID resourceManagerLeaderId,
                final String taskExecutorAddress,
-               final ResourceID resourceID) {
+               final ResourceID resourceID,
+               final SlotReport slotReport) {
 
                return getRpcService().execute(new 
Callable<TaskExecutorGateway>() {
                        @Override
@@ -245,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                                                resourceID, 
taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
                                        throw new Exception("Invalid leader 
session id");
                                }
-                               return 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, 
TimeUnit.SECONDS);
+                               return 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
+                                       .get(timeout.toMilliseconds(), 
timeout.getUnit());
                        }
                }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, 
RegistrationResponse>() {
                        @Override
@@ -253,14 +276,17 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                                if (throwable != null) {
                                        return new 
RegistrationResponse.Decline(throwable.getMessage());
                                } else {
-                                       WorkerType oldWorker = 
taskExecutorGateways.remove(resourceID);
-                                       if (oldWorker != null) {
+                                       WorkerRegistration oldRegistration = 
taskExecutors.remove(resourceID);
+                                       if (oldRegistration != null) {
                                                // TODO :: suggest old 
taskExecutor to stop itself
-                                               
slotManager.notifyTaskManagerFailure(resourceID);
+                                               log.info("Replacing old 
instance of worker for ResourceID {}", resourceID);
                                        }
                                        WorkerType newWorker = 
workerStarted(resourceID);
-                                       taskExecutorGateways.put(resourceID, 
newWorker);
-                                       return new 
TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
+                                       WorkerRegistration<WorkerType> 
registration =
+                                               new 
WorkerRegistration<>(taskExecutorGateway, newWorker);
+                                       taskExecutors.put(resourceID, 
registration);
+                                       
slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+                                       return new 
TaskExecutorRegistrationSuccess(registration.getInstanceID(), 5000);
                                }
                        }
                }, getMainThreadExecutor());
@@ -273,7 +299,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
         * @return Slot assignment
         */
        @RpcMethod
-       public SlotRequestReply requestSlot(
+       public RMSlotRequestReply requestSlot(
                        UUID jobMasterLeaderID,
                        UUID resourceManagerLeaderID,
                        SlotRequest slotRequest) {
@@ -290,8 +316,41 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        return slotManager.requestSlot(slotRequest);
                } else {
                        log.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
-                       return new 
SlotRequestRejected(slotRequest.getAllocationId());
+                       return new 
RMSlotRequestRejected(slotRequest.getAllocationId());
+               }
+       }
+
+       /**
+        * Notification from a TaskExecutor that a slot has become available
+        * @param resourceManagerLeaderId TaskExecutor's resource manager 
leader id
+        * @param resourceID TaskExecutor's resource id
+        * @param instanceID TaskExecutor's instance id
+        * @param slotID The slot id of the available slot
+        * @return SlotAvailableReply
+        */
+       @RpcMethod
+       public SlotAvailableReply notifySlotAvailable(
+                       final UUID resourceManagerLeaderId,
+                       final ResourceID resourceID,
+                       final InstanceID instanceID,
+                       final SlotID slotID) {
+
+               if (resourceManagerLeaderId.equals(leaderSessionID)) {
+                       WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceID);
+                       if (registration != null) {
+                               InstanceID registrationInstanceID = 
registration.getInstanceID();
+                               if (registrationInstanceID.equals(instanceID)) {
+                                       runAsync(new Runnable() {
+                                               @Override
+                                               public void run() {
+                                                       
slotManager.notifySlotAvailable(resourceID, slotID);
+                                               }
+                                       });
+                                       return new 
SlotAvailableReply(leaderSessionID, slotID);
+                               }
+                       }
                }
+               return null;
        }
 
 
@@ -329,9 +388,9 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        public void run() {
                                log.info("ResourceManager {} was revoked 
leadership.", getAddress());
                                jobMasterGateways.clear();
-                               taskExecutorGateways.clear();
+                               taskExecutors.clear();
                                slotManager.clearState();
-                               leaderSessionID = null;
+                               leaderSessionID = new UUID(0, 0);
                        }
                });
        }
@@ -411,7 +470,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                runAsync(new Runnable() {
                        @Override
                        public void run() {
-                               WorkerType worker = 
taskExecutorGateways.remove(resourceID);
+                               WorkerType worker = 
taskExecutors.remove(resourceID).getWorker();
                                if (worker != null) {
                                        // TODO :: suggest failed task executor 
to stop itself
                                        
slotManager.notifyTaskManagerFailure(resourceID);
@@ -426,7 +485,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
         * @return The number of currently started TaskManagers.
         */
        public int getNumberOfStartedTaskManagers() {
-               return taskExecutorGateways.size();
+               return taskExecutors.size();
        }
 
        /**
@@ -507,6 +566,14 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                });
        }
 
+       // 
------------------------------------------------------------------------
+       //  Resource Manager Services
+       // 
------------------------------------------------------------------------
+
+       protected ResourceManagerServices createResourceManagerServices() {
+               return new DefaultResourceManagerServices();
+       }
+
        private class DefaultResourceManagerServices implements 
ResourceManagerServices {
 
                @Override
@@ -520,7 +587,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                }
 
                @Override
-               public Executor getExecutor() {
+               public Executor getMainThreadExecutor() {
                        return ResourceManager.this.getMainThreadExecutor();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 87303a1..3c81227 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -22,11 +22,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.InstanceID;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 
 import java.util.UUID;
 
@@ -56,10 +61,12 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Requests a slot from the resource manager.
         *
-        * @param slotRequest Slot request
-        * @return Future slot assignment
+        * @param jobMasterLeaderID leader id of the JobMaster
+        * @param resourceManagerLeaderID leader id of the ResourceManager
+        * @param slotRequest The slot to request
+        * @return The confirmation that the slot gets allocated
         */
-       Future<SlotRequestReply> requestSlot(
+       Future<RMSlotRequestReply> requestSlot(
                UUID jobMasterLeaderID,
                UUID resourceManagerLeaderID,
                SlotRequest slotRequest,
@@ -71,6 +78,7 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
         * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
         * @param resourceID              The resource ID of the TaskExecutor 
that registers
+        * @param slotReport              The slot report containing free and 
allocated task slots
         * @param timeout                 The timeout for the response.
         *
         * @return The future to the response by the ResourceManager.
@@ -79,6 +87,23 @@ public interface ResourceManagerGateway extends RpcGateway {
                UUID resourceManagerLeaderId,
                String taskExecutorAddress,
                ResourceID resourceID,
+               SlotReport slotReport,
+               @RpcTimeout Time timeout);
+
+       /**
+        * Sent by the TaskExecutor to notify the ResourceManager that a slot 
has become available.
+        *
+        * @param resourceManagerLeaderId The ResourceManager leader id
+        * @param resourceID The ResourceID of the TaskExecutor
+        * @param instanceID The InstanceID of the TaskExecutor
+        * @param slotID The SlotID of the freed slot
+        * @return The confirmation by the ResourceManager
+        */
+       Future<SlotAvailableReply> notifySlotAvailable(
+               UUID resourceManagerLeaderId,
+               ResourceID resourceID,
+               InstanceID instanceID,
+               SlotID slotID,
                @RpcTimeout Time timeout);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index 30994dc..b997a3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -39,6 +39,6 @@ public interface ResourceManagerServices {
        /**
         * Gets the executor which executes in the main thread of the 
ResourceManager
         */
-       Executor getExecutor();
+       Executor getMainThreadExecutor();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
deleted file mode 100644
index f719dce..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public class SlotRequestRegistered extends SlotRequestReply {
-
-       private static final long serialVersionUID = 4760320859275256855L;
-
-       public SlotRequestRegistered(AllocationID allocationID) {
-               super(allocationID);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
deleted file mode 100644
index 282a7d5..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Rejection message by the ResourceManager for a SlotRequest from the 
JobManager
- */
-public class SlotRequestRejected extends SlotRequestReply {
-
-       private static final long serialVersionUID = 9049346740895325144L;
-
-       public SlotRequestRejected(AllocationID allocationID) {
-               super(allocationID);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
deleted file mode 100644
index 1b85d0c..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-import java.io.Serializable;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public abstract class SlotRequestReply implements Serializable {
-
-       private static final long serialVersionUID = 42;
-
-       private final AllocationID allocationID;
-
-       public SlotRequestReply(AllocationID allocationID) {
-               this.allocationID = allocationID;
-       }
-
-       public AllocationID getAllocationID() {
-               return allocationID;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index deca8d3..f9f55f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 
 /**
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
        public StandaloneResourceManager(RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               SlotManager slotManager) {
-               super(rpcService, highAvailabilityServices, slotManager);
+                       HighAvailabilityServices highAvailabilityServices,
+                       SlotManagerFactory slotManagerFactory) {
+               super(rpcService, highAvailabilityServices, slotManagerFactory);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
new file mode 100644
index 0000000..01bc532
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
+ */
+public class RMSlotRequestRegistered extends RMSlotRequestReply {
+
+       private static final long serialVersionUID = 4760320859275256855L;
+
+       public RMSlotRequestRegistered(AllocationID allocationID) {
+               super(allocationID);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
new file mode 100644
index 0000000..649d61c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the 
JobManager
+ */
+public class RMSlotRequestRejected extends RMSlotRequestReply {
+
+       private static final long serialVersionUID = 9049346740895325144L;
+
+       public RMSlotRequestRejected(AllocationID allocationID) {
+               super(allocationID);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
new file mode 100644
index 0000000..66e1911
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for replies by the ResourceManager to a SlotRequest from the JobManager
+ */
+public abstract class RMSlotRequestReply implements Serializable {
+
+       private static final long serialVersionUID = 42;
+
+       private final AllocationID allocationID;
+
+       public RMSlotRequestReply(AllocationID allocationID) {
+               this.allocationID = allocationID;
+       }
+
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
new file mode 100644
index 0000000..f2e0105
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Sent by the ResourceManager to the TaskExecutor to confirm receipt of
+ * {@code 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway.notifySlotAvailable}.
+ */
+public class SlotAvailableReply implements Serializable {
+
+       private final UUID resourceManagerLeaderID;
+
+       private final SlotID slotID;
+
+       public SlotAvailableReply(UUID resourceManagerLeaderID, SlotID slotID) {
+               this.resourceManagerLeaderID = resourceManagerLeaderID;
+               this.slotID = slotID;
+       }
+
+       public UUID getResourceManagerLeaderID() {
+               return resourceManagerLeaderID;
+       }
+
+       public SlotID getSlotID() {
+               return slotID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
new file mode 100644
index 0000000..c0f0f49
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Acknowledgment by the TaskExecutor for a SlotRequest from the 
ResourceManager
+ */
+public class TMSlotRequestRegistered extends TMSlotRequestReply {
+
+       private static final long serialVersionUID = 4760320859275256855L;
+
+       public TMSlotRequestRegistered(InstanceID instanceID, ResourceID 
resourceID, AllocationID allocationID) {
+               super(instanceID, resourceID, allocationID);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
new file mode 100644
index 0000000..9b10a35
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Rejection by the TaskExecutor for a SlotRequest from the ResourceManager
+ */
+public class TMSlotRequestRejected extends TMSlotRequestReply {
+
+       private static final long serialVersionUID = 9049346740895325144L;
+
+       public TMSlotRequestRejected(InstanceID instanceID, ResourceID 
resourceID, AllocationID allocationID) {
+               super(instanceID, resourceID, allocationID);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
new file mode 100644
index 0000000..b23b6e9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for replies by the TaskExecutor to a SlotRequest from the 
ResourceManager
+ */
+public abstract class TMSlotRequestReply implements Serializable {
+
+       private static final long serialVersionUID = 42;
+
+       private final InstanceID instanceID;
+
+       private final ResourceID resourceID;
+
+       private final AllocationID allocationID;
+
+       protected TMSlotRequestReply(InstanceID instanceID, ResourceID 
resourceID, AllocationID allocationID) {
+               this.instanceID = instanceID;
+               this.resourceID = resourceID;
+               this.allocationID = allocationID;
+       }
+
+       public InstanceID getInstanceID() {
+               return instanceID;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       public AllocationID getAllocationID() {
+               return allocationID;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
index 6b21f5c..bfa9c00 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -24,20 +24,20 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import java.io.Serializable;
 
 /**
- * This class is responsible for group the TaskExecutorGateway and the 
InstanceID of a registered task executor.
+ * This class is responsible for grouping the TaskExecutorGateway and the 
InstanceID
+ * of a registered task executor.
  */
 public class TaskExecutorRegistration implements Serializable {
 
        private static final long serialVersionUID = -2062957799469434614L;
 
-       private TaskExecutorGateway taskExecutorGateway;
+       private final InstanceID instanceID;
 
-       private InstanceID instanceID;
+       private TaskExecutorGateway taskExecutorGateway;
 
-       public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-                                                                       
InstanceID instanceID) {
+       public TaskExecutorRegistration(TaskExecutorGateway 
taskExecutorGateway) {
+               this.instanceID = new InstanceID();
                this.taskExecutorGateway = taskExecutorGateway;
-               this.instanceID = instanceID;
        }
 
        public InstanceID getInstanceID() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
new file mode 100644
index 0000000..ff28f94
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * This class extends the {@link TaskExecutorRegistration}, adding the worker 
information.
+ */
+public class WorkerRegistration<WorkerType extends Serializable> extends 
TaskExecutorRegistration {
+
+       private static final long serialVersionUID = -2062957799469434614L;
+
+       private WorkerType worker;
+
+       public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, 
WorkerType worker) {
+               super(taskExecutorGateway);
+               this.worker = worker;
+       }
+
+       public WorkerType getWorker() {
+               return worker;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
deleted file mode 100644
index ae1de5a..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A simple SlotManager which ignores resource profiles.
- */
-public class SimpleSlotManager extends SlotManager {
-
-       @Override
-       protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, 
ResourceSlot> freeSlots) {
-               final Iterator<ResourceSlot> slotIterator = 
freeSlots.values().iterator();
-               if (slotIterator.hasNext()) {
-                       return slotIterator.next();
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, 
Map<AllocationID, SlotRequest> pendingRequests) {
-               final Iterator<SlotRequest> requestIterator = 
pendingRequests.values().iterator();
-               if (requestIterator.hasNext()) {
-                       return requestIterator.next();
-               } else {
-                       return null;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a56b2f6..7eb2d78 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -28,11 +28,12 @@ import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +65,11 @@ public abstract class SlotManager {
 
        protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
+       /** The Resource allocation provider */
+       protected final ResourceManagerServices rmServices;
+
        /** All registered task managers with ResourceID and gateway. */
-       private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+       private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
 
        /** All registered slots, including free and allocated slots */
        private final Map<ResourceID, Map<SlotID, ResourceSlot>> 
registeredSlots;
@@ -84,29 +88,17 @@ public abstract class SlotManager {
        /** The current leader id set by the ResourceManager */
        private UUID leaderID;
 
-       /** The Resource allocation provider */
-       private ResourceManagerServices resourceManagerServices;
-
-       public SlotManager() {
+       public SlotManager(ResourceManagerServices rmServices) {
+               this.rmServices = checkNotNull(rmServices);
                this.registeredSlots = new HashMap<>(16);
                this.pendingSlotRequests = new LinkedHashMap<>(16);
                this.freeSlots = new HashMap<>(16);
                this.allocationMap = new AllocationMap();
-               this.taskManagerGateways = new HashMap<>();
+               this.taskManagers = new HashMap<>();
                this.timeout = Time.seconds(10);
+               this.leaderID = new UUID(0, 0);
        }
 
-       /**
-        * Initializes the resource supplier which is needed to request new 
resources.
-        */
-       public void setupResourceManagerServices(ResourceManagerServices 
resourceManagerServices) {
-               if (this.resourceManagerServices != null) {
-                       throw new 
IllegalStateException("ResourceManagerServices may only be set once.");
-               }
-               this.resourceManagerServices = resourceManagerServices;
-       }
-
-
        // 
------------------------------------------------------------------------
        //  slot managements
        // 
------------------------------------------------------------------------
@@ -118,13 +110,13 @@ public abstract class SlotManager {
         * RPC's main thread to avoid race condition).
         *
         * @param request The detailed request of the slot
-        * @return SlotRequestRegistered The confirmation message to be send to 
the caller
+        * @return RMSlotRequestRegistered The confirmation message to be sent 
to the caller
         */
-       public SlotRequestRegistered requestSlot(final SlotRequest request) {
+       public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
                final AllocationID allocationId = request.getAllocationId();
                if (isRequestDuplicated(request)) {
                        LOG.warn("Duplicated slot request, AllocationID:{}", 
allocationId);
-                       return null;
+                       return new RMSlotRequestRegistered(allocationId);
                }
 
                // try to fulfil the request with current free slots
@@ -136,53 +128,38 @@ public abstract class SlotManager {
                        // record this allocation in bookkeeping
                        allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
                        // remove selected slot from free pool
-                       final ResourceSlot removedSlot = 
freeSlots.remove(slot.getSlotId());
-
-                       final Future<SlotRequestReply> slotRequestReplyFuture =
-                               
slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-                       slotRequestReplyFuture.handleAsync(new 
BiFunction<SlotRequestReply, Throwable, Object>() {
-                               @Override
-                               public Object apply(SlotRequestReply 
slotRequestReply, Throwable throwable) {
-                                       if (throwable != null) {
-                                               // we failed, put the slot and 
the request back again
-                                               if 
(allocationMap.isAllocated(slot.getSlotId())) {
-                                                       // only re-add if the 
slot hasn't been removed in the meantime
-                                                       
freeSlots.put(slot.getSlotId(), removedSlot);
-                                               }
-                                               
pendingSlotRequests.put(allocationId, request);
-                                       }
-                                       return null;
-                               }
-                       }, resourceManagerServices.getExecutor());
+                       freeSlots.remove(slot.getSlotId());
+
+                       sendSlotRequest(slot, request);
                } else {
                        LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
                                "AllocationID:{}, JobID:{}", allocationId, 
request.getJobId());
-                       Preconditions.checkState(resourceManagerServices != 
null,
+                       Preconditions.checkState(rmServices != null,
                                "Attempted to allocate resources but no 
ResourceManagerServices set.");
-                       
resourceManagerServices.allocateResource(request.getResourceProfile());
+                       
rmServices.allocateResource(request.getResourceProfile());
                        pendingSlotRequests.put(allocationId, request);
                }
 
-               return new SlotRequestRegistered(allocationId);
+               return new RMSlotRequestRegistered(allocationId);
        }
 
        /**
-        * Sync slot status with TaskManager's SlotReport.
+        * Notifies the SlotManager that a slot is available again after being 
allocated.
+        * @param slotID slot id of available slot
         */
-       public void updateSlotStatus(final SlotReport slotReport) {
-               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-                       updateSlotStatus(slotStatus);
+       public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
+               if (!allocationMap.isAllocated(slotID)) {
+                       throw new IllegalStateException("Slot was not 
previously allocated but " +
+                               "TaskManager reports it as available again");
                }
-       }
-
-       /**
-        * Registers a TaskExecutor
-        * @param resourceID TaskExecutor's ResourceID
-        * @param gateway TaskExcutor's gateway
-        */
-       public void registerTaskExecutor(ResourceID resourceID, 
TaskExecutorGateway gateway) {
-               this.taskManagerGateways.put(resourceID, gateway);
+               allocationMap.removeAllocation(slotID);
+               final Map<SlotID, ResourceSlot> slots = 
registeredSlots.get(resourceID);
+               ResourceSlot freeSlot = slots.get(slotID);
+               if (freeSlot == null) {
+                       throw new IllegalStateException("Slot was not 
registered with SlotManager but " +
+                               "TaskManager reported it to be available.");
+               }
+               handleFreeSlot(freeSlot);
        }
 
        /**
@@ -200,51 +177,72 @@ public abstract class SlotManager {
         * @param originalRequest The original slot request
         * @param slotId          The target SlotID
         */
-       public void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
+       void handleSlotRequestFailedAtTaskManager(final SlotRequest 
originalRequest, final SlotID slotId) {
                final AllocationID originalAllocationId = 
originalRequest.getAllocationId();
                LOG.info("Slot request failed at TaskManager, SlotID:{}, 
AllocationID:{}, JobID:{}",
                        slotId, originalAllocationId, 
originalRequest.getJobId());
 
-               // verify the allocation info before we do anything
-               if (freeSlots.containsKey(slotId)) {
-                       // this slot is currently empty, no need to de-allocate 
it from our allocations
-                       LOG.info("Original slot is somehow empty, retrying this 
request");
+               if (allocationMap.isAllocated(slotId)) {
+                       final AllocationID expectedAllocationId = 
allocationMap.getAllocationID(slotId);
 
-                       // before retry, we should double check whether this 
request was allocated by some other ways
-                       if (!allocationMap.isAllocated(originalAllocationId)) {
-                               requestSlot(originalRequest);
+                       // check whether we have an agreement on whom this slot 
belongs to
+                       if (originalAllocationId.equals(expectedAllocationId)) {
+                               LOG.info("De-allocate this request and retry");
+                               
allocationMap.removeAllocation(expectedAllocationId);
+                               
pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
+                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
+                               // treat this slot as empty and retry with a 
different request
+                               handleFreeSlot(slot);
                        } else {
-                               LOG.info("The failed request has somehow been 
allocated, SlotID:{}",
-                                       
allocationMap.getSlotID(originalAllocationId));
+                               LOG.error("Slot request failed for slot {} with 
allocation id {}:" +
+                                               " Allocation id did not match 
the expected allocation id {}.",
+                                       slotId, originalAllocationId, 
expectedAllocationId);
                        }
-               } else if (allocationMap.isAllocated(slotId)) {
-                       final AllocationID currentAllocationId = 
allocationMap.getAllocationID(slotId);
+               } else {
+                       LOG.error("Slot request failed for slot {} with 
allocation id {}: " +
+                                       "Slot was not previously registered.",
+                               slotId, originalAllocationId);
+               }
+       }
 
-                       // check whether we have an agreement on whom this slot 
belongs to
-                       if (originalAllocationId.equals(currentAllocationId)) {
-                               LOG.info("De-allocate this request and retry");
-                               
allocationMap.removeAllocation(currentAllocationId);
+       /**
+        * Registers a TaskExecutor
+        * @param resourceID TaskExecutor's ResourceID
+        * @param registration TaskExecutor's registration
+        * @param slotReport TaskExecutor's free and allocated slots
+        */
+       public void registerTaskExecutor(
+                       ResourceID resourceID,
+                       TaskExecutorRegistration registration,
+                       SlotReport slotReport) {
 
-                               // put this slot back to free pool
-                               ResourceSlot slot = 
checkNotNull(getRegisteredSlot(slotId));
-                               freeSlots.put(slotId, slot);
+               if (taskManagers.get(resourceID) != null) {
+                       notifyTaskManagerFailure(resourceID);
+               }
 
-                               // retry the request
-                               requestSlot(originalRequest);
-                       } else {
-                               // the slot is taken by someone else, no need 
to de-allocate it from our allocations
-                               LOG.info("Original slot is taken by someone 
else, current AllocationID:{}", currentAllocationId);
+               this.taskManagers.put(resourceID, registration);
 
-                               // before retry, we should double check whether 
this request was allocated by some other ways
-                               if 
(!allocationMap.isAllocated(originalAllocationId)) {
-                                       requestSlot(originalRequest);
-                               } else {
-                                       LOG.info("The failed request is somehow 
been allocated, SlotID:{}",
-                                               
allocationMap.getSlotID(originalAllocationId));
-                               }
+               for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+                       final SlotID slotId = slotStatus.getSlotID();
+
+                       final TaskExecutorRegistration taskExecutorRegistration 
= taskManagers.get(slotId.getResourceID());
+                       if (taskExecutorRegistration == null) {
+                               LOG.info("Received SlotStatus but ResourceID {} 
is unknown to the SlotManager",
+                                       slotId.getResourceID());
+                               return;
+                       }
+
+                       final ResourceSlot slot = new ResourceSlot(slotId, 
slotStatus.getProfiler(), taskExecutorRegistration);
+
+                       registerNewSlot(slot);
+                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, slotStatus.getAllocationID());
+
+                       if (slotStatus.getAllocationID() != null) {
+                               // slot in use, record this in bookkeeping
+                               allocationMap.addAllocation(slotId, 
slotStatus.getAllocationID());
+                       } else {
+                               handleFreeSlot(slot);
                        }
-               } else {
-                       LOG.error("BUG! {} is neither in free pool nor in 
allocated pool", slotId);
                }
        }
 
@@ -255,7 +253,7 @@ public abstract class SlotManager {
         */
        public void notifyTaskManagerFailure(final ResourceID resourceId) {
                LOG.info("Resource:{} been notified failure", resourceId);
-               taskManagerGateways.remove(resourceId);
+               taskManagers.remove(resourceId);
                final Map<SlotID, ResourceSlot> slotIdsToRemove = 
registeredSlots.remove(resourceId);
                if (slotIdsToRemove != null) {
                        for (SlotID slotId : slotIdsToRemove.keySet()) {
@@ -276,92 +274,6 @@ public abstract class SlotManager {
        // 
------------------------------------------------------------------------
 
        /**
-        * Update slot status based on TaskManager's report. There are mainly 
two situations when we receive the report:
-        * <ul>
-        * <li>1. The slot is newly registered.</li>
-        * <li>2. The slot has registered, it contains its current status.</li>
-        * </ul>
-        * <p>
-        * Regarding 1: It's fairly simple, we just record this slot's status, 
and trigger schedule if slot is empty.
-        * <p>
-        * Regarding 2: It will cause some weird situation since we may have 
some time-gap on how the slot's status really
-        * is. We may have some updates on the slot's allocation, but it 
doesn't reflected by TaskManager's heartbeat yet,
-        * and we may make some wrong decision if we cannot guarantee we have 
the exact status about all the slots. So
-        * the principle here is: We always trust TaskManager's heartbeat, we 
will correct our information based on that
-        * and take next action based on the diff between our information and 
heartbeat status.
-        *
-        * @param reportedStatus Reported slot status
-        */
-       void updateSlotStatus(final SlotStatus reportedStatus) {
-               final SlotID slotId = reportedStatus.getSlotID();
-
-               final TaskExecutorGateway taskExecutorGateway = 
taskManagerGateways.get(slotId.getResourceID());
-               if (taskExecutorGateway == null) {
-                       LOG.info("Received SlotStatus but ResourceID {} is 
unknown to the SlotManager",
-                               slotId.getResourceID());
-                       return;
-               }
-
-               final ResourceSlot slot = new ResourceSlot(slotId, 
reportedStatus.getProfiler(), taskExecutorGateway);
-
-               if (registerNewSlot(slot)) {
-                       // we have a newly registered slot
-                       LOG.info("New slot appeared, SlotID:{}, 
AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot in use, record this in bookkeeping
-                               allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-                       } else {
-                               handleFreeSlot(slot);
-                       }
-               } else {
-                       // slot exists, update current information
-                       if (reportedStatus.getAllocationID() != null) {
-                               // slot is reported in use
-                               final AllocationID reportedAllocationId = 
reportedStatus.getAllocationID();
-
-                               // check whether we also thought this slot is 
in use
-                               if (allocationMap.isAllocated(slotId)) {
-                                       // we also think that slot is in use, 
check whether the AllocationID matches
-                                       final AllocationID currentAllocationId 
= allocationMap.getAllocationID(slotId);
-
-                                       if 
(!reportedAllocationId.equals(currentAllocationId)) {
-                                               LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:{}",
-                                                       slotId, 
currentAllocationId, reportedAllocationId);
-
-                                               // seems we have a disagreement 
about the slot assignments, need to correct it
-                                               
allocationMap.removeAllocation(slotId);
-                                               
allocationMap.addAllocation(slotId, reportedAllocationId);
-                                       }
-                               } else {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:null, reported:{}",
-                                               slotId, reportedAllocationId);
-
-                                       // we thought the slot is free, should 
correct this information
-                                       allocationMap.addAllocation(slotId, 
reportedStatus.getAllocationID());
-
-                                       // remove this slot from free slots pool
-                                       freeSlots.remove(slotId);
-                               }
-                       } else {
-                               // slot is reported empty
-
-                               // check whether we also thought this slot is 
empty
-                               if (allocationMap.isAllocated(slotId)) {
-                                       LOG.info("Slot allocation info 
mismatch! SlotID:{}, current:{}, reported:null",
-                                               slotId, 
allocationMap.getAllocationID(slotId));
-
-                                       // we thought the slot is in use, 
correct it
-                                       allocationMap.removeAllocation(slotId);
-
-                                       // we have a free slot!
-                                       handleFreeSlot(slot);
-                               }
-                       }
-               }
-       }
-
-       /**
         * When we have a free slot, try to fulfill the pending request first. 
If any request can be fulfilled,
         * record this allocation in bookkeeping and send slot request to 
TaskManager, else we just add this slot
         * to the free pool.
@@ -373,32 +285,45 @@ public abstract class SlotManager {
 
                if (chosenRequest != null) {
                        final AllocationID allocationId = 
chosenRequest.getAllocationId();
-                       final SlotRequest removedSlotRequest = 
pendingSlotRequests.remove(allocationId);
+                       final SlotRequest slotRequest = 
pendingSlotRequests.remove(allocationId);
 
                        LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
                                allocationId, chosenRequest.getJobId());
                        allocationMap.addAllocation(freeSlot.getSlotId(), 
allocationId);
 
-                       final Future<SlotRequestReply> slotRequestReplyFuture =
-                               
freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-                       slotRequestReplyFuture.handleAsync(new 
BiFunction<SlotRequestReply, Throwable, Object>() {
-                               @Override
-                               public Object apply(SlotRequestReply 
slotRequestReply, Throwable throwable) {
-                                       if (throwable != null) {
-                                               // we failed, add the request 
back again
-                                               if 
(allocationMap.isAllocated(freeSlot.getSlotId())) {
-                                                       
pendingSlotRequests.put(allocationId, removedSlotRequest);
-                                               }
-                                       }
-                                       return null;
-                               }
-                       }, resourceManagerServices.getExecutor());
+                       sendSlotRequest(freeSlot, slotRequest);
                } else {
                        freeSlots.put(freeSlot.getSlotId(), freeSlot);
                }
        }
 
+       private void sendSlotRequest(final ResourceSlot freeSlot, final 
SlotRequest slotRequest) {
+
+               final AllocationID allocationID = slotRequest.getAllocationId();
+               final TaskExecutorRegistration registration = 
freeSlot.getTaskExecutorRegistration();
+               final Future<TMSlotRequestReply> slotRequestReplyFuture =
+                       registration.getTaskExecutorGateway()
+                               .requestSlot(freeSlot.getSlotId(), 
allocationID, leaderID, timeout);
+
+               slotRequestReplyFuture.handleAsync(new 
BiFunction<TMSlotRequestReply, Throwable, Void>() {
+                       @Override
+                       public Void apply(TMSlotRequestReply slotRequestReply, 
Throwable throwable) {
+                               TaskExecutorRegistration current = 
taskManagers.get(slotRequestReply.getResourceID());
+                               if (current != null && 
current.getInstanceID().equals(slotRequestReply.getInstanceID())) {
+                                       if (throwable != null || 
slotRequestReply instanceof TMSlotRequestRejected) {
+                                               
handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId());
+                                       } else {
+                                               LOG.debug("Successfully 
registered slot {} ", freeSlot.getSlotId());
+                                       }
+                               } else {
+                                       LOG.debug("Discarding message from 
obsolete TaskExecutor with InstanceID {}",
+                                               
slotRequestReply.getInstanceID());
+                               }
+                               return null;
+                       }
+               }, rmServices.getMainThreadExecutor());
+       }
+
        /**
         * Check whether the request is duplicated. We use AllocationID to 
identify slot request, for each
         * formerly received slot request, it is either in pending list or 
already been allocated.
@@ -413,18 +338,17 @@ public abstract class SlotManager {
        }
 
        /**
-        * Try to register slot, and tell if this slot is newly registered.
+        * Registers a new slot with the SlotManager.
         *
-        * @param slot The ResourceSlot which will be checked and registered
-        * @return <tt>true</tt> if we meet a new slot
+        * @param slot The ResourceSlot which will be registered
         */
-       private boolean registerNewSlot(final ResourceSlot slot) {
+       private void registerNewSlot(final ResourceSlot slot) {
                final SlotID slotId = slot.getSlotId();
                final ResourceID resourceId = slotId.getResourceID();
                if (!registeredSlots.containsKey(resourceId)) {
                        registeredSlots.put(resourceId, new HashMap<SlotID, 
ResourceSlot>());
                }
-               return registeredSlots.get(resourceId).put(slotId, slot) == 
null;
+               registeredSlots.get(resourceId).put(slotId, slot);
        }
 
        private ResourceSlot getRegisteredSlot(final SlotID slotId) {
@@ -559,12 +483,12 @@ public abstract class SlotManager {
         * Clears the state of the SlotManager after leadership revokal
         */
        public void clearState() {
-               taskManagerGateways.clear();
+               taskManagers.clear();
                registeredSlots.clear();
                pendingSlotRequests.clear();
                freeSlots.clear();
                allocationMap.clear();
-               leaderID = null;
+               leaderID = new UUID(0, 0);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
new file mode 100644
index 0000000..b4e9c99
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+
+/**
+ * Factory to create a SlotManager and provide it with dependencies.
+ */
+public interface SlotManagerFactory {
+
+       /**
+        * Creates a SlotManager and provides it with ResourceManager services.
+        */
+       SlotManager create(ResourceManagerServices rmServices);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index a5de2d5..54adce6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -37,20 +36,20 @@ public class SlotReport implements Serializable {
        /** The slots status of the TaskManager */
        private final List<SlotStatus> slotsStatus;
 
-       /** The resource id which identifies the TaskManager */
-       private final ResourceID resourceID;
+       public SlotReport() {
+               this(Collections.<SlotStatus>emptyList());
+       }
+
+       public SlotReport(SlotStatus slotStatus) {
+               this(Collections.singletonList(slotStatus));
+       }
 
-       public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID 
resourceID) {
+       public SlotReport(final List<SlotStatus> slotsStatus) {
                this.slotsStatus = checkNotNull(slotsStatus);
-               this.resourceID = checkNotNull(resourceID);
        }
 
        public List<SlotStatus> getSlotsStatus() {
                return slotsStatus;
        }
 
-       public ResourceID getResourceID() {
-               return resourceID;
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7df0a91..c0041a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -38,6 +41,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -78,6 +83,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The fatal error handler to use in case of a fatal error */
        private final FatalErrorHandler fatalErrorHandler;
 
+       /** Slots which have become available but haven't been confirmed by the 
RM */
+       private final Set<SlotID> unconfirmedFreeSlots;
+
        // --------- resource manager --------
 
        private TaskExecutorToResourceManagerConnection 
resourceManagerConnection;
@@ -109,6 +117,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
                this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
+
+               this.unconfirmedFreeSlots = new HashSet<>();
        }
 
        // 
------------------------------------------------------------------------
@@ -152,6 +162,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        }
                }
 
+               unconfirmedFreeSlots.clear();
+
                // establish a connection to the new leader
                if (newLeaderAddress != null) {
                        log.info("Attempting to register at ResourceManager 
{}", newLeaderAddress);
@@ -169,13 +181,25 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /**
         * Requests a slot from the TaskManager
         *
+        * @param slotID Slot id for the request
         * @param allocationID id for the request
         * @param resourceManagerLeaderID current leader id of the 
ResourceManager
         * @return answer to the slot request
         */
        @RpcMethod
-       public SlotRequestReply requestSlot(AllocationID allocationID, UUID 
resourceManagerLeaderID) {
-               return new SlotRequestRegistered(allocationID);
+       public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID 
allocationID, UUID resourceManagerLeaderID) {
+               if 
(!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID))
 {
+                       return new TMSlotRequestRejected(
+                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
+               }
+               if (unconfirmedFreeSlots.contains(slotID)) {
+                       // reject the request while the slot is still blacklisted: 
the notification that it became free
+                       // has not yet been confirmed by the ResourceManager
+                       return new TMSlotRequestRejected(
+                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
+               }
+               return new TMSlotRequestRegistered(new InstanceID(), 
ResourceID.generate(), allocationID);
+
        }
 
        // 
------------------------------------------------------------------------
@@ -227,6 +251,11 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return resourceManagerConnection;
        }
 
+       @VisibleForTesting
+       public void addUnconfirmedFreeSlotNotification(SlotID slotID) {
+               unconfirmedFreeSlots.add(slotID);
+       }
+
        // 
------------------------------------------------------------------------
        //  Utility classes
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 0962802..2360b53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,8 +20,9 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
@@ -41,11 +42,13 @@ public interface TaskExecutorGateway extends RpcGateway {
        /**
         * Requests a slot from the TaskManager
         *
+        * @param slotID slot id for the request
         * @param allocationID id for the request
         * @param resourceManagerLeaderID current leader id of the 
ResourceManager
         * @return answer to the slot request
         */
-       Future<SlotRequestReply> requestSlot(
+       Future<TMSlotRequestReply> requestSlot(
+               SlotID slotID,
                AllocationID allocationID,
                UUID resourceManagerLeaderID,
                @RpcTimeout Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index b4b3bae..2dbd9eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -113,7 +113,7 @@ public class TaskExecutorToResourceManagerConnection
                                ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
 
                        Time timeout = Time.milliseconds(timeoutMillis);
-                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, timeout);
+                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, new SlotReport(), timeout);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index fdb83f5..ce1fdca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -43,7 +44,8 @@ public class ResourceManagerHATest {
 
        @Test
        public void testGrantAndRevokeLeadership() throws Exception {
-               // mock a RpcService which will return a special RpcGateway 
when call its startServer method, the returned RpcGateway directly execute 
runAsync call
+               // mock a RpcService which will return a special RpcGateway 
when its startServer method is called;
+               // the returned RpcGateway directly executes runAsync calls
                TestingResourceManagerGatewayProxy gateway = 
mock(TestingResourceManagerGatewayProxy.class);
                doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
 
@@ -54,18 +56,18 @@ public class ResourceManagerHATest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-               SlotManager slotManager = mock(SlotManager.class);
-               final ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
+               final ResourceManager resourceManager =
+                       new TestingResourceManager(rpcService, 
highAvailabilityServices);
                resourceManager.start();
                // before grant leadership, resourceManager's leaderId is the default UUID(0,0)
-               Assert.assertNull(resourceManager.getLeaderSessionID());
+               Assert.assertEquals(new UUID(0,0), 
resourceManager.getLeaderSessionID());
                final UUID leaderId = UUID.randomUUID();
                leaderElectionService.isLeader(leaderId);
                // after grant leadership, resourceManager's leaderId has value
                Assert.assertEquals(leaderId, 
resourceManager.getLeaderSessionID());
                // then revoke leadership, resourceManager's leaderId is the default UUID(0,0) 
again
                leaderElectionService.notLeader();
-               Assert.assertNull(resourceManager.getLeaderSessionID());
+               Assert.assertEquals(new UUID(0,0), 
resourceManager.getLeaderSessionID());
        }
 
        private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutable, StartStoppable, RpcGateway {

Reply via email to