[FLINK-4490] [distributed coordination] (part 3) Rename methods on 'Instance' 
to have more intuitive names

getResourceID() --> getTaskManagerID()
getInstanceConnectionInfo() --> getTaskManagerLocation()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eac6088a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eac6088a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eac6088a

Branch: refs/heads/master
Commit: eac6088a75e813a015b778f4cfc4cce0cf2a53ce
Parents: aaa474a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Aug 31 13:59:01 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagersHandler.java           |   2 +-
 ...PartialInputChannelDeploymentDescriptor.java |  21 +-
 .../apache/flink/runtime/instance/Instance.java |  28 +-
 .../flink/runtime/instance/InstanceManager.java |  20 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  18 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 .../ExecutionGraphMetricsTest.java              | 404 ++++++++++---------
 .../executiongraph/ExecutionGraphTestUtils.java |   2 +-
 .../TerminalStateDeadlockTest.java              |   3 +-
 .../runtime/instance/InstanceManagerTest.java   |  31 +-
 .../flink/runtime/instance/InstanceTest.java    |   6 +-
 .../flink/runtime/instance/SharedSlotsTest.java |  24 +-
 .../flink/runtime/instance/SimpleSlotTest.java  |   2 +-
 .../partition/SpilledSubpartitionViewTest.java  |   4 +-
 .../scheduler/CoLocationConstraintTest.java     |   6 +-
 .../ScheduleWithCoLocationHintTest.java         |  22 +-
 .../scheduler/SchedulerSlotSharingTest.java     |  30 +-
 .../scheduler/SchedulerTestUtils.java           |   5 +-
 .../scheduler/SlotAllocationFutureTest.java     |  12 +-
 19 files changed, 319 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b60cd10..b5e9088 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -85,7 +85,7 @@ public class TaskManagersHandler implements RequestHandler {
                                        gen.writeStartObject();
                                        gen.writeStringField("id", 
instance.getId().toString());
                                        gen.writeStringField("path", 
instance.getActorGateway().path());
-                                       gen.writeNumberField("dataPort", 
instance.getInstanceConnectionInfo().dataPort());
+                                       gen.writeNumberField("dataPort", 
instance.getTaskManagerLocation().dataPort());
                                        
gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
                                        gen.writeNumberField("slotsNumber", 
instance.getTotalNumberOfSlots());
                                        gen.writeNumberField("freeSlots", 
instance.getNumberOfAvailableSlots());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
index e1391a4..0eac39d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartialInputChannelDeploymentDescriptor.java
@@ -44,7 +44,7 @@ public class PartialInputChannelDeploymentDescriptor {
        private final ResultPartitionID partitionID;
 
        /** The partition connection info. */
-       private final TaskManagerLocation partitionConnectionInfo;
+       private final TaskManagerLocation partitionTaskManagerLocation;
 
        /** The partition connection index. */
        private final int partitionConnectionIndex;
@@ -52,12 +52,12 @@ public class PartialInputChannelDeploymentDescriptor {
        public PartialInputChannelDeploymentDescriptor(
                        IntermediateDataSetID resultId,
                        ResultPartitionID partitionID,
-                       TaskManagerLocation partitionConnectionInfo,
+                       TaskManagerLocation partitionTaskManagerLocation,
                        int partitionConnectionIndex) {
 
                this.resultId = checkNotNull(resultId);
                this.partitionID = checkNotNull(partitionID);
-               this.partitionConnectionInfo = 
checkNotNull(partitionConnectionInfo);
+               this.partitionTaskManagerLocation = 
checkNotNull(partitionTaskManagerLocation);
                this.partitionConnectionIndex = partitionConnectionIndex;
        }
 
@@ -66,23 +66,20 @@ public class PartialInputChannelDeploymentDescriptor {
         *
         * @see InputChannelDeploymentDescriptor
         */
-       public InputChannelDeploymentDescriptor 
createInputChannelDeploymentDescriptor(
-                       Execution consumerExecution) {
+       public InputChannelDeploymentDescriptor 
createInputChannelDeploymentDescriptor(Execution consumerExecution) {
+               checkNotNull(consumerExecution, "consumerExecution");
 
-               checkNotNull(consumerExecution, "Consumer execution null");
-
-               TaskManagerLocation consumerConnectionInfo = 
consumerExecution.getAssignedResourceLocation();
-
-               checkNotNull(consumerConnectionInfo, "Consumer connection info 
null");
+               TaskManagerLocation consumerLocation = 
consumerExecution.getAssignedResourceLocation();
+               checkNotNull(consumerLocation, "Consumer connection info null");
 
                final ResultPartitionLocation partitionLocation;
 
-               if (consumerConnectionInfo.equals(partitionConnectionInfo)) {
+               if (consumerLocation.equals(partitionTaskManagerLocation)) {
                        partitionLocation = 
ResultPartitionLocation.createLocal();
                }
                else {
                        partitionLocation = 
ResultPartitionLocation.createRemote(
-                                       new 
ConnectionID(partitionConnectionInfo, partitionConnectionIndex));
+                                       new 
ConnectionID(partitionTaskManagerLocation, partitionConnectionIndex));
                }
 
                return new InputChannelDeploymentDescriptor(partitionID, 
partitionLocation);

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index fe46895..4a8139b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -52,14 +52,11 @@ public class Instance implements SlotOwner {
        private final ActorGateway actorGateway;
 
        /** The instance connection information for the data transfer. */
-       private final TaskManagerLocation connectionInfo;
+       private final TaskManagerLocation location;
 
        /** A description of the resources of the task manager */
        private final HardwareDescription resources;
 
-       /** The ID identifies the resource the task manager runs on */
-       private final ResourceID resourceId;
-
        /** The ID identifying the taskManager. */
        private final InstanceID instanceId;
 
@@ -90,22 +87,19 @@ public class Instance implements SlotOwner {
         * Constructs an instance reflecting a registered TaskManager.
         *
         * @param actorGateway The actor gateway to communicate with the remote 
instance
-        * @param connectionInfo The remote connection where the task manager 
receives requests.
-        * @param resourceId The resource id which denotes the resource the 
task manager uses.
+        * @param location The remote connection where the task manager 
receives requests.
         * @param id The id under which the taskManager is registered.
         * @param resources The resources available on the machine.
         * @param numberOfSlots The number of task slots offered by this 
taskManager.
         */
        public Instance(
                        ActorGateway actorGateway,
-                       TaskManagerLocation connectionInfo,
-                       ResourceID resourceId,
+                       TaskManagerLocation location,
                        InstanceID id,
                        HardwareDescription resources,
                        int numberOfSlots) {
                this.actorGateway = actorGateway;
-               this.connectionInfo = connectionInfo;
-               this.resourceId = resourceId;
+               this.location = location;
                this.instanceId = id;
                this.resources = resources;
                this.numberOfSlots = numberOfSlots;
@@ -120,8 +114,8 @@ public class Instance implements SlotOwner {
        // Properties
        // 
--------------------------------------------------------------------------------------------
 
-       public ResourceID getResourceId() {
-               return resourceId;
+       public ResourceID getTaskManagerID() {
+               return location.getResourceID();
        }
 
        public InstanceID getId() {
@@ -246,7 +240,7 @@ public class Instance implements SlotOwner {
                                return null;
                        }
                        else {
-                               SimpleSlot slot = new SimpleSlot(jobID, this, 
connectionInfo, nextSlot, actorGateway);
+                               SimpleSlot slot = new SimpleSlot(jobID, this, 
location, nextSlot, actorGateway);
                                allocatedSlots.add(slot);
                                return slot;
                        }
@@ -284,7 +278,7 @@ public class Instance implements SlotOwner {
                        }
                        else {
                                SharedSlot slot = new SharedSlot(
-                                               jobID, this, connectionInfo, 
nextSlot, actorGateway, sharingGroupAssignment);
+                                               jobID, this, location, 
nextSlot, actorGateway, sharingGroupAssignment);
                                allocatedSlots.add(slot);
                                return slot;
                        }
@@ -355,8 +349,8 @@ public class Instance implements SlotOwner {
                return actorGateway;
        }
 
-       public TaskManagerLocation getInstanceConnectionInfo() {
-               return connectionInfo;
+       public TaskManagerLocation getTaskManagerLocation() {
+               return location;
        }
 
        public int getNumberOfAvailableSlots() {
@@ -405,7 +399,7 @@ public class Instance implements SlotOwner {
 
        @Override
        public String toString() {
-               return String.format("%s @ %s - %d slots - URL: %s", 
instanceId, connectionInfo.getHostname(),
+               return String.format("%s @ %s - %d slots - URL: %s", 
instanceId, location.getHostname(),
                                numberOfSlots, (actorGateway != null ? 
actorGateway.path() : "No instance gateway"));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
index e7a4537..0c7e187 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceManager.java
@@ -138,8 +138,7 @@ public class InstanceManager {
         * for the job execution.
         *
         * @param taskManager ActorRef to the TaskManager which wants to be 
registered
-        * @param resourceID The resource id of the TaskManager
-        * @param connectionInfo ConnectionInfo of the TaskManager
+        * @param taskManagerLocation Location info of the TaskManager
         * @param resources Hardware description of the TaskManager
         * @param numberOfSlots Number of available slots on the TaskManager
         * @param leaderSessionID The current leader session ID of the 
JobManager
@@ -147,12 +146,12 @@ public class InstanceManager {
         */
        public InstanceID registerTaskManager(
                        ActorRef taskManager,
-                       ResourceID resourceID,
-                       TaskManagerLocation connectionInfo,
+                       TaskManagerLocation taskManagerLocation,
                        HardwareDescription resources,
                        int numberOfSlots,
-                       UUID leaderSessionID){
-               synchronized(this.lock){
+                       UUID leaderSessionID) {
+               
+               synchronized (this.lock) {
                        if (this.isShutdown) {
                                throw new 
IllegalStateException("InstanceManager is shut down.");
                        }
@@ -174,12 +173,11 @@ public class InstanceManager {
 
                        InstanceID instanceID = new InstanceID();
 
-                       Instance host = new Instance(actorGateway, 
connectionInfo, resourceID, instanceID,
-                               resources, numberOfSlots);
+                       Instance host = new Instance(actorGateway, 
taskManagerLocation, instanceID, resources, numberOfSlots);
 
                        registeredHostsById.put(instanceID, host);
                        registeredHostsByConnection.put(taskManager, host);
-                       registeredHostsByResource.put(resourceID, host);
+                       
registeredHostsByResource.put(taskManagerLocation.getResourceID(), host);
 
                        totalNumberOfAliveTaskSlots += numberOfSlots;
 
@@ -187,7 +185,7 @@ public class InstanceManager {
                                LOG.info(String.format("Registered TaskManager 
at %s (%s) as %s. " +
                                                                "Current number 
of registered hosts is %d. " +
                                                                "Current number 
of alive task slots is %d.",
-                                               connectionInfo.getHostname(),
+                                               
taskManagerLocation.getHostname(),
                                                taskManager.path(),
                                                instanceID,
                                                registeredHostsById.size(),
@@ -217,7 +215,7 @@ public class InstanceManager {
 
                        registeredHostsByConnection.remove(host);
                        registeredHostsById.remove(instance.getId());
-                       
registeredHostsByResource.remove(instance.getResourceId());
+                       
registeredHostsByResource.remove(instance.getTaskManagerID());
 
                        if (terminated) {
                                
deadHosts.add(instance.getActorGateway().actor());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b481b55..734972d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -363,7 +363,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                
                                // if the instance has further available slots, 
re-add it to the set of available resources.
                                if (instanceToUse.hasResourcesAvailable()) {
-                                       
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), 
instanceToUse);
+                                       
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), 
instanceToUse);
                                }
                                
                                if (slot != null) {
@@ -425,7 +425,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
 
                                // if the instance has further available slots, 
re-add it to the set of available resources.
                                if (instanceToUse.hasResourcesAvailable()) {
-                                       
this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), 
instanceToUse);
+                                       
this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), 
instanceToUse);
                                }
 
                                if (sharedSlot != null) {
@@ -469,7 +469,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                while (this.newlyAvailableInstances.size() > 0) {
                        Instance queuedInstance = 
this.newlyAvailableInstances.poll();
                        if (queuedInstance != null) {
-                               
this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), 
queuedInstance);
+                               
this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), 
queuedInstance);
                        }
                }
                
@@ -583,7 +583,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                }
                        }
                        else {
-                               
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+                               
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                        }
                }
        }
@@ -649,7 +649,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                instance.setSlotAvailabilityListener(this);
                                
                                // store the instance in the by-host-lookup
-                               String instanceHostName = 
instance.getInstanceConnectionInfo().getHostname();
+                               String instanceHostName = 
instance.getTaskManagerLocation().getHostname();
                                Set<Instance> instanceSet = 
allInstancesByHost.get(instanceHostName);
                                if (instanceSet == null) {
                                        instanceSet = new HashSet<Instance>();
@@ -658,7 +658,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                                instanceSet.add(instance);
 
                                // add it to the available resources and let 
potential waiters know
-                               
this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+                               
this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
 
                                // add all slots as available
                                for (int i = 0; i < 
instance.getNumberOfAvailableSlots(); i++) {
@@ -693,9 +693,9 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
                }
 
                allInstances.remove(instance);
-               
instancesWithAvailableResources.remove(instance.getResourceId());
+               
instancesWithAvailableResources.remove(instance.getTaskManagerID());
 
-               String instanceHostName = 
instance.getInstanceConnectionInfo().getHostname();
+               String instanceHostName = 
instance.getTaskManagerLocation().getHostname();
                Set<Instance> instanceSet = 
allInstancesByHost.get(instanceHostName);
                if (instanceSet != null) {
                        instanceSet.remove(instance);
@@ -795,7 +795,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener {
 
                        while ((instance = newlyAvailableInstances.poll()) != 
null) {
                                if (instance.hasResourcesAvailable()) {
-                                       
instancesWithAvailableResources.put(instance.getResourceId(), instance);
+                                       
instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2a0ecc2..88af604 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -349,7 +349,7 @@ class JobManager(
       currentResourceManager = Option(msg.resourceManager())
 
       val taskManagerResources = 
instanceManager.getAllRegisteredInstances.asScala.map(
-        instance => instance.getResourceId).toList.asJava
+        instance => instance.getTaskManagerID).toList.asJava
 
       // confirm registration and send known task managers with their resource 
ids
       sender ! decorateMessage(new RegisterResourceManagerSuccessful(self, 
taskManagerResources))
@@ -425,7 +425,6 @@ class JobManager(
         try {
           val instanceID = instanceManager.registerTaskManager(
             taskManager,
-            resourceId,
             connectionInfo,
             hardwareInformation,
             numberOfSlots,

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index d8bd6cb..d5520fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
@@ -47,8 +48,10 @@ import 
org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.mockito.Matchers;
+
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future$;
 import scala.concurrent.duration.FiniteDuration;
@@ -60,7 +63,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -76,200 +80,210 @@ public class ExecutionGraphMetricsTest extends TestLogger 
{
         */
        @Test
        public void testExecutionGraphRestartTimeMetric() throws JobException, 
IOException, InterruptedException {
-               // setup execution graph with mocked scheduling logic
-               int parallelism = 1;
-
-               JobVertex jobVertex = new JobVertex("TestVertex");
-               jobVertex.setParallelism(parallelism);
-               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-               JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
-
-               Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
-               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
-
-               Configuration jobConfig = new Configuration();
-
-               FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
-               MetricRegistry metricRegistry = new MetricRegistry(config);
-
-               assertTrue(metricRegistry.getReporters().size() == 1);
-
-               MetricReporter reporter = metricRegistry.getReporters().get(0);
-
-               assertTrue(reporter instanceof TestingReporter);
-
-               TestingReporter testingReporter = (TestingReporter) reporter;
-
-               MetricGroup metricGroup = new 
JobManagerMetricGroup(metricRegistry, "localhost");
-
-               Scheduler scheduler = mock(Scheduler.class);
-
-               SimpleSlot simpleSlot = mock(SimpleSlot.class);
-
-               Instance instance = mock(Instance.class);
-
-               TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
-
-               Slot rootSlot = mock(Slot.class);
-
-               ActorGateway actorGateway = mock(ActorGateway.class);
-
-               when(simpleSlot.isAlive()).thenReturn(true);
-               
when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId());
-               
when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo());
-               
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
-               when(simpleSlot.getRoot()).thenReturn(rootSlot);
-
-               
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
-
-               
when(instance.getInstanceConnectionInfo()).thenReturn(taskManagerLocation);
-               when(instance.getActorGateway()).thenReturn(actorGateway);
-               when(taskManagerLocation.getHostname()).thenReturn("localhost");
-
-               when(rootSlot.getSlotNumber()).thenReturn(0);
-
-               when(actorGateway.ask(Matchers.any(Object.class), 
Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
-
-               TestingRestartStrategy testingRestartStrategy = new 
TestingRestartStrategy();
-
-               ExecutionGraph executionGraph = new ExecutionGraph(
-                       ExecutionContext$.MODULE$.fromExecutor(new 
ForkJoinPool()),
-                       jobGraph.getJobID(),
-                       jobGraph.getName(),
-                       jobConfig,
-                       new SerializedValue<ExecutionConfig>(null),
-                       timeout,
-                       testingRestartStrategy,
-                       Collections.<BlobKey>emptyList(),
-                       Collections.<URL>emptyList(),
-                       getClass().getClassLoader(),
-                       metricGroup);
-
-               // get restarting time metric
-               Metric metric = 
testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
-
-               assertNotNull(metric);
-               assertTrue(metric instanceof Gauge);
-
-               @SuppressWarnings("unchecked")
-               Gauge<Long> restartingTime = (Gauge<Long>) metric;
-
-               // check that the restarting time is 0 since it's the initial 
start
-               assertTrue(0L == restartingTime.getValue());
-
-               
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-               // start execution
-               executionGraph.scheduleForExecution(scheduler);
-
-               assertTrue(0L == restartingTime.getValue());
-
-               List<ExecutionAttemptID> executionIDs = new ArrayList<>();
-
-               for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
-                       
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
-               }
-
-               // tell execution graph that the tasks are in state running --> 
job status switches to state running
-               for (ExecutionAttemptID executionID : executionIDs) {
-                       executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
-               }
-
-               assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-               assertTrue(0L == restartingTime.getValue());
-
-               // fail the job so that it goes into state restarting
-               for (ExecutionAttemptID executionID : executionIDs) {
-                       executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
-               }
-
-               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
-               long firstRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
-               // wait some time so that the restarting time gauge shows a 
value different from 0
-               Thread.sleep(50);
-
-               long previousRestartingTime = restartingTime.getValue();
-
-               // check that the restarting time is monotonically increasing
-               for (int i = 0; i < 10; i++) {
-                       long currentRestartingTime = restartingTime.getValue();
-
-                       assertTrue(currentRestartingTime >= 
previousRestartingTime);
-                       previousRestartingTime = currentRestartingTime;
-               }
-
-               // check that we have measured some restarting time
-               assertTrue(previousRestartingTime > 0);
-
-               // restart job
-               testingRestartStrategy.restartExecutionGraph();
-
-               executionIDs.clear();
-
-               for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
-                       
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
-               }
-
-               for (ExecutionAttemptID executionID : executionIDs) {
-                       executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
-               }
-
-               assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
-               assertTrue(firstRestartingTimestamp != 0);
-
-               previousRestartingTime = restartingTime.getValue();
-
-               // check that the restarting time does not increase after we've 
reached the running state
-               for (int i = 0; i < 10; i++) {
-                       long currentRestartingTime = restartingTime.getValue();
-
-                       assertTrue(currentRestartingTime == 
previousRestartingTime);
-                       previousRestartingTime = currentRestartingTime;
-               }
-
-               // fail job again
-               for (ExecutionAttemptID executionID : executionIDs) {
-                       executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
-               }
-
-               assertEquals(JobStatus.RESTARTING, executionGraph.getState());
-
-               long secondRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
-
-               assertTrue(firstRestartingTimestamp != 
secondRestartingTimestamp);
-
-               Thread.sleep(50);
-
-               previousRestartingTime = restartingTime.getValue();
-
-               // check that the restarting time is increasing again
-               for (int i = 0; i < 10; i++) {
-                       long currentRestartingTime = restartingTime.getValue();
-
-                       assertTrue(currentRestartingTime >= 
previousRestartingTime);
-                       previousRestartingTime = currentRestartingTime;
-               }
-
-               assertTrue(previousRestartingTime > 0);
-
-               // now lets fail the job while it is in restarting and see 
whether the restarting time then stops to increase
-               executionGraph.fail(new Exception());
-
-               assertEquals(JobStatus.FAILED, executionGraph.getState());
-
-               previousRestartingTime = restartingTime.getValue();
-
-               for (int i = 0; i < 10; i++) {
-                       long currentRestartingTime = restartingTime.getValue();
-
-                       assertTrue(currentRestartingTime == 
previousRestartingTime);
-                       previousRestartingTime = currentRestartingTime;
+               final ExecutorService executor = 
Executors.newCachedThreadPool();
+               try {
+                       // setup execution graph with mocked scheduling logic
+                       int parallelism = 1;
+       
+                       JobVertex jobVertex = new JobVertex("TestVertex");
+                       jobVertex.setParallelism(parallelism);
+                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
+       
+                       Configuration config = new Configuration();
+                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
+       
+                       Configuration jobConfig = new Configuration();
+       
+                       FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
+       
+                       MetricRegistry metricRegistry = new 
MetricRegistry(config);
+       
+                       assertTrue(metricRegistry.getReporters().size() == 1);
+       
+                       MetricReporter reporter = 
metricRegistry.getReporters().get(0);
+       
+                       assertTrue(reporter instanceof TestingReporter);
+       
+                       TestingReporter testingReporter = (TestingReporter) 
reporter;
+       
+                       MetricGroup metricGroup = new 
JobManagerMetricGroup(metricRegistry, "localhost");
+       
+                       Scheduler scheduler = mock(Scheduler.class);
+
+                       ResourceID taskManagerId = ResourceID.generate();
+
+                       TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
+                       
when(taskManagerLocation.getResourceID()).thenReturn(taskManagerId);
+                       
when(taskManagerLocation.getHostname()).thenReturn("localhost");
+
+                       ActorGateway actorGateway = mock(ActorGateway.class);
+
+                       Instance instance = mock(Instance.class);
+                       
when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+                       
when(instance.getTaskManagerID()).thenReturn(taskManagerId);
+                       
when(instance.getActorGateway()).thenReturn(actorGateway);
+
+                       Slot rootSlot = mock(Slot.class);
+
+                       SimpleSlot simpleSlot = mock(SimpleSlot.class);
+                       when(simpleSlot.isAlive()).thenReturn(true);
+                       
when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
+                       
when(simpleSlot.getTaskManagerID()).thenReturn(taskManagerId);
+                       
when(simpleSlot.getTaskManagerActorGateway()).thenReturn(actorGateway);
+                       
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
+                       when(simpleSlot.getRoot()).thenReturn(rootSlot);
+
+                       
when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);
+
+                       
+
+                       when(rootSlot.getSlotNumber()).thenReturn(0);
+
+                       when(actorGateway.ask(Matchers.any(Object.class), 
Matchers.any(FiniteDuration.class))).thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
+
+                       TestingRestartStrategy testingRestartStrategy = new 
TestingRestartStrategy();
+
+                       ExecutionGraph executionGraph = new ExecutionGraph(
+                               
ExecutionContext$.MODULE$.fromExecutor(executor),
+                               jobGraph.getJobID(),
+                               jobGraph.getName(),
+                               jobConfig,
+                               new SerializedValue<ExecutionConfig>(null),
+                               timeout,
+                               testingRestartStrategy,
+                               Collections.<BlobKey>emptyList(),
+                               Collections.<URL>emptyList(),
+                               getClass().getClassLoader(),
+                               metricGroup);
+       
+                       // get restarting time metric
+                       Metric metric = 
testingReporter.getMetric(ExecutionGraph.RESTARTING_TIME_METRIC_NAME);
+       
+                       assertNotNull(metric);
+                       assertTrue(metric instanceof Gauge);
+       
+                       @SuppressWarnings("unchecked")
+                       Gauge<Long> restartingTime = (Gauge<Long>) metric;
+       
+                       // check that the restarting time is 0 since it's the 
initial start
+                       assertTrue(0L == restartingTime.getValue());
+       
+                       
executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+       
+                       // start execution
+                       executionGraph.scheduleForExecution(scheduler);
+       
+                       assertTrue(0L == restartingTime.getValue());
+       
+                       List<ExecutionAttemptID> executionIDs = new 
ArrayList<>();
+       
+                       for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
+                               
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+                       }
+       
+                       // tell execution graph that the tasks are in state 
running --> job status switches to state running
+                       for (ExecutionAttemptID executionID : executionIDs) {
+                               executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+                       }
+       
+                       assertEquals(JobStatus.RUNNING, 
executionGraph.getState());
+       
+                       assertTrue(0L == restartingTime.getValue());
+       
+                       // fail the job so that it goes into state restarting
+                       for (ExecutionAttemptID executionID : executionIDs) {
+                               executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
+                       }
+       
+                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
+       
+                       long firstRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+       
+                       // wait some time so that the restarting time gauge 
shows a value different from 0
+                       Thread.sleep(50);
+       
+                       long previousRestartingTime = restartingTime.getValue();
+       
+                       // check that the restarting time is monotonically 
increasing
+                       for (int i = 0; i < 10; i++) {
+                               long currentRestartingTime = 
restartingTime.getValue();
+       
+                               assertTrue(currentRestartingTime >= 
previousRestartingTime);
+                               previousRestartingTime = currentRestartingTime;
+                       }
+       
+                       // check that we have measured some restarting time
+                       assertTrue(previousRestartingTime > 0);
+       
+                       // restart job
+                       testingRestartStrategy.restartExecutionGraph();
+       
+                       executionIDs.clear();
+       
+                       for (ExecutionVertex executionVertex: 
executionGraph.getAllExecutionVertices()) {
+                               
executionIDs.add(executionVertex.getCurrentExecutionAttempt().getAttemptId());
+                       }
+       
+                       for (ExecutionAttemptID executionID : executionIDs) {
+                               executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.RUNNING));
+                       }
+       
+                       assertEquals(JobStatus.RUNNING, 
executionGraph.getState());
+       
+                       assertTrue(firstRestartingTimestamp != 0);
+       
+                       previousRestartingTime = restartingTime.getValue();
+       
+                       // check that the restarting time does not increase 
after we've reached the running state
+                       for (int i = 0; i < 10; i++) {
+                               long currentRestartingTime = 
restartingTime.getValue();
+       
+                               assertTrue(currentRestartingTime == 
previousRestartingTime);
+                               previousRestartingTime = currentRestartingTime;
+                       }
+       
+                       // fail job again
+                       for (ExecutionAttemptID executionID : executionIDs) {
+                               executionGraph.updateState(new 
TaskExecutionState(jobGraph.getJobID(), executionID, ExecutionState.FAILED, new 
Exception()));
+                       }
+       
+                       assertEquals(JobStatus.RESTARTING, 
executionGraph.getState());
+       
+                       long secondRestartingTimestamp = 
executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
+       
+                       assertTrue(firstRestartingTimestamp != 
secondRestartingTimestamp);
+       
+                       Thread.sleep(50);
+       
+                       previousRestartingTime = restartingTime.getValue();
+       
+                       // check that the restarting time is increasing again
+                       for (int i = 0; i < 10; i++) {
+                               long currentRestartingTime = 
restartingTime.getValue();
+       
+                               assertTrue(currentRestartingTime >= 
previousRestartingTime);
+                               previousRestartingTime = currentRestartingTime;
+                       }
+       
+                       assertTrue(previousRestartingTime > 0);
+       
+                       // now lets fail the job while it is in restarting and 
see whether the restarting time then stops to increase
+                       executionGraph.fail(new Exception());
+       
+                       assertEquals(JobStatus.FAILED, 
executionGraph.getState());
+       
+                       previousRestartingTime = restartingTime.getValue();
+       
+                       for (int i = 0; i < 10; i++) {
+                               long currentRestartingTime = 
restartingTime.getValue();
+       
+                               assertTrue(currentRestartingTime == 
previousRestartingTime);
+                               previousRestartingTime = currentRestartingTime;
+                       }
+               } finally {
+                       executor.shutdownNow();
                }
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index cddb6cb..df47c3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -113,7 +113,7 @@ public class ExecutionGraphTestUtils {
                InetAddress address = InetAddress.getByName("127.0.0.1");
                TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
-               return new Instance(gateway, connection, resourceID, new 
InstanceID(), hardwareDescription, numberOfSlots);
+               return new Instance(gateway, connection, new InstanceID(), 
hardwareDescription, numberOfSlots);
        }
 
        @SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
index a71faf6..870ae05 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TerminalStateDeadlockTest.java
@@ -84,8 +84,7 @@ public class TerminalStateDeadlockTest {
                        TaskManagerLocation ci = new 
TaskManagerLocation(resourceId, address, 12345);
                                
                        HardwareDescription resources = new 
HardwareDescription(4, 4000000, 3000000, 2000000);
-                       Instance instance = new 
Instance(DummyActorGateway.INSTANCE, ci,
-                               resourceId, new InstanceID(), resources, 4);
+                       Instance instance = new 
Instance(DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
 
                        this.resource = instance.allocateSimpleSlot(new 
JobID());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
index f1ed960..f3747c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceManagerTest.java
@@ -87,12 +87,9 @@ public class InstanceManagerTest{
                        final JavaTestKit probe2 = new JavaTestKit(system);
                        final JavaTestKit probe3 = new JavaTestKit(system);
 
-                       cm.registerTaskManager(probe1.getRef(), resID1,
-                               ici1, hardwareDescription, 1, leaderSessionID);
-                       cm.registerTaskManager(probe2.getRef(), resID2,
-                               ici2, hardwareDescription, 2, leaderSessionID);
-                       cm.registerTaskManager(probe3.getRef(), resID3,
-                               ici3, hardwareDescription, 5, leaderSessionID);
+                       cm.registerTaskManager(probe1.getRef(), ici1, 
hardwareDescription, 1, leaderSessionID);
+                       cm.registerTaskManager(probe2.getRef(), ici2, 
hardwareDescription, 2, leaderSessionID);
+                       cm.registerTaskManager(probe3.getRef(), ici3, 
hardwareDescription, 5, leaderSessionID);
 
                        assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
                        assertEquals(8, cm.getTotalNumberOfSlots());
@@ -102,7 +99,7 @@ public class InstanceManagerTest{
                                        HashSet<TaskManagerLocation>();
 
                        for(Instance instance: instances){
-                               
taskManagerLocations.add(instance.getInstanceConnectionInfo());
+                               
taskManagerLocations.add(instance.getTaskManagerLocation());
                        }
 
                        assertTrue(taskManagerLocations.contains(ici1));
@@ -133,14 +130,13 @@ public class InstanceManagerTest{
                        TaskManagerLocation ici = new 
TaskManagerLocation(resID1, address, dataPort);
 
                        JavaTestKit probe = new JavaTestKit(system);
-                       cm.registerTaskManager(probe.getRef(), resID1,
-                               ici, resources, 1, leaderSessionID);
+                       cm.registerTaskManager(probe.getRef(), ici, resources, 
1, leaderSessionID);
 
                        assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
                        assertEquals(1, cm.getTotalNumberOfSlots());
 
                        try {
-                               cm.registerTaskManager(probe.getRef(), resID2, 
ici, resources, 1, leaderSessionID);
+                               cm.registerTaskManager(probe.getRef(), ici, 
resources, 1, leaderSessionID);
                        } catch (Exception e) {
                                // good
                        }
@@ -182,12 +178,12 @@ public class InstanceManagerTest{
                        JavaTestKit probe2 = new JavaTestKit(system);
                        JavaTestKit probe3 = new JavaTestKit(system);
 
-                       InstanceID instanceID1 = 
cm.registerTaskManager(probe1.getRef(), resID1,
-                               ici1, hardwareDescription, 1, leaderSessionID);
-                       InstanceID instanceID2 = 
cm.registerTaskManager(probe2.getRef(), resID2,
-                               ici2, hardwareDescription, 1, leaderSessionID);
-                       InstanceID instanceID3 = 
cm.registerTaskManager(probe3.getRef(), resID3,
-                               ici3, hardwareDescription, 1, leaderSessionID);
+                       InstanceID instanceID1 = cm.registerTaskManager(
+                                       probe1.getRef(), ici1, 
hardwareDescription, 1, leaderSessionID);
+                       InstanceID instanceID2 = cm.registerTaskManager(
+                                       probe2.getRef(), ici2, 
hardwareDescription, 1, leaderSessionID);
+                       InstanceID instanceID3 = cm.registerTaskManager(
+                                       probe3.getRef(), ici3, 
hardwareDescription, 1, leaderSessionID);
 
                        // report some immediate heart beats
                        assertTrue(cm.reportHeartBeat(instanceID1, new byte[] 
{}));
@@ -241,8 +237,7 @@ public class InstanceManagerTest{
                                TaskManagerLocation ici = new 
TaskManagerLocation(resID, address, 20000);
 
                                JavaTestKit probe = new JavaTestKit(system);
-                               cm.registerTaskManager(probe.getRef(), resID,
-                                       ici, resources, 1, leaderSessionID);
+                               cm.registerTaskManager(probe.getRef(), ici, 
resources, 1, leaderSessionID);
                                fail("Should raise exception in shutdown 
state");
                        }
                        catch (IllegalStateException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
index 82d3723..aee62fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/InstanceTest.java
@@ -42,7 +42,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new 
Instance(DummyActorGateway.INSTANCE, connection,
-                                       resourceID, new InstanceID(), 
hardwareDescription, 4);
+                                       new InstanceID(), hardwareDescription, 
4);
 
                        assertEquals(4, instance.getTotalNumberOfSlots());
                        assertEquals(4, instance.getNumberOfAvailableSlots());
@@ -105,7 +105,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new 
Instance(DummyActorGateway.INSTANCE, connection,
-                                       resourceID, new InstanceID(), 
hardwareDescription, 3);
+                                       new InstanceID(), hardwareDescription, 
3);
 
                        assertEquals(3, instance.getNumberOfAvailableSlots());
 
@@ -137,7 +137,7 @@ public class InstanceTest {
                        TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                        Instance instance = new 
Instance(DummyActorGateway.INSTANCE, connection,
-                                       resourceID, new InstanceID(), 
hardwareDescription, 3);
+                                       new InstanceID(), hardwareDescription, 
3);
 
                        assertEquals(3, instance.getNumberOfAvailableSlots());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
index 2c40e89..0edef5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SharedSlotsTest.java
@@ -132,7 +132,7 @@ public class SharedSlotsTest {
                        assertEquals(Locality.LOCAL, sub1.getLocality());
                        assertEquals(1, sub1.getNumberLeaves());
                        assertEquals(vid1, sub1.getGroupID());
-                       assertEquals(instance.getResourceId(), 
sub1.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sub1.getTaskManagerID());
                        assertEquals(jobId, sub1.getJobID());
                        assertEquals(sharedSlot, sub1.getParent());
                        assertEquals(sharedSlot, sub1.getRoot());
@@ -151,7 +151,7 @@ public class SharedSlotsTest {
                        assertEquals(Locality.UNCONSTRAINED, 
sub2.getLocality());
                        assertEquals(1, sub2.getNumberLeaves());
                        assertEquals(vid2, sub2.getGroupID());
-                       assertEquals(instance.getResourceId(), 
sub2.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sub2.getTaskManagerID());
                        assertEquals(jobId, sub2.getJobID());
                        assertEquals(sharedSlot, sub2.getParent());
                        assertEquals(sharedSlot, sub2.getRoot());
@@ -163,14 +163,14 @@ public class SharedSlotsTest {
                        assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid3));
                        assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
                        
-                       SimpleSlot sub3 = assignment.getSlotForTask(vid3, 
Collections.singleton(instance.getInstanceConnectionInfo()));
+                       SimpleSlot sub3 = assignment.getSlotForTask(vid3, 
Collections.singleton(instance.getTaskManagerLocation()));
                        assertNotNull(sub3);
                        
                        assertNull(sub3.getExecutedVertex());
                        assertEquals(Locality.LOCAL, sub3.getLocality());
                        assertEquals(1, sub3.getNumberLeaves());
                        assertEquals(vid3, sub3.getGroupID());
-                       assertEquals(instance.getResourceId(), 
sub3.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sub3.getTaskManagerID());
                        assertEquals(jobId, sub3.getJobID());
                        assertEquals(sharedSlot, sub3.getParent());
                        assertEquals(sharedSlot, sub3.getRoot());
@@ -183,14 +183,14 @@ public class SharedSlotsTest {
                        assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(vid4));
 
                        SimpleSlot sub4 = assignment.getSlotForTask(vid4,
-                                       
Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getInstanceConnectionInfo()));
+                                       
Collections.singleton(SchedulerTestUtils.getRandomInstance(1).getTaskManagerLocation()));
                        assertNotNull(sub4);
                        
                        assertNull(sub4.getExecutedVertex());
                        assertEquals(Locality.NON_LOCAL, sub4.getLocality());
                        assertEquals(1, sub4.getNumberLeaves());
                        assertEquals(vid4, sub4.getGroupID());
-                       assertEquals(instance.getResourceId(), 
sub4.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sub4.getTaskManagerID());
                        assertEquals(jobId, sub4.getJobID());
                        assertEquals(sharedSlot, sub4.getParent());
                        assertEquals(sharedSlot, sub4.getRoot());
@@ -456,7 +456,7 @@ public class SharedSlotsTest {
                        assertNotNull(constraint.getSharedSlot());
                        assertTrue(constraint.isAssigned());
                        assertTrue(constraint.isAssignedAndAlive());
-                       assertEquals(instance.getInstanceConnectionInfo(), 
constraint.getLocation());
+                       assertEquals(instance.getTaskManagerLocation(), 
constraint.getLocation());
                        
                        SimpleSlot tailSlot = 
assignment.getSlotForTask(constraint, 
Collections.<TaskManagerLocation>emptySet());
                        
@@ -475,7 +475,7 @@ public class SharedSlotsTest {
                        assertTrue(tailSlot.isReleased());
                        assertTrue(constraint.isAssigned());
                        assertFalse(constraint.isAssignedAndAlive());
-                       assertEquals(instance.getInstanceConnectionInfo(), 
constraint.getLocation());
+                       assertEquals(instance.getTaskManagerLocation(), 
constraint.getLocation());
                        
                        // we should have resources again for the co-location 
constraint
                        assertEquals(1, 
assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
@@ -488,10 +488,10 @@ public class SharedSlotsTest {
                        assertEquals(0, 
assignment.getNumberOfAvailableSlotsForGroup(constraint.getGroupId()));
                        
                        // verify some basic properties of the slots
-                       assertEquals(instance.getResourceId(), 
sourceSlot.getTaskManagerID());
-                       assertEquals(instance.getResourceId(), 
headSlot.getTaskManagerID());
-                       assertEquals(instance.getResourceId(), 
tailSlot.getTaskManagerID());
-                       assertEquals(instance.getResourceId(), 
sinkSlot.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sourceSlot.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
headSlot.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
tailSlot.getTaskManagerID());
+                       assertEquals(instance.getTaskManagerID(), 
sinkSlot.getTaskManagerID());
 
                        assertEquals(sourceId, sourceSlot.getGroupID());
                        assertEquals(sinkId, sinkSlot.getGroupID());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
index 82c2a74..c690d36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SimpleSlotTest.java
@@ -150,7 +150,7 @@ public class SimpleSlotTest {
                TaskManagerLocation connection = new 
TaskManagerLocation(resourceID, address, 10001);
 
                Instance instance = new Instance(DummyActorGateway.INSTANCE, 
connection,
-                               resourceID, new InstanceID(), 
hardwareDescription, 1);
+                               new InstanceID(), hardwareDescription, 1);
                return instance.allocateSimpleSlot(new JobID());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index fff7bc6..5722cac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
@@ -36,6 +35,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -121,7 +121,7 @@ public class SpilledSubpartitionViewTest {
                                }
                        }
 
-                       final List<Future<Boolean>> results = 
Lists.newArrayList();
+                       final List<Future<Boolean>> results = new ArrayList<>();
 
                        // Submit the consuming tasks
                        for (ResultSubpartitionView view : readers) {

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
index 3bd4368..1344aef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraintTest.java
@@ -140,7 +140,7 @@ public class CoLocationConstraintTest {
                        // now, the location is assigned and we have a location
                        assertTrue(constraint.isAssigned());
                        assertTrue(constraint.isAssignedAndAlive());
-                       assertEquals(instance2, constraint.getLocation());
+                       assertEquals(instance2.getTaskManagerLocation(), 
constraint.getLocation());
                        
                        // release the slot
                        slot2_1.releaseSlot();
@@ -148,7 +148,7 @@ public class CoLocationConstraintTest {
                        // we should still have a location
                        assertTrue(constraint.isAssigned());
                        assertFalse(constraint.isAssignedAndAlive());
-                       assertEquals(instance2, constraint.getLocation());
+                       assertEquals(instance2.getTaskManagerLocation(), 
constraint.getLocation());
 
                        // we can not assign a different location
                        try {
@@ -167,7 +167,7 @@ public class CoLocationConstraintTest {
 
                        assertTrue(constraint.isAssigned());
                        assertTrue(constraint.isAssignedAndAlive());
-                       assertEquals(instance2, constraint.getLocation());
+                       assertEquals(instance2.getTaskManagerLocation(), 
constraint.getLocation());
                }
                catch (Exception e) {
                        e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index 5b7d18a..eab4fea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -326,8 +326,8 @@ public class ScheduleWithCoLocationHintTest {
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
@@ -398,8 +398,8 @@ public class ScheduleWithCoLocationHintTest {
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
@@ -425,8 +425,8 @@ public class ScheduleWithCoLocationHintTest {
                        SimpleSlot s4 = scheduler.scheduleImmediately(new 
ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2));
                        
                        // still preserves the previous instance mapping)
-                       assertEquals(i1.getResourceId(), s3.getTaskManagerID());
-                       assertEquals(i2.getResourceId(), s4.getTaskManagerID());
+                       assertEquals(i1.getTaskManagerID(), 
s3.getTaskManagerID());
+                       assertEquals(i2.getTaskManagerID(), 
s4.getTaskManagerID());
                        
                        s3.releaseSlot();
                        s4.releaseSlot();
@@ -455,8 +455,8 @@ public class ScheduleWithCoLocationHintTest {
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
@@ -516,7 +516,7 @@ public class ScheduleWithCoLocationHintTest {
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);
@@ -580,8 +580,8 @@ public class ScheduleWithCoLocationHintTest {
                        Instance i1 = getRandomInstance(1);
                        Instance i2 = getRandomInstance(1);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        scheduler.newInstanceAvailable(i2);
                        scheduler.newInstanceAvailable(i1);

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index a683834..fd0523b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -108,10 +108,10 @@ public class SchedulerSlotSharingTest {
                        
                        // make sure we have two slots on the first instance, 
and two on the second
                        int c = 0;
-                       c += (s5.getTaskManagerID().equals(i1.getResourceId())) 
? 1 : -1;
-                       c += (s6.getTaskManagerID().equals(i1.getResourceId())) 
? 1 : -1;
-                       c += (s7.getTaskManagerID().equals(i1.getResourceId())) 
? 1 : -1;
-                       c += (s8.getTaskManagerID().equals(i1.getResourceId())) 
? 1 : -1;
+                       c += 
(s5.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+                       c += 
(s6.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+                       c += 
(s7.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
+                       c += 
(s8.getTaskManagerID().equals(i1.getTaskManagerID())) ? 1 : -1;
                        assertEquals(0, c);
                        
                        // release all
@@ -637,8 +637,8 @@ public class SchedulerSlotSharingTest {
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        scheduler.newInstanceAvailable(i1);
@@ -690,8 +690,8 @@ public class SchedulerSlotSharingTest {
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
-                       TaskManagerLocation loc2 = 
i2.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
+                       TaskManagerLocation loc2 = i2.getTaskManagerLocation();
 
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        scheduler.newInstanceAvailable(i1);
@@ -743,7 +743,7 @@ public class SchedulerSlotSharingTest {
                        Instance i1 = getRandomInstance(2);
                        Instance i2 = getRandomInstance(2);
 
-                       TaskManagerLocation loc1 = 
i1.getInstanceConnectionInfo();
+                       TaskManagerLocation loc1 = i1.getTaskManagerLocation();
 
                        Scheduler scheduler = new 
Scheduler(TestingUtils.directExecutionContext());
                        scheduler.newInstanceAvailable(i1);
@@ -771,12 +771,12 @@ public class SchedulerSlotSharingTest {
                        assertEquals(0, i1.getNumberOfAvailableSlots());
                        assertEquals(0, i2.getNumberOfAvailableSlots());
                        
-                       assertEquals(i1.getResourceId(), s1.getTaskManagerID());
-                       assertEquals(i1.getResourceId(), s2.getTaskManagerID());
-                       assertEquals(i1.getResourceId(), s3.getTaskManagerID());
-                       assertEquals(i1.getResourceId(), s4.getTaskManagerID());
-                       assertEquals(i2.getResourceId(), s5.getTaskManagerID());
-                       assertEquals(i2.getResourceId(), s6.getTaskManagerID());
+                       assertEquals(i1.getTaskManagerID(), 
s1.getTaskManagerID());
+                       assertEquals(i1.getTaskManagerID(), 
s2.getTaskManagerID());
+                       assertEquals(i1.getTaskManagerID(), 
s3.getTaskManagerID());
+                       assertEquals(i1.getTaskManagerID(), 
s4.getTaskManagerID());
+                       assertEquals(i2.getTaskManagerID(), 
s5.getTaskManagerID());
+                       assertEquals(i2.getTaskManagerID(), 
s6.getTaskManagerID());
                        
                        // check the scheduler's bookkeeping
                        assertEquals(4, 
scheduler.getNumberOfLocalizedAssignments());

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index eef27a8..d040ec4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -69,8 +69,7 @@ public class SchedulerTestUtils {
                final long GB = 1024L*1024*1024;
                HardwareDescription resources = new HardwareDescription(4, 
4*GB, 3*GB, 2*GB);
                
-               return new Instance(DummyActorGateway.INSTANCE, ci, resourceID,
-                       new InstanceID(), resources, numSlots);
+               return new Instance(DummyActorGateway.INSTANCE, ci, new 
InstanceID(), resources, numSlots);
        }
        
        
@@ -88,7 +87,7 @@ public class SchedulerTestUtils {
        public static Execution getTestVertex(Instance... preferredInstances) {
                List<TaskManagerLocation> locations = new 
ArrayList<>(preferredInstances.length);
                for (Instance i : preferredInstances) {
-                       locations.add(i.getInstanceConnectionInfo());
+                       locations.add(i.getTaskManagerLocation());
                }
                return getTestVertex(locations);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eac6088a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
index d9c100c..ea0d2cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFutureTest.java
@@ -56,9 +56,9 @@ public class SlotAllocationFutureTest {
                        final Instance instance2 = 
SchedulerTestUtils.getRandomInstance(1);
 
                        final SimpleSlot slot1 = new SimpleSlot(new JobID(), 
instance1,
-                                       instance1.getInstanceConnectionInfo(), 
0, instance1.getActorGateway(), null, null);
+                                       instance1.getTaskManagerLocation(), 0, 
instance1.getActorGateway(), null, null);
                        final SimpleSlot slot2 = new SimpleSlot(new JobID(), 
instance2,
-                                       instance2.getInstanceConnectionInfo(), 
0, instance2.getActorGateway(), null, null);
+                                       instance2.getTaskManagerLocation(), 0, 
instance2.getActorGateway(), null, null);
                        
                        future.setSlot(slot1);
                        try {
@@ -85,7 +85,7 @@ public class SlotAllocationFutureTest {
                                final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
 
                                final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, 
null);
+                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
                                
                                SlotAllocationFuture future = new 
SlotAllocationFuture();
                                
@@ -108,7 +108,7 @@ public class SlotAllocationFutureTest {
                                final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
                                
                                final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, 
null);
+                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
                                
                                SlotAllocationFuture future = new 
SlotAllocationFuture();
                                future.setSlot(thisSlot);
@@ -141,7 +141,7 @@ public class SlotAllocationFutureTest {
                                final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
 
                                final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance,
-                                               
instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, 
null);
+                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
                                
                                final SlotAllocationFuture future = new 
SlotAllocationFuture();
                                
@@ -181,7 +181,7 @@ public class SlotAllocationFutureTest {
                                final Instance instance = 
SchedulerTestUtils.getRandomInstance(1);
 
                                final SimpleSlot thisSlot = new SimpleSlot(new 
JobID(), instance, 
-                                               
instance.getInstanceConnectionInfo(), 0, instance.getActorGateway(), null, 
null);
+                                               
instance.getTaskManagerLocation(), 0, instance.getActorGateway(), null, null);
                                final SlotAllocationFuture future = new 
SlotAllocationFuture();
 
                                future.setSlot(thisSlot);

Reply via email to