http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index bae7086..85b7eb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -26,12 +26,14 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
@@ -152,10 +154,15 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 
5L);
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-               TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
                        Time.seconds(5L),
                        Time.seconds(5L));
+                       
+               SlotManager slotManager = new SlotManager(
+                       rpcService.getScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
 
                MetricRegistry metricRegistry = mock(MetricRegistry.class);
                JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
@@ -171,7 +178,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
                                resourceManagerConfiguration,
                                highAvailabilityServices,
                                heartbeatServices,
-                               slotManagerFactory,
+                               slotManager,
                                metricRegistry,
                                jobLeaderIdService,
                                fatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
deleted file mode 100644
index 67b208d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.mockito.Mockito;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-public class TestingSlotManager extends SlotManager {
-
-       public TestingSlotManager() {
-               this(new TestingResourceManagerServices());
-       }
-
-       public TestingSlotManager(ResourceManagerServices rmServices) {
-               super(rmServices);
-       }
-
-       @Override
-       protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, 
ResourceSlot> freeSlots) {
-               final Iterator<ResourceSlot> slotIterator = 
freeSlots.values().iterator();
-               if (slotIterator.hasNext()) {
-                       return slotIterator.next();
-               } else {
-                       return null;
-               }
-       }
-
-       @Override
-       protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, 
Map<AllocationID, SlotRequest> pendingRequests) {
-               final Iterator<SlotRequest> requestIterator = 
pendingRequests.values().iterator();
-               if (requestIterator.hasNext()) {
-                       return requestIterator.next();
-               } else {
-                       return null;
-               }
-       }
-
-       private static class TestingResourceManagerServices implements 
ResourceManagerServices {
-
-               private final UUID leaderID = UUID.randomUUID();
-
-               @Override
-               public UUID getLeaderID() {
-                       return leaderID;
-               }
-
-               @Override
-               public void allocateResource(ResourceProfile resourceProfile) {
-
-               }
-
-               @Override
-               public Executor getAsyncExecutor() {
-                       return Mockito.mock(Executor.class);
-               }
-
-               @Override
-               public Executor getMainThreadExecutor() {
-                       return Mockito.mock(Executor.class);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
deleted file mode 100644
index 6b5f6b2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-
-public class TestingSlotManagerFactory implements SlotManagerFactory {
-
-       @Override
-       public SlotManager create(ResourceManagerServices rmServices) {
-               return new TestingSlotManager(rmServices);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 041747d..b0b5d32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -23,510 +23,1058 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.junit.BeforeClass;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.ArgumentCaptor;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SlotManagerTest extends TestLogger {
 
-public class SlotManagerTest {
-
-       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+       /**
+        * Tests that we can register task manager and their slots at the slot 
manager.
+        */
+       @Test
+       public void testTaskManagerRegistration() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-       private static final int DEFAULT_TESTING_MEMORY = 512;
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
 
-       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
+               ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+               final SlotReport slotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
 
-       private static TaskExecutorRegistration taskExecutorRegistration;
+                       assertTrue("The number registered slots does not equal 
the expected number.",2 == slotManager.getNumberRegisteredSlots());
 
-       @BeforeClass
-       public static void setUp() {
-               taskExecutorRegistration = 
Mockito.mock(TaskExecutorRegistration.class);
-               TaskExecutorGateway gateway = 
Mockito.mock(TaskExecutorGateway.class);
-               
Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
-               Mockito.when(gateway.requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
+                       assertNotNull(slotManager.getSlot(slotId1));
+                       assertNotNull(slotManager.getSlot(slotId2));
+               }
        }
 
        /**
-        * Tests that there are no free slots when we request, need to allocate 
from cluster manager master
+        * Tests that un-registration of task managers will free and remove all 
registered slots.
         */
        @Test
-       public void testRequestSlotWithoutFreeSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       public void testTaskManagerUnregistration() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final JobID jobId = new JobID();
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       any(AllocationID.class),
+                       anyString(),
+                       eq(leaderId),
+                       any(Time.class))).thenReturn(new 
FlinkCompletableFuture<Acknowledge>());
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final AllocationID allocationId1 = new AllocationID();
+               final AllocationID allocationId2 = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile, jobId, allocationId1);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+               final SlotReport slotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+               final SlotRequest slotRequest = new SlotRequest(
+                       new JobID(),
+                       allocationId2,
+                       resourceProfile,
+                       "foobar");
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+
+                       assertTrue("The number registered slots does not equal 
the expected number.",2 == slotManager.getNumberRegisteredSlots());
+
+                       TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
+                       TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
+
+                       assertTrue(slot1.isAllocated());
+                       assertTrue(slot2.isFree());
+
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest));
+
+                       assertFalse(slot2.isFree());
+                       assertTrue(slot2.hasPendingSlotRequest());
+
+                       PendingSlotRequest pendingSlotRequest = 
slotManager.getSlotRequest(allocationId2);
+
+                       assertTrue("The pending slot request should have been 
assigned to slot 2", pendingSlotRequest.isAssigned());
+
+                       
slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID());
+
+                       assertTrue(0 == slotManager.getNumberRegisteredSlots());
+                       assertFalse(pendingSlotRequest.isAssigned());
+               }
        }
 
        /**
-        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
+        * Tests that a slot request with no free slots will trigger the 
resource allocation
         */
        @Test
-       public void testRequestSlotWithFreeSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
+       public void testSlotRequestWithoutFreeSlots() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(
+                       new JobID(),
+                       new AllocationID(),
+                       resourceProfile,
+                       "localhost");
 
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-               assertEquals(1, slotManager.getFreeSlotCount());
+               ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertEquals(0, slotManager.getAllocatedContainers().size());
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       
verify(resourceManagerActions).allocateResource(eq(resourceProfile));
+               }
        }
 
        /**
-        * Tests that there are some free slots when we request, but none of 
them are suitable
+        * Tests that the slot request fails if we cannot allocate more 
resources.
         */
        @Test
-       public void testRequestSlotWithoutSuitableSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
-               assertEquals(2, slotManager.getFreeSlotCount());
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       public void testSlotRequestWithResourceAllocationFailure() throws 
Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(
+                       new JobID(),
+                       new AllocationID(),
+                       resourceProfile,
+                       "localhost");
+
+               ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               doThrow(new ResourceManagerException("Test 
exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       fail("The slot request should have failed with a 
ResourceManagerException.");
+
+               } catch (ResourceManagerException e) {
+                       // expected exception
+               }
        }
 
        /**
-        * Tests that we send duplicated slot request
+        * Tests that a slot request which can be fulfilled will trigger a slot 
allocation.
         */
        @Test
-       public void testDuplicatedSlotRequest() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-               slotManager.requestSlot(request1);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request1);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       public void testSlotRequestWithFreeSlot() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceID resourceID = ResourceID.generate();
+               final JobID jobId = new JobID();
+               final SlotID slotId = new SlotID(resourceID, 0);
+               final String targetAddress = "localhost";
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(
+                       jobId,
+                       allocationId,
+                       resourceProfile,
+                       targetAddress);
+
+               ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       // accept an incoming slot request
+                       final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+                       when(taskExecutorGateway.requestSlot(
+                               eq(slotId),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               
any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+                       final TaskExecutorConnection taskExecutorConnection = 
new TaskExecutorConnection(taskExecutorGateway);
+
+                       final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+                       final SlotReport slotReport = new 
SlotReport(slotStatus);
+
+                       slotManager.registerTaskManager(
+                               taskExecutorConnection,
+                               slotReport);
+
+                       assertTrue("The slot request should be accepted", 
slotManager.registerSlotRequest(slotRequest));
+
+                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
+               }
        }
 
        /**
-        * Tests that we send multiple slot requests
+        * Checks that un-registering a pending slot request will cancel it, 
removing it from all
+        * assigned task manager slots and then remove it from the slot manager.
         */
        @Test
-       public void testRequestMultipleSlots() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
+       public void testUnregisterPendingSlotRequest() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+               final AllocationID allocationId = new AllocationID();
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       any(AllocationID.class),
+                       anyString(),
+                       eq(leaderId),
+                       any(Time.class))).thenReturn(new 
FlinkCompletableFuture<Acknowledge>());
+
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-               // request 3 normal slots
-               for (int i = 0; i < 3; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
-               }
+               final SlotRequest slotRequest = new SlotRequest(new JobID(), 
allocationId, resourceProfile, "foobar");
 
-               // request 2 big slots
-               for (int i = 0; i < 2; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               }
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
 
-               // request 1 normal slot again
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-               assertEquals(4, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(2, slotManager.getPendingRequestCount());
-               assertEquals(2, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       assertNotNull(slotManager.getSlotRequest(allocationId));
+
+                       assertTrue(slot.hasPendingSlotRequest());
+
+                       slotManager.unregisterSlotRequest(allocationId);
+
+                       assertNull(slotManager.getSlotRequest(allocationId));
+
+                       slot = slotManager.getSlot(slotId);
+                       assertTrue(slot.isFree());
+               }
        }
 
        /**
-        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
+        * Tests that pending slot requests are tried to be fulfilled upon new 
slot registrations.
         */
        @Test
-       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
+       public void testFulfillingPendingSlotRequest() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceID resourceID = ResourceID.generate();
+               final JobID jobId = new JobID();
+               final SlotID slotId = new SlotID(resourceID, 0);
+               final String targetAddress = "localhost";
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(
+                       jobId,
+                       allocationId,
+                       resourceProfile,
+                       targetAddress);
+
+               ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               // accept an incoming slot request
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       eq(slotId),
+                       eq(jobId),
+                       eq(allocationId),
+                       anyString(),
+                       eq(leaderId),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+               final SlotReport slotReport = new SlotReport(slotStatus);
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       assertTrue("The slot request should be accepted", 
slotManager.registerSlotRequest(slotRequest));
+
+                       verify(resourceManagerActions, 
times(1)).allocateResource(eq(resourceProfile));
+
+                       slotManager.registerTaskManager(
+                               taskExecutorConnection,
+                               slotReport);
+
+                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
+               }
        }
 
        /**
-        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
+        * Tests that freeing a slot will correctly reset the slot and mark it 
as a free slot
         */
        @Test
-       public void testNewlyAppearedFreeSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
+       public void testFreeSlot() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceID resourceID = ResourceID.generate();
+               final JobID jobId = new JobID();
+               final SlotID slotId = new SlotID(resourceID, 0);
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+
+               ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               // accept an incoming slot request
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
 
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
+               final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(taskExecutorGateway);
 
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile, jobId, allocationId);
+               final SlotReport slotReport = new SlotReport(slotStatus);
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       slotManager.registerTaskManager(
+                               taskExecutorConnection,
+                               slotReport);
+
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
+
+                       // this should be ignored since the allocation id does 
not match
+                       slotManager.freeSlot(slotId, new AllocationID());
+
+                       assertTrue(slot.isAllocated());
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
+
+                       slotManager.freeSlot(slotId, allocationId);
+
+                       assertTrue(slot.isFree());
+                       assertNull(slot.getAllocationId());
+               }
        }
 
        /**
-        * Tests that a new slot appeared in SlotReport, but it't not suitable 
for all the pending requests
+        * Tests that a second pending slot request is detected as a duplicate 
if the allocation ids are
+        * the same.
         */
        @Test
