http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
deleted file mode 100644
index 2ee280f..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/SlotManagerTest.java
+++ /dev/null
@@ -1,540 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
-import org.apache.flink.runtime.rpc.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.rpc.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
-       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
-       private static final long DEFAULT_TESTING_MEMORY = 512;
-
-       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
-
-       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
-
-       private ResourceManagerGateway resourceManagerGateway;
-
-       @Before
-       public void setUp() {
-               resourceManagerGateway = mock(ResourceManagerGateway.class);
-       }
-
-       /**
-        * Tests that there are no free slots when we request, need to allocate 
from cluster manager master
-        */
-       @Test
-       public void testRequestSlotWithoutFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
-        */
-       @Test
-       public void testRequestSlotWithFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-               assertEquals(1, slotManager.getFreeSlotCount());
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertEquals(0, slotManager.getAllocatedContainers().size());
-       }
-
-       /**
-        * Tests that there are some free slots when we request, but none of 
them are suitable
-        */
-       @Test
-       public void testRequestSlotWithoutSuitableSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
-               assertEquals(2, slotManager.getFreeSlotCount());
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that we send duplicated slot request
-        */
-       @Test
-       public void testDuplicatedSlotRequest() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-               slotManager.requestSlot(request1);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request1);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that we send multiple slot requests
-        */
-       @Test
-       public void testRequestMultipleSlots() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
-
-               // request 3 normal slots
-               for (int i = 0; i < 3; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
-               }
-
-               // request 2 big slots
-               for (int i = 0; i < 2; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               }
-
-               // request 1 normal slot again
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(4, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(2, slotManager.getPendingRequestCount());
-               assertEquals(2, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
-        */
-       @Test
-       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
-        */
-       @Test
-       public void testNewlyAppearedFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, but it't not suitable 
for all the pending requests
-        */
-       @Test
-       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, and it's been reported 
using by some job
-        */
-       @Test
-       public void testNewlyAppearedInUseSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that we had a slot in-use, and it's confirmed by SlotReport
-        */
-       @Test
-       public void testExistingInUseSlotUpdateStatus() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-
-               // slot status is confirmed
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
-                       request.getAllocationId(), request.getJobId());
-               slotManager.updateSlotStatus(slotStatus2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
-        */
-       @Test
-       public void testExistingInUseSlotAdjustedToEmpty() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request1);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               // another request pending
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
-               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-       }
-
-       /**
-        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
-        * information didn't match.
-        */
-       @Test
-       public void testExistingInUseSlotWithDifferentAllocationInfo() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-               // update slot status with different allocation info
-               slotManager.updateSlotStatus(slotStatus2);
-
-               // original request is missing and won't be allocated
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-       }
-
-       /**
-        * Tests that we had a free slot, and it's confirmed by SlotReport
-        */
-       @Test
-       public void testExistingEmptySlotUpdateStatus() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-       /**
-        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
-        */
-       @Test
-       public void testExistingEmptySlotAdjustedToInUse() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
-                       new AllocationID(), new JobID());
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
-       }
-
-       /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, request will retry
-        */
-       @Test
-       public void testSlotAllocationFailedAtTaskManager() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-
-       /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, and slot is occupied by another request
-        */
-       @Test
-       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // slot is set empty by heartbeat
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
-               slotManager.updateSlotStatus(slotStatus);
-
-               // another request took this slot
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-               // original request should be pended
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-       }
-
-       @Test
-       public void testNotifyTaskManagerFailure() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               ResourceID resource1 = ResourceID.generate();
-               ResourceID resource2 = ResourceID.generate();
-
-               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE);
-
-               slotManager.addFreeSlot(slot11);
-               slotManager.addFreeSlot(slot21);
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               slotManager.addFreeSlot(slot12);
-               slotManager.addFreeSlot(slot22);
-
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               slotManager.notifyTaskManagerFailure(resource2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               // notify an not exist resource failure
-               slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  testing utilities
-       // 
------------------------------------------------------------------------
-
-       private void directlyProvideFreeSlots(
-               final SlotManager slotManager,
-               final ResourceProfile resourceProfile,
-               final int freeSlotNum)
-       {
-               for (int i = 0; i < freeSlotNum; ++i) {
-                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  testing classes
-       // 
------------------------------------------------------------------------
-
-       private static class TestingSlotManager extends SlotManager {
-
-               private final List<ResourceProfile> allocatedContainers;
-
-               TestingSlotManager(ResourceManagerGateway 
resourceManagerGateway) {
-                       super(resourceManagerGateway);
-                       this.allocatedContainers = new LinkedList<>();
-               }
-
-               /**
-                * Choose slot randomly if it matches requirement
-                *
-                * @param request   The slot request
-                * @param freeSlots All slots which can be used
-                * @return The chosen slot or null if cannot find a match
-                */
-               @Override
-               protected ResourceSlot chooseSlotToUse(SlotRequest request, 
Map<SlotID, ResourceSlot> freeSlots) {
-                       for (ResourceSlot slot : freeSlots.values()) {
-                               if 
(slot.isMatchingRequirement(request.getResourceProfile())) {
-                                       return slot;
-                               }
-                       }
-                       return null;
-               }
-
-               /**
-                * Choose request randomly if offered slot can match its 
requirement
-                *
-                * @param offeredSlot     The free slot
-                * @param pendingRequests All the pending slot requests
-                * @return The chosen request's AllocationID or null if cannot 
find a match
-                */
-               @Override
-               protected SlotRequest chooseRequestToFulfill(ResourceSlot 
offeredSlot,
-                       Map<AllocationID, SlotRequest> pendingRequests)
-               {
-                       for (Map.Entry<AllocationID, SlotRequest> 
pendingRequest : pendingRequests.entrySet()) {
-                               if 
(offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile()))
 {
-                                       return pendingRequest.getValue();
-                               }
-                       }
-                       return null;
-               }
-
-               @Override
-               protected void allocateContainer(ResourceProfile 
resourceProfile) {
-                       allocatedContainers.add(resourceProfile);
-               }
-
-               List<ResourceProfile> getAllocatedContainers() {
-                       return allocatedContainers;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
new file mode 100644
index 0000000..80fa19c
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import org.slf4j.LoggerFactory;
+
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeoutException;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for the generic retrying registration class, validating the failure, 
retry, and back-off behavior.
+ */
+public class RetryingRegistrationTest extends TestLogger {
+
+       @Test // registers with a gateway that answers with success right away; checks the response and the leader ID the gateway received
+       public void testSimpleSuccessfulRegistration() throws Exception {
+               final String testId = "laissez les bon temps roulez";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               // an endpoint that immediately returns success
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       registration.startRegistration();
+
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       assertNotNull(future);
+
+                       // multiple accesses return the same future
+                       assertEquals(future, registration.getFuture());
+
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success = 
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+               }
+               finally {
+                       testGateway.stop();
+                       rpc.stopService();
+               }
+       }
+       
+       @Test // an exception thrown by connect() must show up as a failure of the registration future
+       public void testPropagateFailures() throws Exception {
+               final String testExceptionMessage = "testExceptionMessage";
+
+               // RPC service whose connect() call throws an exception
+               RpcService rpc = mock(RpcService.class);
+               when(rpc.connect(anyString(), any(Class.class))).thenThrow(new 
RuntimeException(testExceptionMessage));
+
+               TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "testaddress", UUID.randomUUID());
+               registration.startRegistration();
+
+               Future<?> future = registration.getFuture();
+               assertTrue(future.failed().isCompleted()); // checked without waiting: the failure has to be set already
+
+               assertEquals(testExceptionMessage, 
future.failed().value().get().get().getMessage());
+       }
+
+       @Test // the first connect attempt fails and the second succeeds; the registration must still complete
+       public void testRetryConnectOnFailure() throws Exception {
+               final String testId = "laissez les bon temps roulez";
+               final UUID leaderId = UUID.randomUUID();
+
+               ExecutorService executor = Executors.newCachedThreadPool(); // backs the execution context returned by the mocked RpcService
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
+
+               try {
+                       // RPC service that fails upon the first connection, 
but succeeds on the second
+                       RpcService rpc = mock(RpcService.class);
+                       when(rpc.connect(anyString(), 
any(Class.class))).thenReturn(
+                                       Futures.failed(new Exception("test 
connect failure")),  // first connection attempt fails
+                                       Futures.successful(testGateway)         
                // second connection attempt succeeds
+                       );
+                       
when(rpc.getExecutionContext()).thenReturn(ExecutionContext$.MODULE$.fromExecutor(executor));
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "foobar address", leaderId);
+                       registration.startRegistration();
+
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(registration.getFuture(), 
new FiniteDuration(10, SECONDS));
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+               }
+               finally {
+                       testGateway.stop();
+                       executor.shutdown();
+               }
+       }
+
+       @Test // scripted replies: timeout, timeout, success; checks the final result and a lower bound on the elapsed time
+       public void testRetriesOnTimeouts() throws Exception {
+               final String testId = "rien ne va plus";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               // an endpoint that immediately returns futures with timeouts 
before returning a successful future
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
+                               null, // timeout
+                               null, // timeout
+                               new TestRegistrationSuccess(testId) // success
+               );
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+       
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+       
+                       long started = System.nanoTime(); // start of the measured interval, in nanoseconds
+                       registration.startRegistration();
+       
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+       
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+       
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+       
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off", 
elapsedMillis >= 3 * TestRetryingRegistration.INITIAL_TIMEOUT);
+               }
+               finally {
+                       rpc.stopService();
+                       testGateway.stop();
+               }
+       }
+
+       @Test // scripted replies: timeout, decline, timeout, success; checks the final result and a lower bound on the elapsed time
+       public void testDecline() throws Exception {
+               final String testId = "qui a coupe le fromage";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               TestRegistrationGateway testGateway = new 
TestRegistrationGateway(
+                               null, // timeout
+                               new RegistrationResponse.Decline("no reason "),
+                               null, // timeout
+                               new TestRegistrationSuccess(testId) // success
+               );
+
+               try {
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+                       long started = System.nanoTime(); // start of the measured interval, in nanoseconds
+                       registration.startRegistration();
+       
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+
+                       // validate correct invocation and result
+                       assertEquals(testId, success.f1.getCorrelationId());
+                       assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
+
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off", 
elapsedMillis >= 
+                                       2 * 
TestRetryingRegistration.INITIAL_TIMEOUT + 
TestRetryingRegistration.DELAY_ON_DECLINE);
+               }
+               finally {
+                       testGateway.stop();
+                       rpc.stopService();
+               }
+       }
+       
+       @Test // the first registration call fails, the second succeeds; checks the result and that at least DELAY_ON_ERROR elapsed
+       @SuppressWarnings("unchecked")
+       public void testRetryOnError() throws Exception {
+               final String testId = "Petit a petit, l'oiseau fait son nid";
+               final String testEndpointAddress = "<test-address>";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       // gateway that upon calls first responds with a 
failure, then with a success
+                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(
+                                       
Futures.<RegistrationResponse>failed(new Exception("test exception")),
+                                       
Futures.<RegistrationResponse>successful(new TestRegistrationSuccess(testId)));
+                       
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+
+                       long started = System.nanoTime(); // start of the measured interval, in nanoseconds
+                       registration.startRegistration();
+
+                       Future<Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess>> future = registration.getFuture();
+                       Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
+                                       Await.result(future, new 
FiniteDuration(10, SECONDS));
+
+                       long finished = System.nanoTime();
+                       long elapsedMillis = (finished - started) / 1000000; // nanoseconds -> milliseconds
+                       
+                       assertEquals(testId, success.f1.getCorrelationId());
+
+                       // validate that some retry-delay / back-off behavior 
happened
+                       assertTrue("retries did not properly back off",
+                                       elapsedMillis >= 
TestRetryingRegistration.DELAY_ON_ERROR);
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+
+       @Test // cancels a registration whose attempt is still pending, then fails that attempt; no further registrationCall may follow
+       public void testCancellation() throws Exception {
+               final String testEndpointAddress = "my-test-address";
+               final UUID leaderId = UUID.randomUUID();
+
+               TestingRpcService rpc = new TestingRpcService();
+
+               try {
+                       Promise<RegistrationResponse> result = 
Futures.promise();
+
+                       TestRegistrationGateway testGateway = 
mock(TestRegistrationGateway.class);
+                       when(testGateway.registrationCall(any(UUID.class), 
anyLong())).thenReturn(result.future());
+
+                       rpc.registerGateway(testEndpointAddress, testGateway);
+
+                       TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, testEndpointAddress, leaderId);
+                       registration.startRegistration();
+
+                       // cancel and fail the current registration attempt
+                       registration.cancel();
+                       result.failure(new TimeoutException());
+
+                       // there should not be a second registration attempt (NOTE(review): verified immediately after the failure, so a retry scheduled asynchronously might not be caught - confirm)
+                       verify(testGateway, 
atMost(1)).registrationCall(any(UUID.class), anyLong());
+               }
+               finally {
+                       rpc.stopService();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  test registration
+       // 
------------------------------------------------------------------------
+
+       private static class TestRegistrationSuccess extends 
RegistrationResponse.Success { // success response that carries an ID so tests can recognize which response came back
+               private static final long serialVersionUID = 
5542698790917150604L;
+
+               private final String correlationId; // set once in the constructor, exposed via getCorrelationId()
+
+               private TestRegistrationSuccess(String correlationId) {
+                       this.correlationId = correlationId;
+               }
+
+               public String getCorrelationId() {
+                       return correlationId;
+               }
+       }
+
+       private static class TestRetryingRegistration extends 
RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> { // RetryingRegistration bound to the test gateway and the short timeouts below
+
+               // we use shorter timeouts here to speed up the tests (the tests compare these values against elapsed milliseconds)
+               static final long INITIAL_TIMEOUT = 20;
+               static final long MAX_TIMEOUT = 200;
+               static final long DELAY_ON_ERROR = 200;
+               static final long DELAY_ON_DECLINE = 200;
+
+               public TestRetryingRegistration(RpcService rpc, String 
targetAddress, UUID leaderId) {
+                       
super(LoggerFactory.getLogger(RetryingRegistrationTest.class),
+                                       rpc, "TestEndpoint",
+                                       TestRegistrationGateway.class,
+                                       targetAddress, leaderId,
+                                       INITIAL_TIMEOUT, MAX_TIMEOUT, 
DELAY_ON_ERROR, DELAY_ON_DECLINE);
+               }
+
+               @Override // forwards each attempt to the gateway's registrationCall with the same leader ID and timeout
+               protected Future<RegistrationResponse> invokeRegistration(
+                               TestRegistrationGateway gateway, UUID leaderId, 
long timeoutMillis) {
+                       return gateway.registrationCall(leaderId, 
timeoutMillis);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
new file mode 100644
index 0000000..431fbe8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.registration;
+
+import akka.dispatch.Futures;
+
+import org.apache.flink.runtime.rpc.TestingGatewayBase;
+import org.apache.flink.util.Preconditions;
+
+import scala.concurrent.Future;
+
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class TestRegistrationGateway extends TestingGatewayBase { // test gateway that records every registration call and answers from a fixed list of responses
+
+       private final BlockingQueue<RegistrationCall> invocations; // one entry per registrationCall, in the order the calls were added
+
+       private final RegistrationResponse[] responses; // scripted replies; a null entry stands for "let the call time out"
+
+       private int pos; // index of the reply for the next call; stops advancing at the last entry
+
+       public TestRegistrationGateway(RegistrationResponse... responses) { // requires at least one response
+               Preconditions.checkArgument(responses != null && 
responses.length > 0);
+
+               this.invocations = new LinkedBlockingQueue<>();
+               this.responses = responses;
+               
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public Future<RegistrationResponse> registrationCall(UUID leaderId, 
long timeout) {
+               invocations.add(new RegistrationCall(leaderId, timeout));
+
+               RegistrationResponse response = responses[pos];
+               if (pos < responses.length - 1) { // once the last response is reached it is repeated for all further calls
+                       pos++;
+               }
+
+               // return a completed future (for a proper value), or one that 
never completes and will time out (for null)
+               return response != null ? Futures.successful(response) : 
this.<RegistrationResponse>futureWithTimeout(timeout);
+       }
+
+       public BlockingQueue<RegistrationCall> getInvocations() { // hands out the internal queue itself, not a copy
+               return invocations;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static class RegistrationCall { // arguments of a single registrationCall invocation; fields are final
+               private final UUID leaderId;
+               private final long timeout;
+
+               public RegistrationCall(UUID leaderId, long timeout) {
+                       this.leaderId = leaderId;
+                       this.timeout = timeout;
+               }
+
+               public UUID leaderId() {
+                       return leaderId;
+               }
+
+               public long timeout() {
+                       return timeout;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
deleted file mode 100644
index 32c6cac..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ClusterShutdownITCase.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.StopCluster;
-import 
org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.testingUtils.TestingMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-
-/**
- * Runs tests to ensure that a cluster is shutdown properly.
- */
-public class ClusterShutdownITCase extends TestLogger {
-
-       private static ActorSystem system;
-
-       private static Configuration config = new Configuration();
-
-       @BeforeClass
-       public static void setup() {
-               system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       /**
-        * Tests a faked cluster shutdown procedure without the ResourceManager.
-        */
-       @Test
-       public void testClusterShutdownWithoutResourceManager() {
-
-               new JavaTestKit(system){{
-               new Within(duration("30 seconds")) {
-               @Override
-               protected void run() {
-
-                       ActorGateway me =
-                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
-
-                       // start job manager which doesn't shutdown the actor 
system
-                       ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"jobmanager1");
-
-                       // Tell the JobManager to inform us of shutdown actions
-                       
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-                       // Register a TaskManager
-                       ActorGateway taskManager =
-                               TestingUtils.createTaskManager(system, 
jobManager, config, true, true);
-
-                       // Tell the TaskManager to inform us of TaskManager 
shutdowns
-                       
taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-
-                       // No resource manager connected
-                       jobManager.tell(new 
StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-                       expectMsgAllOf(
-                               new 
TestingMessages.ComponentShutdown(taskManager.actor()),
-                               new 
TestingMessages.ComponentShutdown(jobManager.actor()),
-                               StopClusterSuccessful.getInstance()
-                       );
-
-               }};
-               }};
-       }
-
-       /**
-        * Tests a faked cluster shutdown procedure with the ResourceManager.
-        */
-       @Test
-       public void testClusterShutdownWithResourceManager() {
-
-               new JavaTestKit(system){{
-               new Within(duration("30 seconds")) {
-               @Override
-               protected void run() {
-
-                       ActorGateway me =
-                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
-
-                       // start job manager which doesn't shutdown the actor 
system
-                       ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"jobmanager2");
-
-                       // Tell the JobManager to inform us of shutdown actions
-                       
jobManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-                       // Register a TaskManager
-                       ActorGateway taskManager =
-                               TestingUtils.createTaskManager(system, 
jobManager, config, true, true);
-
-                       // Tell the TaskManager to inform us of TaskManager 
shutdowns
-                       
taskManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-                       // Start resource manager and let it register
-                       ActorGateway resourceManager =
-                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
-
-                       // Tell the ResourceManager to inform us of 
ResourceManager shutdowns
-                       
resourceManager.tell(TestingMessages.getNotifyOfComponentShutdown(), me);
-
-                       // notify about a resource manager registration at the 
job manager
-                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-                       // Wait for resource manager
-                       expectMsgEquals(Messages.getAcknowledge());
-
-
-                       // Shutdown cluster with resource manager connected
-                       jobManager.tell(new 
StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), me);
-
-                       expectMsgAllOf(
-                               new 
TestingMessages.ComponentShutdown(taskManager.actor()),
-                               new 
TestingMessages.ComponentShutdown(jobManager.actor()),
-                               new 
TestingMessages.ComponentShutdown(resourceManager.actor()),
-                               StopClusterSuccessful.getInstance()
-                       );
-
-               }};
-               }};
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..5799e62
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering the granting and the revoking of leadership.
+ */
+public class ResourceManagerHATest {
+
+       @Test
+       public void testGrantAndRevokeLeadership() throws Exception {
+               // mock a RpcService which will return a special RpcGateway 
when call its startServer method, the returned RpcGateway directly execute 
runAsync call
+               TestingResourceManagerGatewayProxy gateway = 
mock(TestingResourceManagerGatewayProxy.class);
+               doCallRealMethod().when(gateway).runAsync(any(Runnable.class)); // keep the real runAsync on the mock so submitted Runnables actually run
+
+               RpcService rpcService = mock(RpcService.class);
+               
when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
+               resourceManager.start();
+               // before leadership is granted, the resourceManager's leaderId is null
+               Assert.assertNull(resourceManager.getLeaderSessionID());
+               final UUID leaderId = UUID.randomUUID();
+               leaderElectionService.isLeader(leaderId);
+               // after leadership is granted, the resourceManager's leaderId equals the granted ID
+               Assert.assertEquals(leaderId, 
resourceManager.getLeaderSessionID());
+               // then revoke leadership, resourceManager's leaderId is null 
again
+               leaderElectionService.notLeader();
+               Assert.assertNull(resourceManager.getLeaderSessionID());
+       }
+
+       private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutor, StartStoppable, RpcGateway { // gateway stand-in whose runAsync runs the Runnable synchronously in the calling thread
+               @Override
+               public void runAsync(Runnable runnable) {
+                       runnable.run();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
deleted file mode 100644
index 0c2ca1a..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerITCase.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.messages.Messages;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import scala.Option;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * It cases which test the interaction of the resource manager with job 
manager and task managers.
- * Runs all tests in one Actor system.
- */
-public class ResourceManagerITCase extends TestLogger {
-
-       private static ActorSystem system;
-
-       private static Configuration config = new Configuration();
-
-       @BeforeClass
-       public static void setup() {
-               system = 
AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       /**
-        * Tests whether the resource manager connects and reconciles existing 
task managers.
-        */
-       @Test
-       public void testResourceManagerReconciliation() {
-
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"ReconciliationTest");
-                       ActorGateway me =
-                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
-
-                       // !! no resource manager started !!
-
-                       ResourceID resourceID = ResourceID.generate();
-
-                       TaskManagerLocation location = 
mock(TaskManagerLocation.class);
-                       when(location.getResourceID()).thenReturn(resourceID);
-
-                       HardwareDescription resourceProfile = 
HardwareDescription.extractFromSystem(1_000_000);
-
-                       jobManager.tell(
-                               new 
RegistrationMessages.RegisterTaskManager(resourceID, location, resourceProfile, 
1),
-                               me);
-
-                       
expectMsgClass(RegistrationMessages.AcknowledgeRegistration.class);
-
-                       // now start the resource manager
-                       ActorGateway resourceManager =
-                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
-
-                       // register at testing job manager to receive a message 
once a resource manager registers
-                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-                       // Wait for resource manager
-                       expectMsgEquals(Messages.getAcknowledge());
-
-                       // check if we registered the task manager resource
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), me);
-
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-                       assertTrue(reply.resources.contains(resourceID));
-
-               }};
-               }};
-       }
-
-       /**
-        * Tests whether the resource manager gets informed upon TaskManager 
registration.
-        */
-       @Test
-       public void testResourceManagerTaskManagerRegistration() {
-
-               new JavaTestKit(system){{
-               new Within(duration("30 seconds")) {
-               @Override
-               protected void run() {
-
-                       ActorGateway jobManager =
-                               TestingUtils.createJobManager(system, config, 
"RegTest");
-                       ActorGateway me =
-                               TestingUtils.createForwardingActor(system, 
getTestActor(), Option.<String>empty());
-
-                       // start the resource manager
-                       ActorGateway resourceManager =
-                               TestingUtils.createResourceManager(system, 
jobManager.actor(), config);
-
-                       // notify about a resource manager registration at the 
job manager
-                       resourceManager.tell(new 
TestingResourceManager.NotifyWhenResourceManagerConnected(), me);
-
-                       // Wait for resource manager
-                       expectMsgEquals(Messages.getAcknowledge());
-
-                       // start task manager and wait for registration
-                       ActorGateway taskManager =
-                               TestingUtils.createTaskManager(system, 
jobManager.actor(), config, true, true);
-
-                       // check if we registered the task manager resource
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), me);
-
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-
-               }};
-               }};
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/171cfd30/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
deleted file mode 100644
index 043c81c..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import 
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
-import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
-import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testutils.TestingResourceManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * General tests for the resource manager component.
- */
-public class ResourceManagerTest {
-
-       private static ActorSystem system;
-
-       private static ActorGateway fakeJobManager;
-       private static ActorGateway resourceManager;
-
-       private static Configuration config = new Configuration();
-
-       @BeforeClass
-       public static void setup() {
-               system = AkkaUtils.createLocalActorSystem(config);
-       }
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
-       /**
-        * Tests the registration and reconciliation of the ResourceManager 
with the JobManager
-        */
-       @Test
-       public void testJobManagerRegistrationAndReconciliation() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-                       expectMsgClass(RegisterResourceManager.class);
-
-                       List<ResourceID> resourceList = new ArrayList<>();
-                       resourceList.add(ResourceID.generate());
-                       resourceList.add(ResourceID.generate());
-                       resourceList.add(ResourceID.generate());
-
-                       resourceManager.tell(
-                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), resourceList),
-                               fakeJobManager);
-
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       for (ResourceID id : resourceList) {
-                               if (!reply.resources.contains(id)) {
-                                       fail("Expected to find all resources 
that were provided during registration.");
-                               }
-                       }
-               }};
-               }};
-       }
-
-       /**
-        * Tests delayed or erroneous registration of the ResourceManager with 
the JobManager
-        */
-       @Test
-       public void testDelayedJobManagerRegistration() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       // set a short timeout for lookups
-                       Configuration shortTimeoutConfig = config.clone();
-                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "1 s");
-
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), 
shortTimeoutConfig);
-
-                       // wait for registration message
-                       RegisterResourceManager msg = 
expectMsgClass(RegisterResourceManager.class);
-                       // give wrong response
-                       getLastSender().tell(new 
JobManagerMessages.LeaderSessionMessage(null, new Object()),
-                               fakeJobManager.actor());
-
-                       // expect another retry and let it time out
-                       expectMsgClass(RegisterResourceManager.class);
-
-                       // wait for next try after timeout
-                       expectMsgClass(RegisterResourceManager.class);
-
-               }};
-               }};
-       }
-
-       @Test
-       public void testTriggerReconnect() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       // set a long timeout for lookups such that the test 
fails in case of timeouts
-                       Configuration shortTimeoutConfig = config.clone();
-                       
shortTimeoutConfig.setString(ConfigConstants.AKKA_LOOKUP_TIMEOUT, "99999 s");
-
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), 
shortTimeoutConfig);
-
-                       // wait for registration message
-                       RegisterResourceManager msg = 
expectMsgClass(RegisterResourceManager.class);
-                       // all went well
-                       resourceManager.tell(
-                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), 
Collections.<ResourceID>emptyList()),
-                               fakeJobManager);
-
-                       // force a reconnect
-                       resourceManager.tell(
-                               new 
TriggerRegistrationAtJobManager(fakeJobManager.actor()),
-                               fakeJobManager);
-
-                       // new registration attempt should come in
-                       expectMsgClass(RegisterResourceManager.class);
-
-               }};
-               }};
-       }
-
-       /**
-        * Tests the registration and accounting of resources at the 
ResourceManager.
-        */
-       @Test
-       public void testTaskManagerRegistration() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-                       // register with JM
-                       expectMsgClass(RegisterResourceManager.class);
-                       resourceManager.tell(
-                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), 
Collections.<ResourceID>emptyList()),
-                               fakeJobManager);
-
-                       ResourceID resourceID = ResourceID.generate();
-
-                       // Send task manager registration
-                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-
-                       // Send task manager registration again
-                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       reply = 
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-
-                       // Send invalid null resource id to throw an exception 
during resource registration
-                       resourceManager.tell(new NotifyResourceStarted(null),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       reply = 
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-               }};
-               }};
-       }
-
-       @Test
-       public void testResourceRemoval() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-                       // register with JM
-                       expectMsgClass(RegisterResourceManager.class);
-                       resourceManager.tell(
-                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), 
Collections.<ResourceID>emptyList()),
-                               fakeJobManager);
-
-                       ResourceID resourceID = ResourceID.generate();
-
-                       // remove unknown resource
-                       resourceManager.tell(new RemoveResource(resourceID), 
fakeJobManager);
-
-                       // Send task manager registration
-                       resourceManager.tell(new 
NotifyResourceStarted(resourceID),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(1, reply.resources.size());
-                       assertTrue(reply.resources.contains(resourceID));
-
-                       // remove resource
-                       resourceManager.tell(new RemoveResource(resourceID), 
fakeJobManager);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       reply = 
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(0, reply.resources.size());
-
-               }};
-               }};
-       }
-
-       /**
-        * Tests notification of JobManager about a failed resource.
-        */
-       @Test
-       public void testResourceFailureNotification() {
-               new JavaTestKit(system){{
-               new Within(duration("10 seconds")) {
-               @Override
-               protected void run() {
-
-                       fakeJobManager = 
TestingUtils.createForwardingActor(system, getTestActor(), 
Option.<String>empty());
-                       resourceManager = 
TestingUtils.createResourceManager(system, fakeJobManager.actor(), config);
-
-                       // register with JM
-                       expectMsgClass(RegisterResourceManager.class);
-                       resourceManager.tell(
-                               new 
RegisterResourceManagerSuccessful(fakeJobManager.actor(), 
Collections.<ResourceID>emptyList()),
-                               fakeJobManager);
-
-                       ResourceID resourceID1 = ResourceID.generate();
-                       ResourceID resourceID2 = ResourceID.generate();
-
-                       // Send task manager registration
-                       resourceManager.tell(new 
NotifyResourceStarted(resourceID1),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // Send task manager registration
-                       resourceManager.tell(new 
NotifyResourceStarted(resourceID2),
-                               fakeJobManager);
-
-                       expectMsgClass(Acknowledge.class);
-
-                       // check for number registration of registered resources
-                       resourceManager.tell(new 
TestingResourceManager.GetRegisteredResources(), fakeJobManager);
-                       TestingResourceManager.GetRegisteredResourcesReply 
reply =
-                               
expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
-
-                       assertEquals(2, reply.resources.size());
-                       assertTrue(reply.resources.contains(resourceID1));
-                       assertTrue(reply.resources.contains(resourceID2));
-
-                       // fail resources
-                       resourceManager.tell(new 
TestingResourceManager.FailResource(resourceID1), fakeJobManager);
-                       resourceManager.tell(new 
TestingResourceManager.FailResource(resourceID2), fakeJobManager);
-
-                       ResourceRemoved answer = 
expectMsgClass(ResourceRemoved.class);
-                       ResourceRemoved answer2 = 
expectMsgClass(ResourceRemoved.class);
-
-                       assertEquals(resourceID1, answer.resourceId());
-                       assertEquals(resourceID2, answer2.resourceId());
-
-               }};
-               }};
-       }
-}

Reply via email to