Repository: flink Updated Branches: refs/heads/flip-6 85424c135 -> 0518af03a
http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 8f09152..14afd0e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -25,10 +25,8 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -160,7 +158,7 @@ public class ResourceManagerJobMasterTest { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobID, jobMasterLeaderRetrievalService); - ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + ResourceManager resourceManager = new TestingResourceManager(rpcService, highAvailabilityServices); resourceManager.start(); return 
resourceManager; } http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index e6d1ed5..a577c26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -22,10 +22,9 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; +import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.junit.After; @@ -44,9 +43,24 @@ public class ResourceManagerTaskExecutorTest { private TestingSerialRpcService rpcService; + private SlotReport slotReport = new SlotReport(); + + private static String taskExecutorAddress = "/taskExecutor1"; + + private ResourceID taskExecutorResourceID; + + private StandaloneResourceManager resourceManager; + + private UUID leaderSessionId; + @Before public void setup() throws Exception { rpcService = new TestingSerialRpcService(); + 
+ taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); + TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); + resourceManager = createAndStartResourceManager(rmLeaderElectionService); + leaderSessionId = grantLeadership(rmLeaderElectionService); } @After @@ -59,19 +73,15 @@ public class ResourceManagerTaskExecutorTest { */ @Test public void testRegisterTaskExecutor() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService); - final UUID leaderSessionId = grantLeadership(rmLeaderElectionService); - // test response successful - Future<RegistrationResponse> successfulFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); + Future<RegistrationResponse> successfulFuture = + resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport); RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); // test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor - Future<RegistrationResponse> duplicateFuture = resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID); + Future<RegistrationResponse> duplicateFuture = + resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport); RegistrationResponse duplicateResponse = duplicateFuture.get(); assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess); assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), 
((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId()); @@ -82,15 +92,10 @@ public class ResourceManagerTaskExecutorTest { */ @Test public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(rmLeaderElectionService); - final UUID leaderSessionId = grantLeadership(rmLeaderElectionService); - // test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId UUID differentLeaderSessionID = UUID.randomUUID(); - Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID); + Future<RegistrationResponse> unMatchedLeaderFuture = + resourceManager.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport); assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); } @@ -99,15 +104,10 @@ public class ResourceManagerTaskExecutorTest { */ @Test public void testRegisterTaskExecutorFromInvalidAddress() throws Exception { - String taskExecutorAddress = "/taskExecutor1"; - ResourceID taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress); - TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService(); - final ResourceManager resourceManager = createAndStartResourceManager(leaderElectionService); - final UUID leaderSessionId = grantLeadership(leaderElectionService); - // test throw exception when receive a registration from taskExecutor which takes invalid address String invalidAddress = "/taskExecutor2"; - Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID); + Future<RegistrationResponse> invalidAddressFuture = + resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport); assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline); } @@ -118,10 +118,11 @@ public class ResourceManagerTaskExecutorTest { return taskExecutorResourceID; } - private ResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) { + private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService) { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); - ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager()); + StandaloneResourceManager resourceManager = + new TestingResourceManager(rpcService, highAvailabilityServices); resourceManager.start(); return resourceManager; } http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java new file mode 100644 index 0000000..6b4ca14 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.rpc.RpcService; + +public class TestingResourceManager extends StandaloneResourceManager { + + public TestingResourceManager(RpcService rpcService) { + this(rpcService, new TestingHighAvailabilityServices()); + } + + public TestingResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices) { + this(rpcService, highAvailabilityServices, new TestingSlotManagerFactory()); + } + + public TestingResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManagerFactory slotManagerFactory) { + super(rpcService, highAvailabilityServices, slotManagerFactory); + } + + private static class TestingSlotManagerFactory implements SlotManagerFactory { + + @Override + public SlotManager create(ResourceManagerServices rmServices) { + return new TestingSlotManager(rmServices); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java new file mode 100644 index 0000000..0b2c42b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.ResourceSlot; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.mockito.Mockito; + +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.Executor; + +public class TestingSlotManager extends SlotManager { + + public TestingSlotManager() { + this(new TestingResourceManagerServices()); + } + + public TestingSlotManager(ResourceManagerServices rmServices) { + super(rmServices); + } + + @Override + protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) { + final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator(); + if (slotIterator.hasNext()) { + return slotIterator.next(); + } else { + return null; + } + } + + @Override + protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) { + final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator(); + if (requestIterator.hasNext()) { + return requestIterator.next(); + } else { + return null; + } + } + + private static class TestingResourceManagerServices implements ResourceManagerServices { + + @Override + public void allocateResource(ResourceProfile resourceProfile) { + + } + + @Override + public Executor getAsyncExecutor() { + return Mockito.mock(Executor.class); + } + + @Override + public Executor getMainThreadExecutor() { + return Mockito.mock(Executor.class); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 0fed79e..0d2b40d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -28,13 +28,16 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.SlotRequestReply; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,7 +48,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; public class SlotManagerTest { @@ -59,13 +61,15 @@ public class SlotManagerTest { private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); - private static TaskExecutorGateway taskExecutorGateway; + private static TaskExecutorRegistration taskExecutorRegistration; @BeforeClass public static void setUp() { - 
taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class); - Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<SlotRequestReply>()); + taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class); + TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class); + Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway); + Mockito.when(gateway.requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); } /** @@ -180,9 +184,9 @@ public class SlotManagerTest { assertEquals(1, slotManager.getPendingRequestCount()); SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); + slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); @@ -198,9 +202,9 @@ public class SlotManagerTest { TestingSlotManager slotManager = new TestingSlotManager(); SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); + slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); assertEquals(0, slotManager.getAllocatedSlotCount()); assertEquals(1, slotManager.getFreeSlotCount()); @@ -216,9 +220,9 @@ public class SlotManagerTest { assertEquals(1, 
slotManager.getPendingRequestCount()); SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); + slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); assertEquals(0, slotManager.getAllocatedSlotCount()); assertEquals(1, slotManager.getFreeSlotCount()); @@ -234,9 +238,9 @@ public class SlotManagerTest { TestingSlotManager slotManager = new TestingSlotManager(); SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()); - slotManager.updateSlotStatus(slotStatus); + SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); + slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); @@ -244,48 +248,44 @@ public class SlotManagerTest { } /** - * Tests that we had a slot in-use, and it's confirmed by SlotReport + * Tests that a slot which was in use is freed again subsequently.
*/ @Test public void testExistingInUseSlotUpdateStatus() { TestingSlotManager slotManager = new TestingSlotManager(); - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - // make this slot in use SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()); + SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); + slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); assertTrue(slotManager.isAllocated(slotId)); - // slot status is confirmed - SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, - request.getJobId(), request.getAllocationId()); - slotManager.updateSlotStatus(slotStatus2); + // slot is freed again + slotManager.notifySlotAvailable(slotId.getResourceID(), slotId); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertFalse(slotManager.isAllocated(slotId)); } /** - * Tests that we had a slot in-use, but it's empty according to the SlotReport + * Tests multiple slot requests with one slot.
*/ @Test - public void testExistingInUseSlotAdjustedToEmpty() { + public void testMultipleSlotRequestsWithOneSlot() { TestingSlotManager slotManager = new TestingSlotManager(); - SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); + final AllocationID allocationID = new AllocationID(); + + SlotRequest request1 = new SlotRequest(new JobID(), allocationID, DEFAULT_TESTING_PROFILE); slotManager.requestSlot(request1); - // make this slot in use - SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + final ResourceID resourceID = ResourceID.generate(); + final SlotStatus slotStatus = new SlotStatus(new SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE); + final SlotReport slotReport = new SlotReport(slotStatus); + slotManager.registerTaskExecutor(resourceID, taskExecutorRegistration, slotReport); // another request pending SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); @@ -294,66 +294,20 @@ public class SlotManagerTest { assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); assertEquals(1, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(allocationID)); assertTrue(slotManager.isAllocated(request1.getAllocationId())); - - // but slot is reported empty again, request2 will be fulfilled, request1 will be missing - slotManager.updateSlotStatus(slotStatus); + // the slot is reported available again, so the pending request2 is fulfilled with it + slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID()); assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); assertEquals(0, slotManager.getPendingRequestCount()); -
assertTrue(slotManager.isAllocated(slotId)); + assertTrue(slotManager.isAllocated(slotStatus.getSlotID())); assertTrue(slotManager.isAllocated(request2.getAllocationId())); - } - - /** - * Tests that we had a slot in use, and it's also reported in use by TaskManager, but the allocation - * information didn't match. - */ - @Test - public void testExistingInUseSlotWithDifferentAllocationInfo() { - TestingSlotManager slotManager = new TestingSlotManager(); - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - // make this slot in use - SlotID slotId = SlotID.generate(); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorGateway); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertTrue(slotManager.isAllocated(request.getAllocationId())); - - SlotStatus slotStatus2 = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()); - // update slot status with different allocation info - slotManager.updateSlotStatus(slotStatus2); - - // original request is missing and won't be allocated - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); - assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID())); - } - - /** - * Tests that we had a free slot, and it's confirmed by SlotReport - */ - @Test - public void testExistingEmptySlotUpdateStatus() { - TestingSlotManager slotManager = new TestingSlotManager(); - ResourceSlot slot = new 
ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway); - slotManager.addFreeSlot(slot); - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE); - slotManager.updateSlotStatus(slotStatus); + // the slot is reported available once more; no request is pending, so it becomes free + slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID()); assertEquals(0, slotManager.getAllocatedSlotCount()); assertEquals(1, slotManager.getFreeSlotCount()); @@ -361,34 +315,12 @@ public class SlotManagerTest { } /** - * Tests that we had a free slot, and it's reported in-use by TaskManager - */ - @Test - public void testExistingEmptySlotAdjustedToInUse() { - TestingSlotManager slotManager = new TestingSlotManager(); - final SlotID slotID = SlotID.generate(); - slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway); - - ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway); - slotManager.addFreeSlot(slot); - - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), DEFAULT_TESTING_PROFILE, - new JobID(), new AllocationID()); - slotManager.updateSlotStatus(slotStatus); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slot.getSlotId())); - } - - /** * Tests that we did some allocation but failed / rejected by TaskManager, request will retry */ @Test public void testSlotAllocationFailedAtTaskManager() { TestingSlotManager slotManager = new TestingSlotManager(); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorGateway); + ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); slotManager.addFreeSlot(slot); SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); @@ -409,24
+341,31 @@ public class SlotManagerTest { /** * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request + * This can only occur after reconnect of the TaskExecutor. */ @Test public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() { TestingSlotManager slotManager = new TestingSlotManager(); final SlotID slotID = SlotID.generate(); - slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorGateway); - - ResourceSlot slot = new ResourceSlot(slotID, DEFAULT_TESTING_PROFILE, taskExecutorGateway); - slotManager.addFreeSlot(slot); + SlotStatus slot = new SlotStatus(slotID, DEFAULT_TESTING_PROFILE); + SlotReport slotReport = new SlotReport(slot); + slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport); SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); slotManager.requestSlot(request); - // slot is set empty by heartbeat - SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), slot.getResourceProfile()); - slotManager.updateSlotStatus(slotStatus); + assertEquals(1, slotManager.getAllocatedSlotCount()); + assertEquals(0, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); + + // slot is set empty by a reconnect of the TaskExecutor + slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport); + + assertEquals(0, slotManager.getAllocatedSlotCount()); + assertEquals(1, slotManager.getFreeSlotCount()); + assertEquals(0, slotManager.getPendingRequestCount()); - // another request took this slot + // another request takes the slot SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); slotManager.requestSlot(request2); @@ -436,12 +375,12 @@ public class SlotManagerTest { assertFalse(slotManager.isAllocated(request.getAllocationId())); assertTrue(slotManager.isAllocated(request2.getAllocationId())); - 
// original request should be pended - slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + // original request should be retried + slotManager.handleSlotRequestFailedAtTaskManager(request, slotID); assertEquals(1, slotManager.getAllocatedSlotCount()); assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); + assertEquals(0, slotManager.getPendingRequestCount()); assertFalse(slotManager.isAllocated(request.getAllocationId())); assertTrue(slotManager.isAllocated(request2.getAllocationId())); } @@ -453,10 +392,10 @@ public class SlotManagerTest { ResourceID resource1 = ResourceID.generate(); ResourceID resource2 = ResourceID.generate(); - ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway); - ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway); - ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorGateway); - ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorGateway); + ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); + ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); + ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); + ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); slotManager.addFreeSlot(slot11); slotManager.addFreeSlot(slot21); @@ -499,7 +438,7 @@ public class SlotManagerTest { final int freeSlotNum) { for (int i = 0; i < freeSlotNum; ++i) { - slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorGateway)); + slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorRegistration)); } } @@ -507,13 +446,13 @@ public class SlotManagerTest { // testing classes // ------------------------------------------------------------------------ - private static class TestingSlotManager extends SlotManager implements ResourceManagerServices { + private static class TestingSlotManager extends SlotManager { - private final List<ResourceProfile> allocatedContainers; + private static TestingRmServices testingRmServices = new TestingRmServices(); TestingSlotManager() { - this.allocatedContainers = new LinkedList<>(); - setupResourceManagerServices(this); + super(testingRmServices); + testingRmServices.allocatedContainers.clear(); } /** @@ -552,24 +491,34 @@ public class SlotManagerTest { return null; } - @Override - public void allocateResource(ResourceProfile resourceProfile) { - allocatedContainers.add(resourceProfile); + List<ResourceProfile> getAllocatedContainers() { + return testingRmServices.allocatedContainers; } - @Override - public Executor getAsyncExecutor() { - return Mockito.mock(Executor.class); - } - @Override - public Executor getExecutor() { - return Mockito.mock(Executor.class); - } + private static class TestingRmServices implements ResourceManagerServices { - List<ResourceProfile> getAllocatedContainers() { - return allocatedContainers; - } + private List<ResourceProfile> allocatedContainers; + + public TestingRmServices() { + this.allocatedContainers = new LinkedList<>(); + } + + @Override + public void allocateResource(ResourceProfile resourceProfile) { + allocatedContainers.add(resourceProfile); + } + @Override + public Executor getAsyncExecutor() { + return Mockito.mock(Executor.class); + } + + @Override + public Executor getMainThreadExecutor() { + return Mockito.mock(Executor.class); + } + + } } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index a87fe42..24d959e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -25,12 +25,20 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.resourcemanager.*; +import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.TestingResourceManager; +import org.apache.flink.runtime.resourcemanager.TestingSlotManager; +import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; +import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; @@ -94,9 +102,9 @@ public class SlotProtocolTest extends TestLogger { TestingLeaderElectionService rmLeaderElectionService = configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); - SlotManager slotManager = Mockito.spy(new SimpleSlotManager()); - ResourceManager resourceManager = - Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager)); + final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + SpiedResourceManager resourceManager = + new SpiedResourceManager(testRpcService, testingHaServices, slotManagerFactory); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID); @@ -108,11 +116,13 @@ public class SlotProtocolTest extends TestLogger { Assert.fail("JobManager registration Future didn't become ready."); } + final SlotManager slotManager = slotManagerFactory.slotManager; + final AllocationID allocationID = new AllocationID(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); - SlotRequestReply slotRequestReply = + RMSlotRequestReply slotRequestReply = resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); // 1) SlotRequest is routed to the SlotManager @@ -124,15 +134,18 @@ public class SlotProtocolTest extends TestLogger { allocationID); // 3) SlotRequest leads to a container allocation - verify(resourceManager, timeout(5000)).startNewWorker(resourceProfile); + Assert.assertEquals(1, resourceManager.startNewWorkerCalled); Assert.assertFalse(slotManager.isAllocated(allocationID)); // slot becomes available final String tmAddress = "/tm1"; 
TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<SlotRequestReply>()); + Mockito + .when( + taskExecutorGateway + .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); final ResourceID resourceID = ResourceID.generate(); @@ -141,13 +154,14 @@ public class SlotProtocolTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotID, resourceProfile); final SlotReport slotReport = - new SlotReport(Collections.singletonList(slotStatus), resourceID); + new SlotReport(Collections.singletonList(slotStatus)); // register slot at SlotManager - slotManager.registerTaskExecutor(resourceID, taskExecutorGateway); - slotManager.updateSlotStatus(slotReport); + slotManager.registerTaskExecutor( + resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport); // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); + verify(taskExecutorGateway, timeout(5000)) + .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); } /** @@ -173,13 +187,15 @@ public class SlotProtocolTest extends TestLogger { configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<SlotRequestReply>()); + Mockito.when( + taskExecutorGateway + .requestSlot(any(SlotID.class), any(AllocationID.class), any(UUID.class), any(Time.class))) + .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>()); testRpcService.registerGateway(tmAddress, taskExecutorGateway); - SlotManager slotManager = Mockito.spy(new SimpleSlotManager()); - ResourceManager resourceManager = - Mockito.spy(new StandaloneResourceManager(testRpcService, testingHaServices, slotManager)); + TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); + TestingResourceManager resourceManager = + Mockito.spy(new TestingResourceManager(testRpcService, testingHaServices, slotManagerFactory)); resourceManager.start(); rmLeaderElectionService.isLeader(rmLeaderID); @@ -191,6 +207,8 @@ public class SlotProtocolTest extends TestLogger { Assert.fail("JobManager registration Future didn't become ready."); } + final SlotManager slotManager = slotManagerFactory.slotManager; + final ResourceID resourceID = ResourceID.generate(); final AllocationID allocationID = new AllocationID(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); @@ -199,13 +217,13 @@ public class SlotProtocolTest extends TestLogger { final SlotStatus slotStatus = new SlotStatus(slotID, resourceProfile); final SlotReport slotReport = - new SlotReport(Collections.singletonList(slotStatus), resourceID); + new SlotReport(Collections.singletonList(slotStatus)); // register slot at SlotManager - slotManager.registerTaskExecutor(resourceID, taskExecutorGateway); - slotManager.updateSlotStatus(slotReport); + slotManager.registerTaskExecutor( + resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport); SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); - SlotRequestReply slotRequestReply = + RMSlotRequestReply slotRequestReply = resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); // 1) a SlotRequest is routed to the SlotManager @@ -220,9 +238,9 @@ public class SlotProtocolTest extends TestLogger { Assert.assertTrue(slotManager.isAllocated(slotID)); 
Assert.assertTrue(slotManager.isAllocated(allocationID)); - // 4) a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class)); + verify(taskExecutorGateway, timeout(5000)) + .requestSlot(eq(slotID), eq(allocationID), any(UUID.class), any(Time.class)); } private static TestingLeaderElectionService configureHA( @@ -240,4 +258,32 @@ public class SlotProtocolTest extends TestLogger { return rmLeaderElectionService; } + private static class SpiedResourceManager extends TestingResourceManager { + + private int startNewWorkerCalled = 0; + + public SpiedResourceManager( + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + SlotManagerFactory slotManagerFactory) { + super(rpcService, highAvailabilityServices, slotManagerFactory); + } + + + @Override + public void startNewWorker(ResourceProfile resourceProfile) { + startNewWorkerCalled++; + } + } + + private static class TestingSlotManagerFactory implements SlotManagerFactory { + + private SlotManager slotManager; + + @Override + public SlotManager create(ResourceManagerServices rmServices) { + this.slotManager = Mockito.spy(new TestingSlotManager(rmServices)); + return this.slotManager; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 9c1f288..7710fa9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.taskexecutor; import 
org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.highavailability.NonHaServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -28,11 +30,15 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected; +import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.TestLogger; +import org.hamcrest.Matchers; import org.junit.Test; import org.powermock.api.mockito.PowerMockito; @@ -76,7 +82,7 @@ public class TaskExecutorTest extends TestLogger { String taskManagerAddress = taskManager.getAddress(); verify(rmGateway).registerTaskExecutor( - any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(Time.class)); + any(UUID.class), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); } finally { rpc.stopService(); @@ -132,7 +138,7 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address1, leaderId1); verify(rmGateway1).registerTaskExecutor( - eq(leaderId1), eq(taskManagerAddress), eq(resourceID), any(Time.class)); + eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); // cancel the leader @@ -142,11 +148,95 @@ public class TaskExecutorTest extends TestLogger { testLeaderService.notifyListener(address2, leaderId2); verify(rmGateway2).registerTaskExecutor( - eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(Time.class)); + eq(leaderId2), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); assertNotNull(taskManager.getResourceManagerConnection()); } finally { rpc.stopService(); } } + + /** + * Tests that all allocation requests for slots are ignored if the slot has been reported as + * free by the TaskExecutor but this report hasn't been confirmed by the ResourceManager. + * + * This is essential for the correctness of the state of the ResourceManager. + */ + @Test + public void testRejectAllocationRequestsForOutOfSyncSlots() { + final ResourceID resourceID = ResourceID.generate(); + + final String address1 = "/resource/manager/address/one"; + final UUID leaderId = UUID.randomUUID(); + + final TestingSerialRpcService rpc = new TestingSerialRpcService(); + try { + // register the mock resource manager gateways + ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class); + rpc.registerGateway(address1, rmGateway1); + + TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(); + + TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices(); + haServices.setResourceManagerLeaderRetriever(testLeaderService); + + TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class); + PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1); + + TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class); + when(taskManagerLocation.getResourceID()).thenReturn(resourceID); + + TaskExecutor taskManager = new TaskExecutor( + taskManagerServicesConfiguration, + 
taskManagerLocation, + rpc, + mock(MemoryManager.class), + mock(IOManager.class), + mock(NetworkEnvironment.class), + haServices, + mock(MetricRegistry.class), + mock(FatalErrorHandler.class)); + + taskManager.start(); + String taskManagerAddress = taskManager.getAddress(); + + // no connection initially, since there is no leader + assertNull(taskManager.getResourceManagerConnection()); + + // define a leader and see that a registration happens + testLeaderService.notifyListener(address1, leaderId); + + verify(rmGateway1).registerTaskExecutor( + eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); + assertNotNull(taskManager.getResourceManagerConnection()); + + // test that allocating a slot works + final SlotID slotID = new SlotID(resourceID, 0); + TMSlotRequestReply tmSlotRequestReply = taskManager.requestSlot(slotID, new AllocationID(), leaderId); + assertTrue(tmSlotRequestReply instanceof TMSlotRequestRegistered); + + // test that we can't allocate slots which are blacklisted due to pending confirmation of the RM + final SlotID unconfirmedFreeSlotID = new SlotID(resourceID, 1); + taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID); + TMSlotRequestReply tmSlotRequestReply2 = + taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + assertTrue(tmSlotRequestReply2 instanceof TMSlotRequestRejected); + + // re-register + verify(rmGateway1).registerTaskExecutor( + eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class)); + testLeaderService.notifyListener(address1, leaderId); + + // now we should be successful because the slot status has been synced + // test that the previously blacklisted slot can be allocated again after the re-registration + TMSlotRequestReply tmSlotRequestReply3 = + taskManager.requestSlot(unconfirmedFreeSlotID, new AllocationID(), leaderId); + assertTrue(tmSlotRequestReply3 instanceof TMSlotRequestRegistered); + + } + 
finally { + rpc.stopService(); + } + + } }