-       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(slotId));
+       public void testDuplicatePendingSlotRequest() throws Exception {
+
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
+               final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
+               final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
+               final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest1));
+                       
assertFalse(slotManager.registerSlotRequest(slotRequest2));
+               }
+
+               // check that we have called the resource allocation only 
for the first slot request,
+               // since the second request is a duplicate
+               verify(resourceManagerActions, 
times(1)).allocateResource(any(ResourceProfile.class));
        }
 
        /**
-        * Tests that a new slot appeared in SlotReport, and it's been reported 
using by some job
+        * Tests that if we have received a slot report with some allocated 
slots, then we don't accept
+        * slot requests with allocated allocation ids.
         */
        @Test
-       public void testNewlyAppearedInUseSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
+       public void testDuplicatePendingSlotRequestAfterSlotReport() throws 
Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile, jobId, allocationId);
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+
+                       
assertFalse(slotManager.registerSlotRequest(slotRequest));
+               }
        }
 
        /**
-        * Tests that we had a slot in-use and is freed again subsequently.
+        * Tests that duplicate slot requests (requests with an already 
registered allocation id) are
+        * also detected after a pending slot request has been fulfilled but 
not yet freed.
         */
        @Test
-       public void testExistingInUseSlotUpdateStatus() {
-               TestingSlotManager slotManager = new TestingSlotManager();
+       public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() 
throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
+               final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
+               final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
+               final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       any(AllocationID.class),
+                       anyString(),
+                       eq(leaderId),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile1);
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-               // slot is freed again
-               slotManager.notifySlotAvailable(slotId.getResourceID(), slotId);
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
 
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertFalse(slotManager.isAllocated(slotId));
+                       
assertFalse(slotManager.registerSlotRequest(slotRequest2));
+               }
+
+               // check that the resource allocation was never triggered: the 
first request was served by the registered slot
+               // and the second request is a duplicate
+               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
        }
 
        /**
-        * Tests multiple slot requests with one slots.
+        * Tests that an already registered allocation id can be reused after 
the initial slot request
+        * has been freed.
         */
        @Test
-       public void testMultipleSlotRequestsWithOneSlot() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               final AllocationID allocationID = new AllocationID();
+       public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() 
throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
+               final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
+               final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
+               final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       any(AllocationID.class),
+                       anyString(),
+                       eq(leaderId),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+               final SlotStatus slotStatus = new SlotStatus(slotId, new 
ResourceProfile(2.0, 2));
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-               SlotRequest request1 = new SlotRequest(new JobID(), 
allocationID, DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request1);
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
-               final ResourceID resourceID = ResourceID.generate();
-               final SlotStatus slotStatus = new SlotStatus(new 
SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE);
-               final SlotReport slotReport = new SlotReport(slotStatus);
-               slotManager.registerTaskExecutor(resourceID, 
taskExecutorRegistration, slotReport);
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-               // another request pending
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(allocationID));
-               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+                       slotManager.freeSlot(slotId, allocationId);
 
-               // but slot is reported empty in a report in the meantime which 
shouldn't affect the state
-               slotManager.notifySlotAvailable(resourceID, 
slotStatus.getSlotID());
+                       // check that the slot has been freed
+                       assertTrue(slot.isFree());
+                       assertNull(slot.getAllocationId());
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotStatus.getSlotID()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest2));
 
-               // but slot is reported empty in a report in the meantime which 
shouldn't affect the state
-               slotManager.notifySlotAvailable(resourceID, 
slotStatus.getSlotID());
+                       assertEquals("The slot has not been allocated to the 
expected allocation id.", allocationId, slot.getAllocationId());
+               }
 
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
+               // check that the resource allocation was never triggered, since 
both slot requests were fulfilled,
+               // one after the other, by the already registered slot
+               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
        }
 
        /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, request will retry
+        * Tests that the slot manager ignores slot reports of unknown origin 
(not registered
+        * task managers).
         */
        @Test
-       public void testSlotAllocationFailedAtTaskManager() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-               slotManager.addFreeSlot(slot);
+       public void testReceivingUnknownSlotReport() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
+               final InstanceID unknownInstanceID = new InstanceID();
+               final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 
0);
+               final ResourceProfile unknownResourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotStatus unknownSlotStatus = new 
SlotStatus(unknownSlotId, unknownResourceProfile);
+               final SlotReport unknownSlotReport = new 
SlotReport(unknownSlotStatus);
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       // check that we don't have any slots registered
+                       assertTrue(0 == slotManager.getNumberRegisteredSlots());
 
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+                       // this should not update anything since the instance 
id is not known to the slot manager
+                       
assertFalse(slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport));
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
+                       assertTrue(0 == slotManager.getNumberRegisteredSlots());
+               }
        }
 
-
        /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, and slot is occupied by another request
-        * This can only occur after reconnect of the TaskExecutor.
+        * Tests that slots are updated with respect to the latest incoming 
slot report. This means that
+        * slots for which no report has been received will be removed and 
those for which a report was
+        * received are updated accordingly.
         */
        @Test
