[FLINK-4490] [distributed coordination] (part 2) Make slots independent of 
'Instance'.

To allow for a future dynamic slot allocation and release model, the slots 
should not depend on 'Instance'.
In this change, the Slots hold most of the necessary information directly 
(location, gateway) and
they interact with the Instance only via a 'SlotOwner' interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaa474ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaa474ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaa474ad

Branch: refs/heads/master
Commit: aaa474ad8f1d638c3988697dd57446802142119b
Parents: 34cda87
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 30 20:34:20 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |  14 +-
 .../flink/runtime/executiongraph/Execution.java |  51 +++----
 .../runtime/executiongraph/ExecutionVertex.java |  17 ++-
 .../runtime/instance/HardwareDescription.java   |  28 ++--
 .../apache/flink/runtime/instance/Instance.java |  22 +--
 .../flink/runtime/instance/SharedSlot.java      |  45 ++++--
 .../flink/runtime/instance/SimpleSlot.java      |  35 +++--
 .../org/apache/flink/runtime/instance/Slot.java | 103 ++++++++++---
 .../instance/SlotSharingGroupAssignment.java    | 132 ++++++++---------
 .../scheduler/CoLocationConstraint.java         |  48 ++++---
 .../runtime/jobmanager/scheduler/Scheduler.java |  74 ++++++----
 .../scheduler/SlotAllocationFuture.java         | 116 ++++++++++-----
 .../runtime/jobmanager/slots/SlotOwner.java     |  29 ++++
 .../taskmanager/TaskManagerLocation.java        |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../testingUtils/TestingJobManagerLike.scala    |   2 +-
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../VertexLocationConstraintTest.java           |  52 +++----
 .../flink/runtime/instance/SharedSlotsTest.java |  67 ++++-----
 .../ScheduleWithCoLocationHintTest.java         | 144 ++++++++++---------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  52 +++----
 .../scheduler/SchedulerSlotSharingTest.java     | 102 +++++++------
 .../scheduler/SchedulerTestUtils.java           |  29 +++-
 .../scheduler/SlotAllocationFutureTest.java     |  51 +++++--
 .../resourcemanager/ResourceManagerITCase.java  |  17 ++-
 25 files changed, 741 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index f31febb..0912055 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,7 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
        public static InputChannelDeploymentDescriptor[] fromEdges(
                        ExecutionEdge[] edges, SimpleSlot consumerSlot) {
 
+               final ResourceID consumerTaskManager = 
consumerSlot.getTaskManagerID();
                final InputChannelDeploymentDescriptor[] icdd = new 
InputChannelDeploymentDescriptor[edges.length];
 
                // Each edge is connected to a different result partition
@@ -105,16 +108,17 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
                                        (producerState == ExecutionState.RUNNING
                                                        || producerState == 
ExecutionState.FINISHED)) {
 
-                               final Instance partitionInstance = 
producerSlot.getInstance();
+                               final TaskManagerLocation 
partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+                               final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
 
-                               if 
(partitionInstance.equals(consumerSlot.getInstance())) {
-                                       // Consuming task is deployed to the 
same instance as the partition => local
+                               if 
(partitionTaskManager.equals(consumerTaskManager)) {
+                                       // Consuming task is deployed to the 
same TaskManager as the partition => local
                                        partitionLocation = 
ResultPartitionLocation.createLocal();
                                }
                                else {
                                        // Different instances => remote
                                        final ConnectionID connectionId = new 
ConnectionID(
-                                                       
partitionInstance.getInstanceConnectionInfo(),
+                                                       
partitionTaskManagerLocation,
                                                        
consumedPartition.getIntermediateResult().getConnectionIndex());
 
                                        partitionLocation = 
ResultPartitionLocation.createRemote(connectionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 197999c..846df49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -371,7 +370,7 @@ public class Execution {
                                throw new JobException("Could not assign the 
ExecutionVertex to the slot " + slot);
                        }
                        this.assignedResource = slot;
-                       this.assignedResourceLocation = 
slot.getInstance().getInstanceConnectionInfo();
+                       this.assignedResourceLocation = 
slot.getTaskManagerLocation();
 
                        // race double check, did we fail/cancel and do we need 
to release the slot?
                        if (this.state != DEPLOYING) {
@@ -381,7 +380,7 @@ public class Execution {
 
                        if (LOG.isInfoEnabled()) {
                                LOG.info(String.format("Deploying %s (attempt 
#%d) to %s", vertex.getSimpleName(),
-                                               attemptNumber, 
slot.getInstance().getInstanceConnectionInfo().getHostname()));
+                                               attemptNumber, 
assignedResourceLocation.getHostname()));
                        }
 
                        final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
@@ -393,9 +392,8 @@ public class Execution {
 
                        // register this execution at the execution graph, to 
receive call backs
                        vertex.getExecutionGraph().registerExecution(this);
-
-                       final Instance instance = slot.getInstance();
-                       final ActorGateway gateway = instance.getActorGateway();
+                       
+                       final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
                        final Future<Object> deployAction = gateway.ask(new 
SubmitTask(deployment), timeout);
 
@@ -408,7 +406,7 @@ public class Execution {
                                                        String taskname = 
deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
 
                                                        markFailed(new 
Exception(
-                                                                       "Cannot 
deploy task " + taskname + " - TaskManager (" + instance
+                                                                       "Cannot 
deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
                                                                        + ") 
not responding after a timeout of " + timeout, failure));
                                                }
                                                else {
@@ -437,7 +435,7 @@ public class Execution {
                final SimpleSlot slot = this.assignedResource;
 
                if (slot != null) {
-                       final ActorGateway gateway = 
slot.getInstance().getActorGateway();
+                       final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
                        Future<Object> stopResult = gateway.retry(
                                new StopTask(attemptId),
@@ -590,24 +588,25 @@ public class Execution {
                                                continue;
                                        }
 
-                                       final Instance consumerInstance = 
consumerSlot.getInstance();
-
-                                       final ResultPartitionID partitionId = 
new ResultPartitionID(
-                                                       
partition.getPartitionId(), attemptId);
+                                       final TaskManagerLocation 
partitionTaskManagerLocation = partition.getProducer()
+                                                       
.getCurrentAssignedResource().getTaskManagerLocation();
+                                       final ResourceID partitionTaskManager = 
partitionTaskManagerLocation.getResourceID();
+                                       
+                                       final ResourceID consumerTaskManager = 
consumerSlot.getTaskManagerID();
 
-                                       final Instance partitionInstance = 
partition.getProducer()
-                                                       
.getCurrentAssignedResource().getInstance();
+                                       final ResultPartitionID partitionId = 
new ResultPartitionID(partition.getPartitionId(), attemptId);
+                                       
 
                                        final ResultPartitionLocation 
partitionLocation;
 
-                                       if 
(consumerInstance.equals(partitionInstance)) {
+                                       if 
(consumerTaskManager.equals(partitionTaskManager)) {
                                                // Consuming task is deployed 
to the same instance as the partition => local
                                                partitionLocation = 
ResultPartitionLocation.createLocal();
                                        }
                                        else {
                                                // Different instances => remote
                                                final ConnectionID connectionId 
= new ConnectionID(
-                                                               
partitionInstance.getInstanceConnectionInfo(),
+                                                               
partitionTaskManagerLocation,
                                                                
partition.getIntermediateResult().getConnectionIndex());
 
                                                partitionLocation = 
ResultPartitionLocation.createRemote(connectionId);
@@ -916,7 +915,7 @@ public class Execution {
 
                if (slot != null) {
 
-                       final ActorGateway gateway = 
slot.getInstance().getActorGateway();
+                       final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
                        Future<Object> cancelResult = gateway.retry(
                                new CancelTask(attemptId),
@@ -946,14 +945,10 @@ public class Execution {
                final SimpleSlot slot = this.assignedResource;
 
                if (slot != null) {
-                       final Instance instance = slot.getInstance();
+                       final ActorGateway gateway = 
slot.getTaskManagerActorGateway();
 
-                       if (instance.isAlive()) {
-                               final ActorGateway gateway = 
instance.getActorGateway();
-
-                               // TODO For some tests this could be a problem 
when querying too early if all resources were released
-                               gateway.tell(new 
FailIntermediateResultPartitions(attemptId));
-                       }
+                       // TODO For some tests this could be a problem when 
querying too early if all resources were released
+                       gateway.tell(new 
FailIntermediateResultPartitions(attemptId));
                }
        }
 
@@ -968,15 +963,15 @@ public class Execution {
                        final UpdatePartitionInfo updatePartitionInfo) {
 
                if (consumerSlot != null) {
-                       final Instance instance = consumerSlot.getInstance();
-                       final ActorGateway gateway = instance.getActorGateway();
+                       final ActorGateway gateway = 
consumerSlot.getTaskManagerActorGateway();
+                       final TaskManagerLocation taskManagerLocation = 
consumerSlot.getTaskManagerLocation();
 
                        Future<Object> futureUpdate = 
gateway.ask(updatePartitionInfo, timeout);
 
                        futureUpdate.onFailure(new OnFailure() {
                                @Override
                                public void onFailure(Throwable failure) throws 
Throwable {
-                                       fail(new IllegalStateException("Update 
task on instance " + instance +
+                                       fail(new IllegalStateException("Update 
task on TaskManager " + taskManagerLocation +
                                                        " failed due to:", 
failure));
                                }
                        }, executionContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e5a115a..f02647e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -98,7 +97,7 @@ public class ExecutionVertex {
 
        private volatile Execution currentExecution;    // this field must 
never be null
 
-       private volatile List<Instance> locationConstraintInstances;
+       private volatile List<TaskManagerLocation> locationConstraintInstances;
 
        private volatile boolean scheduleLocalOnly;
 
@@ -352,7 +351,7 @@ public class ExecutionVertex {
                }
        }
 
-       public void setLocationConstraintHosts(List<Instance> instances) {
+       public void setLocationConstraintHosts(List<TaskManagerLocation> 
instances) {
                this.locationConstraintInstances = instances;
        }
 
@@ -376,9 +375,9 @@ public class ExecutionVertex {
         *
         * @return The preferred locations for this vertex execution, or null, 
if there is no preference.
         */
-       public Iterable<Instance> getPreferredLocations() {
+       public Iterable<TaskManagerLocation> getPreferredLocations() {
                // if we have hard location constraints, use those
-               List<Instance> constraintInstances = 
this.locationConstraintInstances;
+               List<TaskManagerLocation> constraintInstances = 
this.locationConstraintInstances;
                if (constraintInstances != null && 
!constraintInstances.isEmpty()) {
                        return constraintInstances;
                }
@@ -388,8 +387,8 @@ public class ExecutionVertex {
                        return Collections.emptySet();
                }
                else {
-                       Set<Instance> locations = new HashSet<Instance>();
-                       Set<Instance> inputLocations = new HashSet<Instance>();
+                       Set<TaskManagerLocation> locations = new HashSet<>();
+                       Set<TaskManagerLocation> inputLocations = new 
HashSet<>();
 
                        // go over all inputs
                        for (int i = 0; i < inputEdges.length; i++) {
@@ -402,7 +401,7 @@ public class ExecutionVertex {
                                                SimpleSlot sourceSlot = 
sources[k].getSource().getProducer().getCurrentAssignedResource();
                                                if (sourceSlot != null) {
                                                        // add input location
-                                                       
inputLocations.add(sourceSlot.getInstance());
+                                                       
inputLocations.add(sourceSlot.getTaskManagerLocation());
                                                        // inputs which have 
too many distinct sources are not considered
                                                        if 
(inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
                                                                
inputLocations.clear();
@@ -495,7 +494,7 @@ public class ExecutionVertex {
 
                        // send only if we actually have a target
                        if (slot != null) {
-                               ActorGateway gateway = 
slot.getInstance().getActorGateway();
+                               ActorGateway gateway = 
slot.getTaskManagerActorGateway();
                                if (gateway != null) {
                                        if (sender == null) {
                                                gateway.tell(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index bfcc1e5..9c1c5b7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -30,22 +30,16 @@ public final class HardwareDescription implements 
Serializable {
        private static final long serialVersionUID = 3380016608300325361L;
 
        /** The number of CPU cores available to the JVM on the compute node. */
-       private int numberOfCPUCores;
+       private final int numberOfCPUCores;
 
        /** The size of physical memory in bytes available on the compute node. 
*/
-       private long sizeOfPhysicalMemory;
+       private final long sizeOfPhysicalMemory;
 
        /** The size of the JVM heap memory */
-       private long sizeOfJvmHeap;
-       
-       /** The size of the memory managed by the system for caching, hashing, 
sorting, ... */
-       private long sizeOfManagedMemory;
+       private final long sizeOfJvmHeap;
 
-       
-       /**
-        * Public default constructor used for serialization process.
-        */
-       public HardwareDescription() {}
+       /** The size of the memory managed by the system for caching, hashing, 
sorting, ... */
+       private final long sizeOfManagedMemory;
 
        /**
         * Constructs a new hardware description object.
@@ -88,7 +82,7 @@ public final class HardwareDescription implements 
Serializable {
        public long getSizeOfJvmHeap() {
                return this.sizeOfJvmHeap;
        }
-       
+
        /**
         * Returns the size of the memory managed by the system for caching, 
hashing, sorting, ...
         * 
@@ -97,26 +91,26 @@ public final class HardwareDescription implements 
Serializable {
        public long getSizeOfManagedMemory() {
                return this.sizeOfManagedMemory;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        // Utils
        // 
--------------------------------------------------------------------------------------------
-       
+
        @Override
        public String toString() {
                return String.format("cores=%d, physMem=%d, heap=%d, 
managed=%d", 
                                numberOfCPUCores, sizeOfPhysicalMemory, 
sizeOfJvmHeap, sizeOfManagedMemory);
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
        // Factory
        // 
--------------------------------------------------------------------------------------------
-       
+
        public static HardwareDescription extractFromSystem(long managedMemory) 
{
                final int numberOfCPUCores = Hardware.getNumberCPUCores();
                final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory();
                final long sizeOfPhysicalMemory = 
Hardware.getSizeOfPhysicalMemory();
-               
+
                return new HardwareDescription(numberOfCPUCores, 
sizeOfPhysicalMemory, sizeOfJvmHeap, managedMemory);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 598b32b..fe46895 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -28,15 +28,20 @@ import java.util.Set;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * An instance represents a {@link 
org.apache.flink.runtime.taskmanager.TaskManager}
  * registered at a JobManager and ready to receive work.
  */
-public class Instance {
+public class Instance implements SlotOwner {
 
        private final static Logger LOG = 
LoggerFactory.getLogger(Instance.class);
 
@@ -241,7 +246,7 @@ public class Instance {
                                return null;
                        }
                        else {
-                               SimpleSlot slot = new SimpleSlot(jobID, this, 
nextSlot);
+                               SimpleSlot slot = new SimpleSlot(jobID, this, 
connectionInfo, nextSlot, actorGateway);
                                allocatedSlots.add(slot);
                                return slot;
                        }
@@ -278,7 +283,8 @@ public class Instance {
                                return null;
                        }
                        else {
-                               SharedSlot slot = new SharedSlot(jobID, this, 
nextSlot, sharingGroupAssignment);
+                               SharedSlot slot = new SharedSlot(
+                                               jobID, this, connectionInfo, 
nextSlot, actorGateway, sharingGroupAssignment);
                                allocatedSlots.add(slot);
                                return slot;
                        }
@@ -295,13 +301,11 @@ public class Instance {
         * @param slot The slot to return.
         * @return True, if the slot was returned, false if not.
         */
+       @Override
        public boolean returnAllocatedSlot(Slot slot) {
-               if (slot == null || slot.getInstance() != this) {
-                       throw new IllegalArgumentException("Slot is null or 
belongs to the wrong TaskManager.");
-               }
-               if (slot.isAlive()) {
-                       throw new IllegalArgumentException("Slot is still 
alive");
-               }
+               checkNotNull(slot);
+               checkArgument(!slot.isAlive(), "slot is still alive");
+               checkArgument(slot.getOwner() == this, "slot belongs to the 
wrong TaskManager.");
 
                if (slot.markReleased()) {
                        LOG.debug("Return allocated slot {}.", slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index ef62910..7f05604 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
 
+import javax.annotation.Nullable;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * This class represents a shared slot. A shared slot can have multiple
  * {@link SimpleSlot} instances within itself. This allows to
@@ -35,7 +40,7 @@ import java.util.Set;
  * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the 
caller has to guarantee proper
  * synchronization. In the current implementation, all concurrently modifying 
operations are
  * passed through a {@link SlotSharingGroupAssignment} object which is 
responsible for
- * synchronization.</p>
+ * synchronization.
  */
 public class SharedSlot extends Slot {
 
@@ -51,12 +56,18 @@ public class SharedSlot extends Slot {
         * This constructor is used to create a slot directly from an instance. 
         * 
         * @param jobID The ID of the job that the slot is created for.
-        * @param instance The instance that holds the slot.
+        * @param owner The component from which this slot is allocated.
+        * @param location The location info of the TaskManager where the slot 
was allocated from
         * @param slotNumber The number of the slot.
+        * @param taskManagerActorGateway The actor gateway to communicate with 
the TaskManager   
         * @param assignmentGroup The assignment group that this shared slot 
belongs to.
         */
-       public SharedSlot(JobID jobID, Instance instance, int slotNumber, 
SlotSharingGroupAssignment assignmentGroup) {
-               this(jobID, instance, slotNumber, assignmentGroup, null, null);
+       public SharedSlot(
+                       JobID jobID, SlotOwner owner, TaskManagerLocation 
location, int slotNumber,
+                       ActorGateway taskManagerActorGateway,
+                       SlotSharingGroupAssignment assignmentGroup) {
+
+               this(jobID, owner, location, slotNumber, 
taskManagerActorGateway, assignmentGroup, null, null);
        }
 
        /**
@@ -64,15 +75,23 @@ public class SharedSlot extends Slot {
         * to the given task group.
         * 
         * @param jobID The ID of the job that the slot is created for.
-        * @param instance The instance that holds the slot.
+        * @param owner The component from which this slot is allocated.
+        * @param location The location info of the TaskManager where the slot 
was allocated from
         * @param slotNumber The number of the slot.
+        * @param taskManagerActorGateway The actor gateway to communicate with 
the TaskManager   
         * @param assignmentGroup The assignment group that this shared slot 
belongs to.
+        * @param parent The parent slot of this slot.
+        * @param groupId The assignment group of this slot.
         */
-       public SharedSlot(JobID jobID, Instance instance, int slotNumber,
-                                               SlotSharingGroupAssignment 
assignmentGroup, SharedSlot parent, AbstractID groupId) {
-               super(jobID, instance, slotNumber, parent, groupId);
+       public SharedSlot(
+                       JobID jobID, SlotOwner owner, TaskManagerLocation 
location, int slotNumber,
+                       ActorGateway taskManagerActorGateway,
+                       SlotSharingGroupAssignment assignmentGroup,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupId) {
+
+               super(jobID, owner, location, slotNumber, 
taskManagerActorGateway, parent, groupId);
 
-               this.assignmentGroup = assignmentGroup;
+               this.assignmentGroup = checkNotNull(assignmentGroup);
                this.subSlots = new HashSet<Slot>();
        }
 
@@ -148,7 +167,9 @@ public class SharedSlot extends Slot {
         */
        SimpleSlot allocateSubSlot(AbstractID groupId) {
                if (isAlive()) {
-                       SimpleSlot slot = new SimpleSlot(getJobID(), 
getInstance(), subSlots.size(), this, groupId);
+                       SimpleSlot slot = new SimpleSlot(
+                                       getJobID(), getOwner(), 
getTaskManagerLocation(), subSlots.size(), 
+                                       getTaskManagerActorGateway(), this, 
groupId);
                        subSlots.add(slot);
                        return slot;
                }
@@ -168,7 +189,9 @@ public class SharedSlot extends Slot {
         */
        SharedSlot allocateSharedSlot(AbstractID groupId){
                if (isAlive()) {
-                       SharedSlot slot = new SharedSlot(getJobID(), 
getInstance(), subSlots.size(), assignmentGroup, this, groupId);
+                       SharedSlot slot = new SharedSlot(
+                                       getJobID(), getOwner(), 
getTaskManagerLocation(), subSlots.size(), 
+                                       getTaskManagerActorGateway(), 
assignmentGroup, this, groupId);
                        subSlots.add(slot);
                        return slot;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index dbe961a..615138f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -21,15 +21,18 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
  * A SimpleSlot represents a single slot on a TaskManager instance, or a slot 
within a shared slot.
  *
  * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute 
will point to that shared slot.
- * If not, then the parent attribute is null.</p>
+ * If not, then the parent attribute is null.
  */
 public class SimpleSlot extends Slot {
 
@@ -43,18 +46,22 @@ public class SimpleSlot extends Slot {
        private volatile Execution executedTask;
 
        /** The locality attached to the slot, defining whether the slot was 
allocated at the desired location. */
-       private Locality locality = Locality.UNCONSTRAINED;
+       private volatile Locality locality = Locality.UNCONSTRAINED;
 
 
        /**
         * Creates a new simple slot that stands alone and does not belong to a 
shared slot.
         * 
         * @param jobID The ID of the job that the slot is allocated for.
-        * @param instance The instance that the slot belongs to.
+        * @param owner The component from which this slot is allocated.
+        * @param location The location info of the TaskManager where the slot 
was allocated from
         * @param slotNumber The number of the task slot on the instance.
+        * @param taskManagerActorGateway The actor gateway to communicate with 
the TaskManager of this slot   
         */
-       public SimpleSlot(JobID jobID, Instance instance, int slotNumber) {
-               super(jobID, instance, slotNumber, null, null);
+       public SimpleSlot(
+                       JobID jobID, SlotOwner owner, TaskManagerLocation 
location, int slotNumber,
+                       ActorGateway taskManagerActorGateway) {
+               this(jobID, owner, location, slotNumber, 
taskManagerActorGateway, null, null);
        }
 
        /**
@@ -62,13 +69,18 @@ public class SimpleSlot extends Slot {
         * is identified by the given ID.
         *
         * @param jobID The ID of the job that the slot is allocated for.
-        * @param instance The instance that the slot belongs to.
+        * @param owner The component from which this slot is allocated.
+        * @param location The location info of the TaskManager where the slot 
was allocated from
         * @param slotNumber The number of the simple slot in its parent shared 
slot.
         * @param parent The parent shared slot.
         * @param groupID The ID that identifies the group that the slot 
belongs to.
         */
-       public SimpleSlot(JobID jobID, Instance instance, int slotNumber, 
SharedSlot parent, AbstractID groupID) {
-               super(jobID, instance, slotNumber, parent, groupID);
+       public SimpleSlot(
+                       JobID jobID, SlotOwner owner, TaskManagerLocation 
location, int slotNumber,
+                       ActorGateway taskManagerActorGateway,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupID) {
+
+               super(jobID, owner, location, slotNumber, 
taskManagerActorGateway, parent, groupID);
        }
 
        // 
------------------------------------------------------------------------
@@ -142,15 +154,12 @@ public class SimpleSlot extends Slot {
 
        @Override
        public void releaseSlot() {
-
                if (!isCanceled()) {
 
                        // kill all tasks currently running in this slot
                        Execution exec = this.executedTask;
                        if (exec != null && !exec.isFinished()) {
-                               exec.fail(new Exception(
-                                               "The slot in which the task was 
executed has been released. Probably loss of TaskManager "
-                                                               + 
getInstance()));
+                               exec.fail(new Exception("TaskManager was 
lost/killed: " + getTaskManagerLocation()));
                        }
 
                        // release directly (if we are directly allocated),
@@ -158,7 +167,7 @@ public class SimpleSlot extends Slot {
                        if (getParent() == null) {
                                // we have to give back the slot to the owning 
instance
                                if (markCancelled()) {
-                                       getInstance().returnAllocatedSlot(this);
+                                       getOwner().returnAllocatedSlot(this);
                                }
                        } else {
                                // we have to ask our parent to dispose us

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 341ef95..451a9ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,11 +18,18 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Base class for task slots. TaskManagers offer one or more task slots, which 
define a slice of 
  * their resources.
@@ -30,7 +37,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). 
In the more complex
  * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. 
Shared slots may contain
  * other shared slots which in turn can hold simple slots. That way, a shared 
slot may define a tree
- * of slots that belong to it.</p>
+ * of slots that belong to it.
  */
 public abstract class Slot {
 
@@ -52,15 +59,23 @@ public abstract class Slot {
        /** The ID of the job this slice belongs to. */
        private final JobID jobID;
 
-       /** The id of the group that this slot is allocated to. May be null. */
-       private final AbstractID groupID;
+       /** The location information of the TaskManager to which this slot 
belongs */
+       private final TaskManagerLocation taskManagerLocation;
+
+       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
+       private final ActorGateway taskManagerActorGateway;
 
-       /** The instance on which the slot is allocated */
-       private final Instance instance;
+       /** The owner of this slot - the slot was taken from that owner and 
must be disposed to it */
+       private final SlotOwner owner;
 
        /** The parent of this slot in the hierarchy, or null, if this is the 
parent */
+       @Nullable
        private final SharedSlot parent;
 
+       /** The id of the group that this slot is allocated to. May be null. */
+       @Nullable
+       private final AbstractID groupID;
+
        /** The number of the slot on which the task is deployed */
        private final int slotNumber;
 
@@ -71,23 +86,28 @@ public abstract class Slot {
         * Base constructor for slots.
         * 
         * @param jobID The ID of the job that this slot is allocated for.
-        * @param instance The instance from which this slot is allocated.
+        * @param owner The component from which this slot is allocated.
+        * @param location The location info of the TaskManager where the slot 
was allocated from
         * @param slotNumber The number of this slot.
+        * @param taskManagerActorGateway The actor gateway to communicate with 
the TaskManager
         * @param parent The parent slot that contains this slot. May be null, 
if this slot is the root.
         * @param groupID The ID that identifies the task group for which this 
slot is allocated. May be null
         *                if the slot does not belong to any task group.   
         */
-       protected Slot(JobID jobID, Instance instance, int slotNumber, 
SharedSlot parent, AbstractID groupID) {
-               if (jobID == null || instance == null || slotNumber < 0) {
-                       throw new IllegalArgumentException();
-               }
-
-               this.jobID = jobID;
-               this.instance = instance;
+       protected Slot(
+                       JobID jobID, SlotOwner owner, TaskManagerLocation 
location, int slotNumber,
+                       ActorGateway taskManagerActorGateway,
+                       @Nullable SharedSlot parent, @Nullable AbstractID 
groupID) {
+
+               checkArgument(slotNumber >= 0);
+
+               this.jobID = checkNotNull(jobID);
+               this.taskManagerLocation = checkNotNull(location);
+               this.owner = checkNotNull(owner);
+               this.taskManagerActorGateway = 
checkNotNull(taskManagerActorGateway);
+               this.parent = parent; // may be null
+               this.groupID = groupID; // may be null
                this.slotNumber = slotNumber;
-               this.parent = parent;
-               this.groupID = groupID;
-
        }
        // 
--------------------------------------------------------------------------------------------
 
@@ -101,12 +121,42 @@ public abstract class Slot {
        }
 
        /**
-        * Gets the instance from which the slot was allocated.
+        * Gets the ID of the TaskManager that offers this slot.
+        *
+        * @return The ID of the TaskManager that offers this slot
+        */
+       public ResourceID getTaskManagerID() {
+               return taskManagerLocation.getResourceID();
+       }
+
+       /**
+        * Gets the location info of the TaskManager that offers this slot.
         *
-        * @return The instance from which the slot was allocated.
+        * @return The location info of the TaskManager that offers this slot
         */
-       public Instance getInstance() {
-               return instance;
+       public TaskManagerLocation getTaskManagerLocation() {
+               return taskManagerLocation;
+       }
+
+       /**
+        * Gets the actor gateway that can be used to send messages to the 
TaskManager.
+        *
+        * <p>This method should be removed once the new interface-based RPC 
abstraction is in place
+        *
+        * @return The actor gateway that can be used to send messages to the 
TaskManager.
+        */
+       public ActorGateway getTaskManagerActorGateway() {
+               return taskManagerActorGateway;
+       }
+
+       /**
+        * Gets the owner of this slot. The owner is the component that the 
slot was created from
+        * and to which it needs to be returned after the executed tasks are 
done.
+        * 
+        * @return The owner of this slot.
+        */
+       public SlotOwner getOwner() {
+               return owner;
        }
 
        /**
@@ -149,6 +199,7 @@ public abstract class Slot {
         * 
         * @return The ID identifying the logical group of slots.
         */
+       @Nullable
        public AbstractID getGroupID() {
                return groupID;
        }
@@ -158,10 +209,18 @@ public abstract class Slot {
         * 
         * @return The parent slot, or null, if no this slot has no parent.
         */
+       @Nullable
        public SharedSlot getParent() {
                return parent;
        }
 
+       /**
+        * Gets the root slot of the tree containing this slot. If this slot is 
the root,
+        * the method returns this slot directly, otherwise it recursively goes 
to the parent until
+        * it reaches the root.
+        * 
+        * @return The root slot of the tree containing this slot
+        */
        public Slot getRoot() {
                if (parent == null) {
                        return this;
@@ -244,11 +303,11 @@ public abstract class Slot {
 
        @Override
        public String toString() {
-               return hierarchy() + " - " + instance + " - " + 
getStateName(status);
+               return hierarchy() + " - " + taskManagerLocation + " - " + 
getStateName(status);
        }
 
        protected String hierarchy() {
-               return (getParent() != null ? getParent().hierarchy() : "") + 
"(" + slotNumber + ")";
+               return (getParent() != null ? getParent().hierarchy() : "") + 
'(' + slotNumber + ')';
        }
 
        private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 7d666fe..346cc77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -28,13 +28,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,9 +95,8 @@ public class SlotSharingGroupAssignment {
        /** All slots currently allocated to this sharing group */
        private final Set<SharedSlot> allSlots = new 
LinkedHashSet<SharedSlot>();
 
-       /** The slots available per vertex type (jid), keyed by instance, to 
make them locatable */
-       private final Map<AbstractID, Map<Instance, List<SharedSlot>>> 
availableSlotsPerJid = 
-                       new LinkedHashMap<AbstractID, Map<Instance, 
List<SharedSlot>>>();
+       /** The slots available per vertex type (JobVertexId), keyed by 
TaskManager, to make them locatable */
+       private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlotsPerJid = new LinkedHashMap<>();
 
 
        // 
--------------------------------------------------------------------------------------------
@@ -122,7 +123,7 @@ public class SlotSharingGroupAssignment {
         */
        public int getNumberOfAvailableSlotsForGroup(AbstractID groupId) {
                synchronized (lock) {
-                       Map<Instance, List<SharedSlot>> available = 
availableSlotsPerJid.get(groupId);
+                       Map<ResourceID, List<SharedSlot>> available = 
availableSlotsPerJid.get(groupId);
 
                        if (available != null) {
                                Set<SharedSlot> set = new HashSet<SharedSlot>();
@@ -148,37 +149,25 @@ public class SlotSharingGroupAssignment {
        //  Slot allocation
        // 
------------------------------------------------------------------------
 
-       /**
-        * 
-        * @param sharedSlot
-        * @param locality
-        * @param groupId
-        * @return
-        */
        public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot 
sharedSlot, Locality locality, JobVertexID groupId) {
                return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, 
groupId, null);
        }
 
-       /**
-        * 
-        * @param sharedSlot
-        * @param locality
-        * @param constraint
-        * @return
-        */
-       public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot 
sharedSlot, Locality locality,
-                                                                               
                                CoLocationConstraint constraint) {
+       public SimpleSlot addSharedSlotAndAllocateSubSlot(
+                       SharedSlot sharedSlot, Locality locality, 
CoLocationConstraint constraint)
+       {
                return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, 
null, constraint);
        }
-       
-       private SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot 
sharedSlot, Locality locality,
-                                                                               
                        JobVertexID groupId, CoLocationConstraint constraint) {
+
+       private SimpleSlot addSharedSlotAndAllocateSubSlot(
+                       SharedSlot sharedSlot, Locality locality, JobVertexID 
groupId, CoLocationConstraint constraint) {
+
                // sanity checks
                if (!sharedSlot.isRootAndEmpty()) {
                        throw new IllegalArgumentException("The given slot is 
not an empty root slot.");
                }
-               
-               final Instance location = sharedSlot.getInstance();
+
+               final ResourceID location = sharedSlot.getTaskManagerID();
 
                synchronized (lock) {
                        // early out in case that the slot died (instance 
disappeared)
@@ -244,20 +233,20 @@ public class SlotSharingGroupAssignment {
                                // can place a task into this slot.
                                boolean entryForNewJidExists = false;
                                
-                               for (Map.Entry<AbstractID, Map<Instance, 
List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+                               for (Map.Entry<AbstractID, Map<ResourceID, 
List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
                                        // there is already an entry for this 
groupID
                                        if 
(entry.getKey().equals(groupIdForMap)) {
                                                entryForNewJidExists = true;
                                                continue;
                                        }
 
-                                       Map<Instance, List<SharedSlot>> 
available = entry.getValue();
+                                       Map<ResourceID, List<SharedSlot>> 
available = entry.getValue();
                                        putIntoMultiMap(available, location, 
sharedSlot);
                                }
 
                                // make sure an empty entry exists for this 
group, if no other entry exists
                                if (!entryForNewJidExists) {
-                                       availableSlotsPerJid.put(groupIdForMap, 
new LinkedHashMap<Instance, List<SharedSlot>>());
+                                       availableSlotsPerJid.put(groupIdForMap, 
new LinkedHashMap<ResourceID, List<SharedSlot>>());
                                }
 
                                return subSlot;
@@ -287,18 +276,15 @@ public class SlotSharingGroupAssignment {
 
        /**
         * 
-        * @param vertexID
-        * @param locationPreferences
-        * @return
         */
-       SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<Instance> 
locationPreferences) {
+       SimpleSlot getSlotForTask(JobVertexID vertexID, 
Iterable<TaskManagerLocation> locationPreferences) {
                synchronized (lock) {
-                       Pair<SharedSlot, Locality> p = 
getSlotForTaskInternal(vertexID, locationPreferences, false);
+                       Tuple2<SharedSlot, Locality> p = 
getSlotForTaskInternal(vertexID, locationPreferences, false);
 
                        if (p != null) {
-                               SharedSlot ss = p.getLeft();
+                               SharedSlot ss = p.f0;
                                SimpleSlot slot = ss.allocateSubSlot(vertexID);
-                               slot.setLocality(p.getRight());
+                               slot.setLocality(p.f1);
                                return slot;
                        }
                        else {
@@ -330,7 +316,7 @@ public class SlotSharingGroupAssignment {
                return getSlotForTask(constraint, 
vertex.getPreferredLocations());
        }
        
-       SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<Instance> locationPreferences) {
+       SimpleSlot getSlotForTask(CoLocationConstraint constraint, 
Iterable<TaskManagerLocation> locationPreferences) {
                synchronized (lock) {
                        if (constraint.isAssignedAndAlive()) {
                                // the shared slot of the co-location group is 
initialized and set we allocate a sub-slot
@@ -346,15 +332,16 @@ public class SlotSharingGroupAssignment {
                                if (previous == null) {
                                        throw new IllegalStateException("Bug: 
Found assigned co-location constraint without a slot.");
                                }
-                               
-                               Instance location = previous.getInstance();
-                               Pair<SharedSlot, Locality> p = 
getSlotForTaskInternal(constraint.getGroupId(),
-                                                                               
                                                                
Collections.singleton(location), true);
+
+                               TaskManagerLocation location = 
previous.getTaskManagerLocation();
+                               Tuple2<SharedSlot, Locality> p = 
getSlotForTaskInternal(
+                                               constraint.getGroupId(), 
Collections.singleton(location), true);
+
                                if (p == null) {
                                        return null;
                                }
                                else {
-                                       SharedSlot newSharedSlot = p.getLeft();
+                                       SharedSlot newSharedSlot = p.f0;
 
                                        // allocate the co-location group slot 
inside the shared slot
                                        SharedSlot constraintGroupSlot = 
newSharedSlot.allocateSharedSlot(constraint.getGroupId());
@@ -377,15 +364,15 @@ public class SlotSharingGroupAssignment {
                                // the location constraint has not been 
associated with a shared slot, yet.
                                // grab a new slot and initialize the 
constraint with that one.
                                // preferred locations are defined by the vertex
-                               Pair<SharedSlot, Locality> p =
+                               Tuple2<SharedSlot, Locality> p =
                                                
getSlotForTaskInternal(constraint.getGroupId(), locationPreferences, false);
                                if (p == null) {
                                        // could not get a shared slot for this 
co-location-group
                                        return null;
                                }
                                else {
-                                       final SharedSlot availableShared = 
p.getLeft();
-                                       final Locality l = p.getRight();
+                                       final SharedSlot availableShared = p.f0;
+                                       final Locality l = p.f1;
 
                                        // allocate the co-location group slot 
inside the shared slot
                                        SharedSlot constraintGroupSlot = 
availableShared.allocateSharedSlot(constraint.getGroupId());
@@ -405,9 +392,8 @@ public class SlotSharingGroupAssignment {
        }
 
 
-       private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID 
groupId,
-                                                                               
                                                Iterable<Instance> 
preferredLocations,
-                                                                               
                                                boolean localOnly)
+       private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
+                       AbstractID groupId, Iterable<TaskManagerLocation> 
preferredLocations, boolean localOnly)
        {
                // check if there is anything at all in this group assignment
                if (allSlots.isEmpty()) {
@@ -415,15 +401,15 @@ public class SlotSharingGroupAssignment {
                }
 
                // get the available slots for the group
-               Map<Instance, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
+               Map<ResourceID, List<SharedSlot>> slotsForGroup = 
availableSlotsPerJid.get(groupId);
                
                if (slotsForGroup == null) {
                        // we have a new group, so all slots are available
-                       slotsForGroup = new LinkedHashMap<Instance, 
List<SharedSlot>>();
+                       slotsForGroup = new LinkedHashMap<>();
                        availableSlotsPerJid.put(groupId, slotsForGroup);
 
                        for (SharedSlot availableSlot : allSlots) {
-                               putIntoMultiMap(slotsForGroup, 
availableSlot.getInstance(), availableSlot);
+                               putIntoMultiMap(slotsForGroup, 
availableSlot.getTaskManagerID(), availableSlot);
                        }
                }
                else if (slotsForGroup.isEmpty()) {
@@ -435,15 +421,15 @@ public class SlotSharingGroupAssignment {
                boolean didNotGetPreferred = false;
 
                if (preferredLocations != null) {
-                       for (Instance location : preferredLocations) {
+                       for (TaskManagerLocation location : preferredLocations) 
{
 
                                // set the flag that we failed a preferred 
location. If one will be found,
                                // we return early anyways and skip the flag 
evaluation
                                didNotGetPreferred = true;
 
-                               SharedSlot slot = 
removeFromMultiMap(slotsForGroup, location);
+                               SharedSlot slot = 
removeFromMultiMap(slotsForGroup, location.getResourceID());
                                if (slot != null && slot.isAlive()) {
-                                       return new ImmutablePair<SharedSlot, 
Locality>(slot, Locality.LOCAL);
+                                       return new Tuple2<>(slot, 
Locality.LOCAL);
                                }
                        }
                }
@@ -459,7 +445,7 @@ public class SlotSharingGroupAssignment {
                SharedSlot slot;
                while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
                        if (slot.isAlive()) {
-                               return new ImmutablePair<SharedSlot, 
Locality>(slot, locality);
+                               return new Tuple2<>(slot, locality);
                        }
                }
                
@@ -510,7 +496,7 @@ public class SlotSharingGroupAssignment {
                                                        // for that group 
again. otherwise, the slot is part of a
                                                        // co-location group 
and nothing becomes immediately available
 
-                                                       Map<Instance, 
List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+                                                       Map<ResourceID, 
List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
 
                                                        // sanity check
                                                        if (slotsForJid == 
null) {
@@ -518,7 +504,7 @@ public class SlotSharingGroupAssignment {
                                                                                
" when available slots indicated that all slots were available.");
                                                        }
 
-                                                       
putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+                                                       
putIntoMultiMap(slotsForJid, parent.getTaskManagerID(), parent);
                                                }
                                        } else {
                                                // the parent shared slot is 
now empty and can be released
@@ -558,8 +544,6 @@ public class SlotSharingGroupAssignment {
        /**
         * 
         * <p><b>NOTE: This method must be called from within a scope that 
holds the lock.</b></p>
-        * 
-        * @param sharedSlot
         */
        private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {
                // sanity check
@@ -576,7 +560,7 @@ public class SlotSharingGroupAssignment {
                
                if (parent == null) {
                        // root slot, return to the instance.
-                       
sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+                       sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
                        
                        // also, make sure we remove this slot from everywhere
                        allSlots.remove(sharedSlot);
@@ -592,7 +576,7 @@ public class SlotSharingGroupAssignment {
                                
                                if (parentRemaining > 0) {
                                        // the parent becomes available for the 
group again
-                                       Map<Instance, List<SharedSlot>> 
slotsForGroup = availableSlotsPerJid.get(groupID);
+                                       Map<ResourceID, List<SharedSlot>> 
slotsForGroup = availableSlotsPerJid.get(groupID);
 
                                        // sanity check
                                        if (slotsForGroup == null) {
@@ -600,7 +584,7 @@ public class SlotSharingGroupAssignment {
                                                                " when 
available slots indicated that all slots were available.");
                                        }
 
-                                       putIntoMultiMap(slotsForGroup, 
parent.getInstance(), parent);
+                                       putIntoMultiMap(slotsForGroup, 
parent.getTaskManagerID(), parent);
                                        
                                }
                                else {
@@ -620,7 +604,7 @@ public class SlotSharingGroupAssignment {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static void putIntoMultiMap(Map<Instance, List<SharedSlot>> 
map, Instance location, SharedSlot slot) {
+       private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> 
map, ResourceID location, SharedSlot slot) {
                List<SharedSlot> slotsForInstance = map.get(location);
                if (slotsForInstance == null) {
                        slotsForInstance = new ArrayList<SharedSlot>();
@@ -629,7 +613,7 @@ public class SlotSharingGroupAssignment {
                slotsForInstance.add(slot);
        }
        
-       private static SharedSlot removeFromMultiMap(Map<Instance, 
List<SharedSlot>> map, Instance location) {
+       private static SharedSlot removeFromMultiMap(Map<ResourceID, 
List<SharedSlot>> map, ResourceID location) {
                List<SharedSlot> slotsForLocation = map.get(location);
                
                if (slotsForLocation == null) {
@@ -645,8 +629,8 @@ public class SlotSharingGroupAssignment {
                }
        }
        
-       private static SharedSlot pollFromMultiMap(Map<Instance, 
List<SharedSlot>> map) {
-               Iterator<Map.Entry<Instance, List<SharedSlot>>> iter = 
map.entrySet().iterator();
+       private static SharedSlot pollFromMultiMap(Map<ResourceID, 
List<SharedSlot>> map) {
+               Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = 
map.entrySet().iterator();
                
                while (iter.hasNext()) {
                        List<SharedSlot> slots = iter.next().getValue();
@@ -667,19 +651,19 @@ public class SlotSharingGroupAssignment {
                return null;
        }
        
-       private static void removeSlotFromAllEntries(Map<AbstractID, 
Map<Instance, List<SharedSlot>>> availableSlots, 
-                                                                               
                        SharedSlot slot)
+       private static void removeSlotFromAllEntries(
+                       Map<AbstractID, Map<ResourceID, List<SharedSlot>>> 
availableSlots, SharedSlot slot)
        {
-               final Instance instance = slot.getInstance();
+               final ResourceID taskManagerId = slot.getTaskManagerID();
                
-               for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> 
entry : availableSlots.entrySet()) {
-                       Map<Instance, List<SharedSlot>> map = entry.getValue();
+               for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> 
entry : availableSlots.entrySet()) {
+                       Map<ResourceID, List<SharedSlot>> map = 
entry.getValue();
 
-                       List<SharedSlot> list = map.get(instance);
+                       List<SharedSlot> list = map.get(taskManagerId);
                        if (list != null) {
                                list.remove(slot);
                                if (list.isEmpty()) {
-                                       map.remove(instance);
+                                       map.remove(taskManagerId);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index fece894..c41f7bf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,34 +18,39 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.instance.Instance;
 
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.runtime.instance.SharedSlot;
 
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A CoLocationConstraint manages the location of a set of tasks
  * (Execution Vertices). In co-location groups, the different subtasks of
  * different JobVertices need to be executed on the same {@link Instance}.
  * This is realized by creating a special shared slot that holds these tasks.
  * 
- * <p>This class tracks the location and the shared slot for this set of 
tasks.</p>
+ * <p>This class tracks the location and the shared slot for this set of tasks.
  */
 public class CoLocationConstraint {
-       
+
        private final CoLocationGroup group;
-       
+
        private volatile SharedSlot sharedSlot;
-       
-       private volatile boolean locationLocked;
-       
-       
+
+       private volatile ResourceID lockedLocation;
+
+
        CoLocationConstraint(CoLocationGroup group) {
                Preconditions.checkNotNull(group);
                this.group = group;
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Status & Properties
        // 
------------------------------------------------------------------------
@@ -77,7 +82,7 @@ public class CoLocationConstraint {
         * @return True if the location has been assigned, false otherwise.
         */
        public boolean isAssigned() {
-               return locationLocked;
+               return lockedLocation != null;
        }
 
        /**
@@ -89,7 +94,7 @@ public class CoLocationConstraint {
         *         false otherwise.
         */
        public boolean isAssignedAndAlive() {
-               return locationLocked && sharedSlot.isAlive();
+               return lockedLocation != null && sharedSlot.isAlive();
        }
 
        /**
@@ -100,9 +105,9 @@ public class CoLocationConstraint {
         * @return The instance describing the location for the tasks of this 
constraint.
         * @throws IllegalStateException Thrown if the location has not been 
assigned, yet.
         */
-       public Instance getLocation() {
-               if (locationLocked) {
-                       return sharedSlot.getInstance();
+       public TaskManagerLocation getLocation() {
+               if (lockedLocation != null) {
+                       return sharedSlot.getTaskManagerLocation();
                } else {
                        throw new IllegalStateException("Location not yet 
locked");
                }
@@ -125,18 +130,20 @@ public class CoLocationConstraint {
         *                                  the new slot is from a different 
location.
         */
        public void setSharedSlot(SharedSlot newSlot) {
+               checkNotNull(newSlot);
+
                if (this.sharedSlot == null) {
                        this.sharedSlot = newSlot;
                }
                else if (newSlot != this.sharedSlot){
-                       if (locationLocked && this.sharedSlot.getInstance() != 
newSlot.getInstance()) {
+                       if (lockedLocation != null && lockedLocation != 
newSlot.getTaskManagerID()) {
                                throw new IllegalArgumentException(
                                                "Cannot assign different 
location to a constraint whose location is locked.");
                        }
                        if (this.sharedSlot.isAlive()) {
                                this.sharedSlot.releaseSlot();
                        }
-                       
+
                        this.sharedSlot = newSlot;
                }
        }
@@ -149,13 +156,10 @@ public class CoLocationConstraint {
         *                               or if no slot has been set, yet.
         */
        public void lockLocation() throws IllegalStateException {
-               if (locationLocked) {
-                       throw new IllegalStateException("Location is already 
locked");
-               }
-               if (sharedSlot == null) {
-                       throw new IllegalStateException("Cannot lock location 
without a slot.");
-               }
-               locationLocked = true;
+               checkState(lockedLocation == null, "Location is already 
locked");
+               checkState(sharedSlot != null, "Cannot lock location without a 
slot.");
+
+               lockedLocation = sharedSlot.getTaskManagerID();
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 963fc4c..b481b55 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -37,6 +38,7 @@ import akka.dispatch.Futures;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -45,6 +47,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -78,7 +81,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        private final HashMap<String, Set<Instance>> allInstancesByHost = new 
HashMap<String, Set<Instance>>();
        
        /** All instances that still have available resources */
-       private final Queue<Instance> instancesWithAvailableResources = new 
SetQueue<Instance>();
+       private final Map<ResourceID, Instance> instancesWithAvailableResources 
= new LinkedHashMap<>();
        
        /** All tasks pending to be scheduled */
        private final Queue<QueuedTask> taskQueue = new 
ArrayDeque<QueuedTask>();
@@ -163,7 +166,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
 
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
                
-               final Iterable<Instance> preferredLocations = 
vertex.getPreferredLocations();
+               final Iterable<TaskManagerLocation> preferredLocations = 
vertex.getPreferredLocations();
                final boolean forceExternalLocation = 
vertex.isScheduleLocalOnly() &&
                                                                        
preferredLocations != null && preferredLocations.iterator().hasNext();
        
@@ -222,7 +225,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                        
                                        // our location preference is either 
determined by the location constraint, or by the
                                        // vertex's preferred locations
-                                       final Iterable<Instance> locations;
+                                       final Iterable<TaskManagerLocation> 
locations;
                                        final boolean localOnly;
                                        if (constraint != null && 
constraint.isAssigned()) {
                                                locations = 
Collections.singleton(constraint.getLocation());
@@ -341,7 +344,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         * @return The instance to run the vertex on, or {@code null}, if no 
instance is available.
         */
        protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
-                                                                               
        Iterable<Instance> requestedLocations,
+                                                                               
        Iterable<TaskManagerLocation> requestedLocations,
                                                                                
        boolean localOnly) {
                // we need potentially to loop multiple times, because there 
may be false positives
                // in the set-with-available-instances
@@ -360,7 +363,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                
                                // if the instance has further available slots, 
re-add it to the set of available resources.
                                if (instanceToUse.hasResourcesAvailable()) {
-                                       
this.instancesWithAvailableResources.add(instanceToUse);
+                                       
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), 
instanceToUse);
                                }
                                
                                if (slot != null) {
@@ -396,7 +399,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         * @return A sub-slot for the given vertex, or {@code null}, if no slot 
is available.
         */
        protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
-                                                                               
                        Iterable<Instance> requestedLocations,
+                                                                               
                        Iterable<TaskManagerLocation> requestedLocations,
                                                                                
                        SlotSharingGroupAssignment groupAssignment,
                                                                                
                        CoLocationConstraint constraint,
                                                                                
                        boolean localOnly)
@@ -422,7 +425,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
 
                                // if the instance has further available slots, 
re-add it to the set of available resources.
                                if (instanceToUse.hasResourcesAvailable()) {
-                                       
this.instancesWithAvailableResources.add(instanceToUse);
+                                       
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), 
instanceToUse);
                                }
 
                                if (sharedSlot != null) {
@@ -460,13 +463,13 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
         *                           no locality preference exists.   
         * @param localOnly Flag to indicate whether only one of the exact 
local instances can be chosen.  
         */
-       private Pair<Instance, Locality> findInstance(Iterable<Instance> 
requestedLocations, boolean localOnly){
+       private Pair<Instance, Locality> 
findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean 
localOnly) {
                
                // drain the queue of newly available instances
                while (this.newlyAvailableInstances.size() > 0) {
                        Instance queuedInstance = 
this.newlyAvailableInstances.poll();
                        if (queuedInstance != null) {
-                               
this.instancesWithAvailableResources.add(queuedInstance);
+                               
this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), 
queuedInstance);
                        }
                }
                
@@ -475,15 +478,18 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                        return null;
                }
 
-               Iterator<Instance> locations = requestedLocations == null ? 
null : requestedLocations.iterator();
+               Iterator<TaskManagerLocation> locations = requestedLocations == 
null ? null : requestedLocations.iterator();
 
                if (locations != null && locations.hasNext()) {
                        // we have a locality preference
 
                        while (locations.hasNext()) {
-                               Instance location = locations.next();
-                               if (location != null && 
this.instancesWithAvailableResources.remove(location)) {
-                                       return new ImmutablePair<Instance, 
Locality>(location, Locality.LOCAL);
+                               TaskManagerLocation location = locations.next();
+                               if (location != null) {
+                                       Instance instance = 
instancesWithAvailableResources.remove(location.getResourceID());
+                                       if (instance != null) {
+                                               return new 
ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
+                                       }
                                }
                        }
                        
@@ -492,14 +498,21 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                return null;
                        }
                        else {
-                               Instance instanceToUse = 
this.instancesWithAvailableResources.poll();
-                               return new ImmutablePair<Instance, 
Locality>(instanceToUse, Locality.NON_LOCAL);
+                               // take the first instance from the instances 
with resources
+                               Iterator<Instance> instances = 
instancesWithAvailableResources.values().iterator();
+                               Instance instanceToUse = instances.next();
+                               instances.remove();
+
+                               return new ImmutablePair<>(instanceToUse, 
Locality.NON_LOCAL);
                        }
                }
                else {
                        // no location preference, so use some instance
-                       Instance instanceToUse = 
this.instancesWithAvailableResources.poll();
-                       return new ImmutablePair<Instance, 
Locality>(instanceToUse, Locality.UNCONSTRAINED);
+                       Iterator<Instance> instances = 
instancesWithAvailableResources.values().iterator();
+                       Instance instanceToUse = instances.next();
+                       instances.remove();
+
+                       return new ImmutablePair<>(instanceToUse, 
Locality.UNCONSTRAINED);
                }
        }
        
@@ -570,7 +583,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                }
                        }
                        else {
-                               
this.instancesWithAvailableResources.add(instance);
+                               
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
                        }
                }
        }
@@ -643,11 +656,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                        
allInstancesByHost.put(instanceHostName, instanceSet);
                                }
                                instanceSet.add(instance);
-                               
-                                       
+
                                // add it to the available resources and let 
potential waiters know
-                               
this.instancesWithAvailableResources.add(instance);
-       
+                               
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+
                                // add all slots as available
                                for (int i = 0; i < 
instance.getNumberOfAvailableSlots(); i++) {
                                        newSlotAvailable(instance);
@@ -681,8 +693,8 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                }
 
                allInstances.remove(instance);
-               instancesWithAvailableResources.remove(instance);
-               
+               
instancesWithAvailableResources.remove(instance.getResourceId());
+
                String instanceHostName = 
instance.getInstanceConnectionInfo().getHostname();
                Set<Instance> instanceSet = 
allInstancesByHost.get(instanceHostName);
                if (instanceSet != null) {
@@ -709,7 +721,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                synchronized (globalLock) {
                        processNewlyAvailableInstances();
 
-                       for (Instance instance : 
instancesWithAvailableResources) {
+                       for (Instance instance : 
instancesWithAvailableResources.values()) {
                                count += instance.getNumberOfAvailableSlots();
                        }
                }
@@ -781,9 +793,9 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                synchronized (globalLock) {
                        Instance instance;
 
-                       while((instance = newlyAvailableInstances.poll()) != 
null){
-                               if(instance.hasResourcesAvailable()){
-                                       
instancesWithAvailableResources.add(instance);
+                       while ((instance = newlyAvailableInstances.poll()) != 
null) {
+                               if (instance.hasResourcesAvailable()) {
+                                       
instancesWithAvailableResources.put(instance.getResourceId(), instance);
                                }
                        }
                }
@@ -794,17 +806,17 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
        //  Utilities
        // 
------------------------------------------------------------------------
 
-       private static String getHostnamesFromInstances(Iterable<Instance> 
instances) {
+       private static String 
getHostnamesFromInstances(Iterable<TaskManagerLocation> locations) {
                StringBuilder bld = new StringBuilder();
 
                boolean successive = false;
-               for (Instance i : instances) {
+               for (TaskManagerLocation loc : locations) {
                        if (successive) {
                                bld.append(", ");
                        } else {
                                successive = true;
                        }
-                       bld.append(i.getInstanceConnectionInfo().getHostname());
+                       bld.append(loc.getHostname());
                }
 
                return bld.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
index 31bd341..36e4072 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -20,73 +20,125 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-       
+
        private final Object monitor = new Object();
-       
+
        private volatile SimpleSlot slot;
-       
+
        private volatile SlotAllocationFutureAction action;
-       
+
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Creates a future that is uncompleted.
+        */
        public SlotAllocationFuture() {}
-       
+
+       /**
+        * Creates a future that is immediately completed.
+        * 
+        * @param slot The task slot that completes the future.
+        */
        public SlotAllocationFuture(SimpleSlot slot) {
                this.slot = slot;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
-       public SimpleSlot waitTillAllocated() throws InterruptedException {
-               return waitTillAllocated(0);
-       }
-       
-       public SimpleSlot waitTillAllocated(long timeout) throws 
InterruptedException {
+
+       public SimpleSlot waitTillCompleted() throws InterruptedException {
                synchronized (monitor) {
                        while (slot == null) {
-                               monitor.wait(timeout);
+                               monitor.wait();
+                       }
+                       return slot;
+               }
+       }
+
+       public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) 
throws InterruptedException, TimeoutException {
+               checkArgument(timeout >= 0, "timeout may not be negative");
+               checkNotNull(timeUnit, "timeUnit");
+
+               if (timeout == 0) {
+                       return waitTillCompleted();
+               } else {
+                       final long deadline = System.nanoTime() + 
timeUnit.toNanos(timeout);
+                       long millisToWait;
+
+                       synchronized (monitor) {
+                               while (slot == null && (millisToWait = 
(deadline - System.nanoTime()) / 1_000_000) > 0) {
+                                       monitor.wait(millisToWait);
+                               }
+
+                               if (slot != null) {
+                                       return slot;
+                               } else {
+                                       throw new TimeoutException();
+                               }
                        }
-                       
+               }
+       }
+
+       /**
+        * Gets the slot from this future. This method throws an exception, if 
the future has not been completed.
+        * This method never blocks.
+        * 
+        * @return The slot with which this future was completed.
+        * @throws IllegalStateException Thrown, if this method is called 
before the future is completed.
+        */
+       public SimpleSlot get() {
+               final SimpleSlot slot = this.slot;
+               if (slot != null) {
                        return slot;
+               } else {
+                       throw new IllegalStateException("The future is not 
complete - not slot available");
                }
        }
-       
+
        public void setFutureAction(SlotAllocationFutureAction action) {
+               checkNotNull(action);
+
                synchronized (monitor) {
-                       if (this.action != null) {
-                               throw new IllegalStateException("Future already 
has an action registered.");
-                       }
-                       
+                       checkState(this.action == null, "Future already has an 
action registered.");
+
                        this.action = action;
-                       
+
                        if (this.slot != null) {
                                action.slotAllocated(this.slot);
                        }
                }
        }
-       
+
+       /**
+        * Completes the future with a slot.
+        */
        public void setSlot(SimpleSlot slot) {
-               if (slot == null) {
-                       throw new NullPointerException();
-               }
-               
+               checkNotNull(slot);
+
                synchronized (monitor) {
-                       if (this.slot != null) {
-                               throw new IllegalStateException("The future has 
already been assigned a slot.");
-                       }
-                       
+                       checkState(this.slot == null, "The future has already 
been assigned a slot.");
+
                        this.slot = slot;
                        monitor.notifyAll();
-                       
+
                        if (action != null) {
                                action.slotAllocated(slot);
                        }
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        @Override
        public String toString() {
                return slot == null ? "PENDING" : "DONE";

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
new file mode 100644
index 0000000..ad9c784
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+       boolean returnAllocatedSlot(Slot slot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 5a0faa5..01d0654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -108,7 +108,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
                }
 
                this.stringRepresentation = String.format(
-                               "TaskManager (%s) @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
+                               "%s @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
        }
 
        // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0c62c69..2a0ecc2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -650,7 +650,7 @@ class JobManager(
             val taskId = execution.getVertex.getParallelSubtaskIndex
 
             val host = if (slot != null) {
-              slot.getInstance().getInstanceConnectionInfo.getHostname
+              slot.getTaskManagerLocation().getHostname()
             } else {
               null
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 6a9b490..3947b17 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -249,7 +249,7 @@ trait TestingJobManagerLike extends FlinkActor {
             } else {
               sender ! decorateMessage(
                 WorkingTaskManager(
-                  Some(resource.getInstance().getActorGateway)
+                  Some(resource.getTaskManagerActorGateway())
                 )
               )
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index cf7cf58..d8bd6cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -117,7 +117,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                ActorGateway actorGateway = mock(ActorGateway.class);
 
                when(simpleSlot.isAlive()).thenReturn(true);
-               when(simpleSlot.getInstance()).thenReturn(instance);
+               when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId());
+               when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo());
                when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
                when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
@@ -152,6 +153,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                assertNotNull(metric);
                assertTrue(metric instanceof Gauge);
 
+               @SuppressWarnings("unchecked")
                Gauge<Long> restartingTime = (Gauge<Long>) metric;
 
                // check that the restarting time is 0 since it's the initial start

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 91472ae..a1f3345 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -92,8 +92,8 @@ public class VertexLocationConstraintTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
                        ExecutionVertex[] vertices = ejv.getTaskVertices();
                        
-                       vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
-                       vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+                       vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
+                       vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
                        
                        vertices[0].setScheduleLocalOnly(true);
                        vertices[1].setScheduleLocalOnly(true);
@@ -106,14 +106,14 @@ public class VertexLocationConstraintTest {
                        assertNotNull(slot1);
                        assertNotNull(slot2);
                        
-                       Instance target1 = slot1.getInstance();
-                       Instance target2 = slot2.getInstance();
+                       ResourceID target1 = slot1.getTaskManagerID();
+                       ResourceID target2 = slot2.getTaskManagerID();
                        
                        assertNotNull(target1);
                        assertNotNull(target2);
                        
-                       assertTrue(target1 == instance1 || target1 == instance2);
-                       assertTrue(target2 == instance3);
+                       assertTrue(target1 == instance1.getResourceId() || target1 == instance2.getResourceId());
+                       assertEquals(target2, instance3.getResourceId());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -165,8 +165,8 @@ public class VertexLocationConstraintTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
                        ExecutionVertex[] vertices = ejv.getTaskVertices();
                        
-                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3));
-                       vertices[1].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
+                       vertices[1].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
                        
                        vertices[0].setScheduleLocalOnly(true);
                        vertices[1].setScheduleLocalOnly(true);
@@ -179,14 +179,11 @@ public class VertexLocationConstraintTest {
                        assertNotNull(slot1);
                        assertNotNull(slot2);
                        
-                       Instance target1 = slot1.getInstance();
-                       Instance target2 = slot2.getInstance();
+                       ResourceID target1 = slot1.getTaskManagerID();
+                       ResourceID target2 = slot2.getTaskManagerID();
                        
-                       assertNotNull(target1);
-                       assertNotNull(target2);
-                       
-                       assertTrue(target1 == instance3);
-                       assertTrue(target2 == instance1 || target2 == instance2);
+                       assertTrue(target1 == instance3.getResourceId());
+                       assertTrue(target2 == instance1.getResourceId() || target2 == instance2.getResourceId());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -242,8 +239,8 @@ public class VertexLocationConstraintTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
                        ExecutionVertex[] vertices = ejv.getTaskVertices();
                        
-                       vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
-                       vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+                       vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
+                       vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
                        
                        vertices[0].setScheduleLocalOnly(true);
                        vertices[1].setScheduleLocalOnly(true);
@@ -255,15 +252,12 @@ public class VertexLocationConstraintTest {
                        
                        assertNotNull(slot1);
                        assertNotNull(slot2);
-                       
-                       Instance target1 = slot1.getInstance();
-                       Instance target2 = slot2.getInstance();
-                       
-                       assertNotNull(target1);
-                       assertNotNull(target2);
-                       
-                       assertTrue(target1 == instance1 || target1 == instance2);
-                       assertTrue(target2 == instance3);
+
+                       ResourceID target1 = slot1.getTaskManagerID();
+                       ResourceID target2 = slot2.getTaskManagerID();
+
+                       assertTrue(target1 == instance1.getResourceId() || target1 == instance2.getResourceId());
+                       assertTrue(target2 == instance3.getResourceId());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -310,7 +304,7 @@ public class VertexLocationConstraintTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
                        ExecutionVertex[] vertices = ejv.getTaskVertices();
                        
-                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
                        vertices[0].setScheduleLocalOnly(true);
                        
                        try {
@@ -380,7 +374,7 @@ public class VertexLocationConstraintTest {
                        ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
                        ExecutionVertex[] vertices = ejv.getTaskVertices();
                        
-                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+                       vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
                        vertices[0].setScheduleLocalOnly(true);
                        
                        try {
@@ -420,7 +414,7 @@ public class VertexLocationConstraintTest {
                        ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
                        
                        Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
-                       ev.setLocationConstraintHosts(Collections.singletonList(instance));
+                       ev.setLocationConstraintHosts(Collections.singletonList(instance.getInstanceConnectionInfo()));
                        
                        assertNotNull(ev.getPreferredLocations());
                        assertEquals(instance, ev.getPreferredLocations().iterator().next());

Reply via email to