-       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               final SlotID slotID = SlotID.generate();
-               SlotStatus slot = new SlotStatus(slotID, 
DEFAULT_TESTING_PROFILE);
-               SlotReport slotReport = new SlotReport(slot);
-               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorRegistration, slotReport);
-
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               // slot is set empty by a reconnect of the TaskExecutor
-               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorRegistration, slotReport);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               // another request takes the slot
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-               // original request should be retried
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slotID);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       public void testUpdateSlotReport() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+
+               final ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final SlotID slotId3 = new SlotID(resourceId, 2);
+
+
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+
+               final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, 
resourceProfile, jobId, allocationId);
+               final SlotStatus slotStatus3 = new SlotStatus(slotId3, 
resourceProfile);
+
+               final SlotReport slotReport1 = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+               final SlotReport slotReport2 = new 
SlotReport(Arrays.asList(newSlotStatus2, slotStatus3));
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+                       // check that we don't have any slots registered
+                       assertTrue(0 == slotManager.getNumberRegisteredSlots());
+
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport1);
+
+                       TaskManagerSlot slot = slotManager.getSlot(slotId2);
+
+                       assertTrue(2 == slotManager.getNumberRegisteredSlots());
+
+                       assertTrue(slot.isFree());
+
+                       
assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), 
slotReport2));
+
+                       assertTrue(2 == slotManager.getNumberRegisteredSlots());
+
+                       // the slot manager should have removed slotId1
+                       assertNull(slotManager.getSlot(slotId1));
+
+                       assertNotNull(slotManager.getSlot(slotId3));
+
+                       // slotId2 should have been allocated for allocationId
+                       assertEquals(allocationId, 
slotManager.getSlot(slotId2).getAllocationId());
+               }
        }
 
+       /**
+        * Tests that idle task managers time out after the configured timeout. 
A timed out task manager
+        * will be removed from the slot manager and the resource manager will 
be notified about the
+        * timeout.
+        */
        @Test
-       public void testNotifyTaskManagerFailure() {
-               TestingSlotManager slotManager = new TestingSlotManager();
+       public void testTaskManagerTimeout() throws Exception {
+               final long tmTimeout = 50L;
 
-               ResourceID resource1 = ResourceID.generate();
-               ResourceID resource2 = ResourceID.generate();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final UUID leaderId = UUID.randomUUID();
 
-               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
 
-               slotManager.addFreeSlot(slot11);
-               slotManager.addFreeSlot(slot21);
+               final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               final Executor mainThreadExecutor = mock(Executor.class);
 
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
+               try (SlotManager slotManager = new SlotManager(
+                       TestingUtils.defaultScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       Time.milliseconds(tmTimeout))) {
 
-               slotManager.addFreeSlot(slot12);
-               slotManager.addFreeSlot(slot22);
+                       slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
 
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
 
-               slotManager.notifyTaskManagerFailure(resource2);
+                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
+                       verify(mainThreadExecutor, timeout(tmTimeout * 
10L)).execute(runnableArgumentCaptor.capture());
 
-               // notify an not exist resource failure
-               slotManager.notifyTaskManagerFailure(ResourceID.generate());
+                       // the only runnable being executed by the main thread 
executor should be the timeout runnable
+                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
+                       timeoutRunnable.run();
 
-       // 
------------------------------------------------------------------------
-       //  testing utilities
-       // 
------------------------------------------------------------------------
-
-       private void directlyProvideFreeSlots(
-               final SlotManager slotManager,
-               final ResourceProfile resourceProfile,
-               final int freeSlotNum)
-       {
-               for (int i = 0; i < freeSlotNum; ++i) {
-                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), 
taskExecutorRegistration));
+                       verify(resourceManagerActions, 
times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
                }
        }
 
-       // 
------------------------------------------------------------------------
-       //  testing classes
-       // 
------------------------------------------------------------------------
+       /**
+        * Tests that slot requests time out after the specified request 
timeout. If a slot request
+        * times out, then the request is cancelled, removed from the slot 
manager and the resource
+        * manager is notified about the failed allocation.
+        */
+       @Test
+       public void testSlotRequestTimeout() throws Exception {
+               final long allocationTimeout = 50L;
+
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final UUID leaderId = UUID.randomUUID();
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-       private static class TestingSlotManager extends SlotManager {
+               final Executor mainThreadExecutor = mock(Executor.class);
 
-               private static TestingRmServices testingRmServices = new 
TestingRmServices();
+               try (SlotManager slotManager = new SlotManager(
+                       TestingUtils.defaultScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       Time.milliseconds(allocationTimeout),
+                       TestingUtils.infiniteTime())) {
 
-               TestingSlotManager() {
-                       super(testingRmServices);
-                       testingRmServices.allocatedContainers.clear();
+                       slotManager.start(leaderId, mainThreadExecutor, 
resourceManagerActions);
+
+                       
assertTrue(slotManager.registerSlotRequest(slotRequest));
+
+                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+
+                       verify(mainThreadExecutor, timeout(allocationTimeout * 
10L)).execute(runnableArgumentCaptor.capture());
+
+                       // the only runnable being executed by the main thread 
executor should be the timeout runnable
+                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
+
+                       timeoutRunnable.run();
+
+                       verify(resourceManagerActions, 
times(1)).notifyAllocationFailure(
+                               eq(jobId),
+                               eq(allocationId),
+                               any(TimeoutException.class));
                }
+       }
 
-               /**
-                * Choose slot randomly if it matches requirement
-                *
-                * @param request   The slot request
-                * @param freeSlots All slots which can be used
-                * @return The chosen slot or null if cannot find a match
-                */
-               @Override
-               protected ResourceSlot chooseSlotToUse(SlotRequest request, 
Map<SlotID, ResourceSlot> freeSlots) {
-                       for (ResourceSlot slot : freeSlots.values()) {
-                               if 
(slot.isMatchingRequirement(request.getResourceProfile())) {
-                                       return slot;
-                               }
+       /**
+        * Tests that a slot request is retried if it times out on the task 
manager side.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTaskManagerSlotRequestTimeoutHandling() throws 
Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
+               final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = 
new FlinkCompletableFuture<>();
+               final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = 
new FlinkCompletableFuture<>();
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       eq(allocationId),
+                       anyString(),
+                       any(UUID.class),
+                       any(Time.class))).thenReturn(slotRequestFuture1, 
slotRequestFuture2);
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+               final SlotReport slotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+               try (SlotManager slotManager = createSlotManager(leaderId, 
resourceManagerActions)) {
+
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       ArgumentCaptor<SlotID> slotIdCaptor = 
ArgumentCaptor.forClass(SlotID.class);
+
+                       verify(taskExecutorGateway, times(1)).requestSlot(
+                               slotIdCaptor.capture(),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       TaskManagerSlot failedSlot = 
slotManager.getSlot(slotIdCaptor.getValue());
+
+                       // let the first attempt fail --> this should trigger a 
second attempt
+                       slotRequestFuture1.completeExceptionally(new 
SlotAllocationException("Test exception."));
+
+                       verify(taskExecutorGateway, times(2)).requestSlot(
+                               slotIdCaptor.capture(),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       // the second attempt succeeds
+                       slotRequestFuture2.complete(Acknowledge.get());
+
+                       TaskManagerSlot slot = 
slotManager.getSlot(slotIdCaptor.getValue());
+
+                       assertTrue(slot.isAllocated());
+                       assertEquals(allocationId, slot.getAllocationId());
+
+                       if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
+                               assertTrue(failedSlot.isFree());
                        }
-                       return null;
                }
+       }
 
-               /**
-                * Choose request randomly if offered slot can match its 
requirement
-                *
-                * @param offeredSlot     The free slot
-                * @param pendingRequests All the pending slot requests
-                * @return The chosen request's AllocationID or null if cannot 
find a match
-                */
-               @Override
-               protected SlotRequest chooseRequestToFulfill(ResourceSlot 
offeredSlot,
-                       Map<AllocationID, SlotRequest> pendingRequests)
-               {
-                       for (Map.Entry<AllocationID, SlotRequest> 
pendingRequest : pendingRequests.entrySet()) {
-                               if 
(offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile()))
 {
-                                       return pendingRequest.getValue();
-                               }
-                       }
-                       return null;
+       /**
+        * Tests that pending slot requests are rejected if a slot report with 
a different allocation
+        * is received.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testSlotReportWhileActiveSlotRequest() throws Exception {
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(42.0, 1337);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
+               final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = 
new FlinkCompletableFuture<>();
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       any(JobID.class),
+                       eq(allocationId),
+                       anyString(),
+                       any(UUID.class),
+                       any(Time.class))).thenReturn(slotRequestFuture1, 
FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final SlotID slotId3 = new SlotID(resourceId, 2);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+               final SlotReport slotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+               // we have to manually trigger the future callbacks to 
simulate the main thread executor behaviour
+               final Executor mainThreadExecutorMock = mock(Executor.class);
+
+               try (SlotManager slotManager = new SlotManager(
+                       TestingUtils.defaultScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime())) {
+
+                       slotManager.start(leaderId, mainThreadExecutorMock, 
resourceManagerActions);
+
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
+
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       ArgumentCaptor<SlotID> slotIdCaptor = 
ArgumentCaptor.forClass(SlotID.class);
+
+                       verify(taskExecutorGateway, times(1)).requestSlot(
+                               slotIdCaptor.capture(),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       final SlotStatus newSlotStatus1 = new 
SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new 
AllocationID());
+                       final SlotStatus newSlotStatus2 = new 
SlotStatus(slotId3, resourceProfile);
+                       final SlotReport newSlotReport = new 
SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
+
+                       // this should remove the unused slot, replacing it 
with slotId3 and retry the pending slot request
+                       
slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), 
newSlotReport);
+
+                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       
verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture());
+
+                       Runnable requestFailureRunnable = 
runnableArgumentCaptor.getValue();
+
+                       requestFailureRunnable.run();
+
+                       verify(taskExecutorGateway, times(2)).requestSlot(
+                               slotIdCaptor.capture(),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       verify(mainThreadExecutorMock, 
times(2)).execute(runnableArgumentCaptor.capture());
+                       Runnable requestSuccessRunnable = 
runnableArgumentCaptor.getValue();
+
+                       requestSuccessRunnable.run();
+
+                       final SlotID requestedSlotId = slotIdCaptor.getValue();
+
+                       assertEquals(slotId3, requestedSlotId);
+
+                       TaskManagerSlot slot = 
slotManager.getSlot(requestedSlotId);
+
+                       assertTrue(slot.isAllocated());
+                       assertEquals(allocationId, slot.getAllocationId());
                }
+       }
+
+       /**
+        * Tests that formerly used task managers can again timeout after all 
of their slots have
+        * been freed.
+        */
+       @Test
+       public void testTimeoutForUnusedTaskManager() throws Exception {
+               final long taskManagerTimeout = 123456L;
+
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
+               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
+
+               final ResourceID resourceId = ResourceID.generate();
+
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
+
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       any(SlotID.class),
+                       eq(jobId),
+                       eq(allocationId),
+                       anyString(),
+                       eq(leaderId),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
+
+               final SlotID slotId1 = new SlotID(resourceId, 0);
+               final SlotID slotId2 = new SlotID(resourceId, 1);
+               final SlotStatus slotStatus1 = new SlotStatus(slotId1, 
resourceProfile);
+               final SlotStatus slotStatus2 = new SlotStatus(slotId2, 
resourceProfile);
+               final SlotReport initialSlotReport = new 
SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+               try (SlotManager slotManager = new SlotManager(
+                       scheduledExecutor,
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
+
+                       slotManager.start(leaderId, Executors.directExecutor(), 
resourceManagerActions);
+
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       slotManager.registerTaskManager(taskManagerConnection, 
initialSlotReport);
+
+                       ArgumentCaptor<SlotID> slotIdArgumentCaptor = 
ArgumentCaptor.forClass(SlotID.class);
+
+                       verify(taskExecutorGateway).requestSlot(
+                               slotIdArgumentCaptor.capture(),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       
assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+
+                       SlotID slotId = slotIdArgumentCaptor.getValue();
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+                       assertTrue(slot.isAllocated());
+                       assertEquals(allocationId, slot.getAllocationId());
+
+                       slotManager.freeSlot(slotId, allocationId);
+
+                       
assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+
+                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+
+                       // filter out the schedule call for the task manager 
which will be registered using the
+                       // taskManagerTimeout value
+                       
verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), 
eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS));
+
+                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
 
-               List<ResourceProfile> getAllocatedContainers() {
-                       return testingRmServices.allocatedContainers;
+                       timeoutRunnable.run();
+
+                       verify(resourceManagerActions, 
times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
                }
+       }
 
+       /**
+        * Tests that the slot manager re-registers a timeout for a rejected 
slot request.
+        */
+       @Test
+       public void testTimeoutForRejectedSlotRequest() throws Exception {
 
-               private static class TestingRmServices implements 
ResourceManagerServices {
+               final long slotRequestTimeout = 1337L;
+               final ScheduledExecutor scheduledExecutor = 
mock(ScheduledExecutor.class);
 
-                       private final UUID leaderID;
+               final ResourceID resourceId = ResourceID.generate();
+               final SlotID slotId = new SlotID(resourceId, 0);
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
+               final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
+               final SlotReport slotReport = new SlotReport(slotStatus);
 
-                       private final List<ResourceProfile> allocatedContainers;
+               final UUID leaderId = UUID.randomUUID();
+               final ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-                       public TestingRmServices() {
-                               this.leaderID = UUID.randomUUID();
-                               this.allocatedContainers = new LinkedList<>();
-                       }
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final AllocationID allocationId2 = new AllocationID();
+               final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-                       @Override
-                       public UUID getLeaderID() {
-                               return leaderID;
-                       }
+               CompletableFuture<Acknowledge> requestFuture = new 
FlinkCompletableFuture<>();
 
-                       @Override
-                       public void allocateResource(ResourceProfile 
resourceProfile) {
-                               allocatedContainers.add(resourceProfile);
-                       }
+               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               when(taskExecutorGateway.requestSlot(
+                       eq(slotId),
+                       eq(jobId),
+                       eq(allocationId),
+                       anyString(),
+                       eq(leaderId),
+                       any(Time.class))).thenReturn(requestFuture);
 
-                       @Override
-                       public Executor getAsyncExecutor() {
-                               return Mockito.mock(Executor.class);
-                       }
+               final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
 
-                       @Override
-                       public Executor getMainThreadExecutor() {
-                               return Mockito.mock(Executor.class);
-                       }
+               try (SlotManager slotManager = new SlotManager(
+                       scheduledExecutor,
+                       TestingUtils.infiniteTime(),
+                       Time.milliseconds(slotRequestTimeout),
+                       TestingUtils.infiniteTime())) {
+
+                       slotManager.start(leaderId, Executors.directExecutor(), 
resourceManagerActions);
+
+                       slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
 
+                       slotManager.registerSlotRequest(slotRequest);
+
+                       verify(taskExecutorGateway).requestSlot(
+                               eq(slotId),
+                               eq(jobId),
+                               eq(allocationId),
+                               anyString(),
+                               eq(leaderId),
+                               any(Time.class));
+
+                       requestFuture.completeExceptionally(new 
SlotOccupiedException("Slot is already occupied", allocationId2));
+
+                       ArgumentCaptor<Runnable> runnableArgumentCaptor = 
ArgumentCaptor.forClass(Runnable.class);
+                       
verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), 
eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS));
+
+                       Runnable timeoutRunnable = 
runnableArgumentCaptor.getValue();
+
+                       timeoutRunnable.run();
+
+                       
verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), 
eq(allocationId), any(Exception.class));
+
+                       TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+                       assertTrue(slot.isAllocated());
+                       assertEquals(allocationId2, slot.getAllocationId());
                }
        }
+
+       private SlotManager createSlotManager(UUID leaderId, 
ResourceManagerActions resourceManagerActions) {
+               SlotManager slotManager = new SlotManager(
+                       TestingUtils.defaultScheduledExecutor(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime());
+
+               slotManager.start(leaderId, Executors.directExecutor(), 
resourceManagerActions);
+
+               return slotManager;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 37690b5..a72969e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,47 +19,29 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
-import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -70,128 +52,75 @@ import static org.mockito.Mockito.verify;
 
 public class SlotProtocolTest extends TestLogger {
 
-       private static TestingSerialRpcService testRpcService;
+       private static final long timeout = 10000L;
+
+
+       private static ScheduledExecutorService scheduledExecutorService;
 
        @BeforeClass
        public static void beforeClass() {
-               testRpcService = new TestingSerialRpcService();
+               scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
        }
 
        @AfterClass
        public static void afterClass() {
-               testRpcService.stopService();
-               testRpcService = null;
-       }
-
-       @Before
-       public void beforeTest(){
-               testRpcService.clearGateways();
+               Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, 
scheduledExecutorService);
        }
 
        /**
         * Tests whether
-        * 1) SlotRequest is routed to the SlotManager
-        * 2) SlotRequest is confirmed
-        * 3) SlotRequest leads to a container allocation
-        * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+        * 1) SlotManager accepts a slot request
+        * 2) SlotRequest leads to a container allocation
+        * 3) Slot becomes available and TaskExecutor gets a SlotRequest
         */
        @Test
        public void testSlotsUnavailableRequest() throws Exception {
-               final String rmAddress = "/rm1";
-               final String jmAddress = "/jm1";
                final JobID jobID = new JobID();
-               final ResourceID rmResourceId = new ResourceID(rmAddress);
                final ResourceID jmResourceId = new ResourceID(jmAddress);
 
-               testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
-
-               final TestingHighAvailabilityServices testingHaServices = new 
TestingHighAvailabilityServices();
                final UUID rmLeaderID = UUID.randomUUID();
-               final UUID jmLeaderID = UUID.randomUUID();
-               TestingLeaderElectionService rmLeaderElectionService =
-                       configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
-
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-
-               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-                       testingHaServices,
-                       testRpcService.getScheduledExecutor(),
-                       Time.seconds(5L));
-
-               final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
-
-               final HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
-
-               SpiedResourceManager resourceManager =
-                       new SpiedResourceManager(
-                               rmResourceId,
-                               testRpcService,
-                               resourceManagerConfiguration,
-                               testingHaServices,
-                               heartbeatServices,
-                               slotManagerFactory,
-                               mock(MetricRegistry.class),
-                               jobLeaderIdService,
-                               mock(FatalErrorHandler.class));
-               resourceManager.start();
-               rmLeaderElectionService.isLeader(rmLeaderID);
-
-               Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmResourceId, jmAddress, jobID);
-               try {
-                       registrationFuture.get(5, TimeUnit.SECONDS);
-               } catch (Exception e) {
-                       Assert.fail("JobManager registration Future didn't 
become ready.");
-               }
 
-               final SlotManager slotManager = slotManagerFactory.slotManager;
+               try (SlotManager slotManager = new SlotManager(
+                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime())) {
 
-               final AllocationID allocationID = new AllocationID();
-               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
+                       ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-               SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
-               RMSlotRequestReply slotRequestReply =
-                       resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
+                       slotManager.start(rmLeaderID, 
Executors.directExecutor(), resourceManagerActions);
 
-               // 1) SlotRequest is routed to the SlotManager
-               verify(slotManager).requestSlot(slotRequest);
+                       final AllocationID allocationID = new AllocationID();
+                       final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
+                       final String targetAddress = "foobar";
 
-               // 2) SlotRequest is confirmed
-               Assert.assertEquals(
-                       slotRequestReply.getAllocationID(),
-                       allocationID);
+                       SlotRequest slotRequest = new SlotRequest(jobID, 
allocationID, resourceProfile, targetAddress);
 
-               // 3) SlotRequest leads to a container allocation
-               Assert.assertEquals(1, resourceManager.startNewWorkerCalled);
+                       slotManager.registerSlotRequest(slotRequest);
 
-               Assert.assertFalse(slotManager.isAllocated(allocationID));
+                       
verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile()));
 
-               // slot becomes available
-               final String tmAddress = "/tm1";
-               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               Mockito
-                       .when(
+                       // slot becomes available
+                       TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+                       Mockito.when(
                                taskExecutorGateway
                                        .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
-               testRpcService.registerGateway(tmAddress, taskExecutorGateway);
-
-               final ResourceID resourceID = ResourceID.generate();
-               final SlotID slotID = new SlotID(resourceID, 0);
-
-               final SlotStatus slotStatus =
-                       new SlotStatus(slotID, resourceProfile);
-               final SlotReport slotReport =
-                       new SlotReport(Collections.singletonList(slotStatus));
-               // register slot at SlotManager
-               slotManager.registerTaskExecutor(
-                       resourceID, new 
TaskExecutorRegistration(taskExecutorGateway), slotReport);
-
-               // 4) Slot becomes available and TaskExecutor gets a SlotRequest
-               verify(taskExecutorGateway, timeout(5000))
-                       .requestSlot(eq(slotID), eq(jobID), eq(allocationID), 
any(String.class), any(UUID.class), any(Time.class));
+                               .thenReturn(mock(FlinkFuture.class));
+
+                       final ResourceID resourceID = ResourceID.generate();
+                       final SlotID slotID = new SlotID(resourceID, 0);
+
+                       final SlotStatus slotStatus =
+                               new SlotStatus(slotID, resourceProfile);
+                       final SlotReport slotReport =
+                               new 
SlotReport(Collections.singletonList(slotStatus));
+                       // register slot at SlotManager
+                       slotManager.registerTaskManager(new 
TaskExecutorConnection(taskExecutorGateway), slotReport);
+
+                       // 3) Slot becomes available and TaskExecutor gets a 
SlotRequest
+                       verify(taskExecutorGateway, timeout(5000L))
+                               .requestSlot(eq(slotID), eq(jobID), 
eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
+               }
        }
 
        /**
@@ -203,159 +132,48 @@ public class SlotProtocolTest extends TestLogger {
         */
        @Test
        public void testSlotAvailableRequest() throws Exception {
-               final String rmAddress = "/rm1";
-               final String jmAddress = "/jm1";
-               final String tmAddress = "/tm1";
                final JobID jobID = new JobID();
-               final ResourceID rmResourceId = new ResourceID(rmAddress);
                final ResourceID jmResourceId = new ResourceID(jmAddress);
 
-               testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
-
-               final TestingHighAvailabilityServices testingHaServices = new 
TestingHighAvailabilityServices();
                final UUID rmLeaderID = UUID.randomUUID();
-               final UUID jmLeaderID = UUID.randomUUID();
-               TestingLeaderElectionService rmLeaderElectionService =
-                       configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
                Mockito.when(
                        taskExecutorGateway
                                .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
-               testRpcService.registerGateway(tmAddress, taskExecutorGateway);
-
-               ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
-                       Time.seconds(5L),
-                       Time.seconds(5L));
-
-               JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-                       testingHaServices,
-                       testRpcService.getScheduledExecutor(),
-                       Time.seconds(5L));
-
-               TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
-
-               HeartbeatServices heartbeatServices = 
mock(HeartbeatServices.class);
-
-               ResourceManager<ResourceID> resourceManager =
-                       Mockito.spy(new StandaloneResourceManager(
-                               testRpcService,
-                               FlinkResourceManager.RESOURCE_MANAGER_NAME,
-                               rmResourceId,
-                               resourceManagerConfiguration,
-                               testingHaServices,
-                               heartbeatServices,
-                               slotManagerFactory,
-                               mock(MetricRegistry.class),
-                               jobLeaderIdService,
-                               mock(FatalErrorHandler.class)));
-               resourceManager.start();
-               rmLeaderElectionService.isLeader(rmLeaderID);
-
-               Thread.sleep(1000);
-
-               Future<RegistrationResponse> registrationFuture =
-                       resourceManager.registerJobManager(rmLeaderID, 
jmLeaderID, jmResourceId, jmAddress, jobID);
-               try {
-                       registrationFuture.get(5L, TimeUnit.SECONDS);
-               } catch (Exception e) {
-                       Assert.fail("JobManager registration Future didn't 
become ready.");
-               }
-
-               final SlotManager slotManager = slotManagerFactory.slotManager;
+                       .thenReturn(mock(FlinkFuture.class));
 
-               final ResourceID resourceID = ResourceID.generate();
-               final AllocationID allocationID = new AllocationID();
-               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
-               final SlotID slotID = new SlotID(resourceID, 0);
+               try (SlotManager slotManager = new SlotManager(
+                       new 
ScheduledExecutorServiceAdapter(scheduledExecutorService),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime())) {
 
-               final SlotStatus slotStatus =
-                       new SlotStatus(slotID, resourceProfile);
-               final SlotReport slotReport =
-                       new SlotReport(Collections.singletonList(slotStatus));
-               // register slot at SlotManager
-               slotManager.registerTaskExecutor(
-                       resourceID, new 
TaskExecutorRegistration(taskExecutorGateway), slotReport);
+                       ResourceManagerActions resourceManagerActions = 
mock(ResourceManagerActions.class);
 
-               SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
-               RMSlotRequestReply slotRequestReply =
-                       resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
+                       slotManager.start(rmLeaderID, 
Executors.directExecutor(), resourceManagerActions);
 
-               // 1) a SlotRequest is routed to the SlotManager
-               verify(slotManager).requestSlot(slotRequest);
+                       final ResourceID resourceID = ResourceID.generate();
+                       final AllocationID allocationID = new AllocationID();
+                       final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
+                       final SlotID slotID = new SlotID(resourceID, 0);
 
-               // 2) a SlotRequest is confirmed
-               Assert.assertEquals(
-                       slotRequestReply.getAllocationID(),
-                       allocationID);
-
-               // 3) a SlotRequest leads to an allocation of a registered slot
-               Assert.assertTrue(slotManager.isAllocated(slotID));
-               Assert.assertTrue(slotManager.isAllocated(allocationID));
-
-               // 4) a SlotRequest is routed to the TaskExecutor
-               verify(taskExecutorGateway, timeout(5000))
-                       .requestSlot(eq(slotID), eq(jobID), eq(allocationID), 
any(String.class), any(UUID.class), any(Time.class));
-       }
-
-       private static TestingLeaderElectionService configureHA(
-                       TestingHighAvailabilityServices testingHA, JobID jobID, 
String rmAddress, UUID rmID, String jmAddress, UUID jmID) {
-               final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               
testingHA.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-               final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(rmAddress, rmID);
-               
testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-
-               final TestingLeaderElectionService jmLeaderElectionService = 
new TestingLeaderElectionService();
-               testingHA.setJobMasterLeaderElectionService(jobID, 
jmLeaderElectionService);
-               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jmAddress, jmID);
-               testingHA.setJobMasterLeaderRetriever(jobID, 
jmLeaderRetrievalService);
-
-               return rmLeaderElectionService;
-       }
-
-       private static class SpiedResourceManager extends 
StandaloneResourceManager {
-
-               private int startNewWorkerCalled = 0;
-
-               public SpiedResourceManager(
-                               ResourceID resourceId,
-                               RpcService rpcService,
-                               ResourceManagerConfiguration 
resourceManagerConfiguration,
-                               HighAvailabilityServices 
highAvailabilityServices,
-                               HeartbeatServices heartbeatServices,
-                               SlotManagerFactory slotManagerFactory,
-                               MetricRegistry metricRegistry,
-                               JobLeaderIdService jobLeaderIdService,
-                               FatalErrorHandler fatalErrorHandler) {
-                       super(
-                               rpcService,
-                               FlinkResourceManager.RESOURCE_MANAGER_NAME,
-                               resourceId,
-                               resourceManagerConfiguration,
-                               highAvailabilityServices,
-                               heartbeatServices,
-                               slotManagerFactory,
-                               metricRegistry,
-                               jobLeaderIdService,
-                               fatalErrorHandler);
-               }
-
-
-               @Override
-               public void startNewWorker(ResourceProfile resourceProfile) {
-                       startNewWorkerCalled++;
-               }
-       }
+                       final SlotStatus slotStatus =
+                               new SlotStatus(slotID, resourceProfile);
+                       final SlotReport slotReport =
+                               new 
SlotReport(Collections.singletonList(slotStatus));
+                       // register slot at SlotManager
+                       slotManager.registerTaskManager(
+                               new 
TaskExecutorConnection(taskExecutorGateway), slotReport);
 
-       private static class TestingSlotManagerFactory implements 
SlotManagerFactory {
+                       final String targetAddress = "foobar";
 
-               private SlotManager slotManager;
+                       SlotRequest slotRequest = new SlotRequest(jobID, 
allocationID, resourceProfile, targetAddress);
+                       slotManager.registerSlotRequest(slotRequest);
 
-               @Override
-               public SlotManager create(ResourceManagerServices rmServices) {
-                       this.slotManager = Mockito.spy(new 
TestingSlotManager(rmServices));
-                       return this.slotManager;
+                       // a SlotRequest is routed to the TaskExecutor
+                       verify(taskExecutorGateway, timeout(5000))
+                               .requestSlot(eq(slotID), eq(jobID), 
eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
                }
        }
 }

Reply via email to