http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index bae7086..85b7eb4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -26,12 +26,14 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.TestingSerialRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -152,10 +154,15 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L); 
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); - TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( Time.seconds(5L), Time.seconds(5L)); + + SlotManager slotManager = new SlotManager( + rpcService.getScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); MetricRegistry metricRegistry = mock(MetricRegistry.class); JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( @@ -171,7 +178,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { resourceManagerConfiguration, highAvailabilityServices, heartbeatServices, - slotManagerFactory, + slotManager, metricRegistry, jobLeaderIdService, fatalErrorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java deleted file mode 100644 index 67b208d..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.clusterframework.types.AllocationID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; -import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.mockito.Mockito; - -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.Executor; - -public class TestingSlotManager extends SlotManager { - - public TestingSlotManager() { - this(new TestingResourceManagerServices()); - } - - public TestingSlotManager(ResourceManagerServices rmServices) { - super(rmServices); - } - - @Override - protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) { - final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator(); - if (slotIterator.hasNext()) { - return slotIterator.next(); - } else { - return null; - } - } - - @Override - protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) { - final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator(); - if (requestIterator.hasNext()) { - return requestIterator.next(); - } else { - return null; - } - } - - private static class TestingResourceManagerServices implements ResourceManagerServices { - - private final UUID leaderID = UUID.randomUUID(); - - @Override - public UUID getLeaderID() { - return leaderID; - } - - @Override - public void allocateResource(ResourceProfile resourceProfile) { - - } - - @Override - public Executor getAsyncExecutor() { - return Mockito.mock(Executor.class); - } - - @Override - public Executor getMainThreadExecutor() { - return Mockito.mock(Executor.class); - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java deleted file mode 100644 index 6b5f6b2..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; -import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; - -public class TestingSlotManagerFactory implements SlotManagerFactory { - - @Override - public SlotManager create(ResourceManagerServices rmServices) { - return new TestingSlotManager(rmServices); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 041747d..b0b5d32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -23,510 +23,1058 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.clusterframework.types.ResourceSlot; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot; +import org.apache.flink.runtime.concurrent.CompletableFuture; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.instance.InstanceID; +import 
org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; -import org.junit.BeforeClass; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; +import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.util.TestLogger; import org.junit.Test; -import org.mockito.Mockito; +import org.mockito.ArgumentCaptor; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.Arrays; import java.util.UUID; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public 
class SlotManagerTest extends TestLogger { -public class SlotManagerTest { - - private static final double DEFAULT_TESTING_CPU_CORES = 1.0; + /** + * Tests that we can register task manager and their slots at the slot manager. + */ + @Test + public void testTaskManagerRegistration() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - private static final int DEFAULT_TESTING_MEMORY = 512; + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); - private static final ResourceProfile DEFAULT_TESTING_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY); + ResourceID resourceId = ResourceID.generate(); + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); - private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = - new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + slotManager.registerTaskManager(taskManagerConnection, slotReport); - private static TaskExecutorRegistration taskExecutorRegistration; + assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots()); - @BeforeClass - public static void setUp() { - taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class); - TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class); - 
Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway); - Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); + assertNotNull(slotManager.getSlot(slotId1)); + assertNotNull(slotManager.getSlot(slotId2)); + } } /** - * Tests that there are no free slots when we request, need to allocate from cluster manager master + * Tests that un-registration of task managers will free and remove all registered slots. */ @Test - public void testRequestSlotWithoutFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0)); + public void testTaskManagerUnregistration() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final JobID jobId = new JobID(); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + any(AllocationID.class), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>()); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + ResourceID resourceId = ResourceID.generate(); + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final AllocationID allocationId1 = new AllocationID(); + final 
AllocationID allocationId2 = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile, jobId, allocationId1); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); + + final SlotRequest slotRequest = new SlotRequest( + new JobID(), + allocationId2, + resourceProfile, + "foobar"); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + slotManager.registerTaskManager(taskManagerConnection, slotReport); + + assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots()); + + TaskManagerSlot slot1 = slotManager.getSlot(slotId1); + TaskManagerSlot slot2 = slotManager.getSlot(slotId2); + + assertTrue(slot1.isAllocated()); + assertTrue(slot2.isFree()); + + assertTrue(slotManager.registerSlotRequest(slotRequest)); + + assertFalse(slot2.isFree()); + assertTrue(slot2.hasPendingSlotRequest()); + + PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId2); + + assertTrue("The pending slot request should have been assigned to slot 2", pendingSlotRequest.isAssigned()); + + slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID()); + + assertTrue(0 == slotManager.getNumberRegisteredSlots()); + assertFalse(pendingSlotRequest.isAssigned()); + } } /** - * Tests that there are some free slots when we request, and the request is fulfilled immediately + * Tests that a slot request with no free slots will trigger the resource allocation */ @Test - public void testRequestSlotWithFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); + public void testSlotRequestWithoutFreeSlots() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + 
final SlotRequest slotRequest = new SlotRequest( + new JobID(), + new AllocationID(), + resourceProfile, + "localhost"); - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); - assertEquals(1, slotManager.getFreeSlotCount()); + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertEquals(0, slotManager.getAllocatedContainers().size()); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + slotManager.registerSlotRequest(slotRequest); + + verify(resourceManagerActions).allocateResource(eq(resourceProfile)); + } } /** - * Tests that there are some free slots when we request, but none of them are suitable + * Tests that the slot request fails if we cannot allocate more resources. 
*/ @Test - public void testRequestSlotWithoutSuitableSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); - - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2); - assertEquals(2, slotManager.getFreeSlotCount()); - - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(2, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + public void testSlotRequestWithResourceAllocationFailure() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotRequest slotRequest = new SlotRequest( + new JobID(), + new AllocationID(), + resourceProfile, + "localhost"); + + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class)); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + slotManager.registerSlotRequest(slotRequest); + + fail("The slot request should have failed with a ResourceManagerException."); + + } catch (ResourceManagerException e) { + // expected exception + } } /** - * Tests that we send duplicated slot request + * Tests that a slot request which can be fulfilled will trigger a slot allocation. 
*/ @Test - public void testDuplicatedSlotRequest() { - TestingSlotManager slotManager = new TestingSlotManager(); - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1); - - SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE); - - slotManager.requestSlot(request1); - slotManager.requestSlot(request2); - slotManager.requestSlot(request2); - slotManager.requestSlot(request1); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertEquals(1, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); + public void testSlotRequestWithFreeSlot() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceID resourceID = ResourceID.generate(); + final JobID jobId = new JobID(); + final SlotID slotId = new SlotID(resourceID, 0); + final String targetAddress = "localhost"; + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotRequest slotRequest = new SlotRequest( + jobId, + allocationId, + resourceProfile, + targetAddress); + + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + // accept an incoming slot request + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + eq(slotId), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(taskExecutorGateway); + + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); + + slotManager.registerTaskManager( + taskExecutorConnection, + slotReport); + + assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); + + verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class)); + + TaskManagerSlot slot = slotManager.getSlot(slotId); + + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); + } } /** - * Tests that we send multiple slot requests + * Checks that un-registering a pending slot request will cancel it, removing it from all + * assigned task manager slots and then remove it from the slot manager. */ @Test - public void testRequestMultipleSlots() { - TestingSlotManager slotManager = new TestingSlotManager(); - directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5); + public void testUnregisterPendingSlotRequest() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final SlotID slotId = new SlotID(ResourceID.generate(), 0); + final AllocationID allocationId = new AllocationID(); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + any(AllocationID.class), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>()); + + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); - // request 3 normal slots - for (int i = 0; i < 3; ++i) { - slotManager.requestSlot(new 
SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - } + final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar"); - // request 2 big slots - for (int i = 0; i < 2; ++i) { - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - } + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + slotManager.registerTaskManager(taskManagerConnection, slotReport); - // request 1 normal slot again - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + TaskManagerSlot slot = slotManager.getSlot(slotId); - assertEquals(4, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(2, slotManager.getPendingRequestCount()); - assertEquals(2, slotManager.getAllocatedContainers().size()); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0)); - assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1)); + slotManager.registerSlotRequest(slotRequest); + + assertNotNull(slotManager.getSlotRequest(allocationId)); + + assertTrue(slot.hasPendingSlotRequest()); + + slotManager.unregisterSlotRequest(allocationId); + + assertNull(slotManager.getSlotRequest(allocationId)); + + slot = slotManager.getSlot(slotId); + assertTrue(slot.isFree()); + } } /** - * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request + * Tests that pending slot requests are tried to be fulfilled upon new slot registrations. 
*/ @Test - public void testNewlyAppearedFreeSlotFulfillPendingRequest() { - TestingSlotManager slotManager = new TestingSlotManager(); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - assertEquals(1, slotManager.getPendingRequestCount()); - - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotId)); + public void testFulfillingPendingSlotRequest() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceID resourceID = ResourceID.generate(); + final JobID jobId = new JobID(); + final SlotID slotId = new SlotID(resourceID, 0); + final String targetAddress = "localhost"; + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotRequest slotRequest = new SlotRequest( + jobId, + allocationId, + resourceProfile, + targetAddress); + + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + // accept an incoming slot request + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + eq(slotId), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway); + + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); + + 
try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); + + verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile)); + + slotManager.registerTaskManager( + taskExecutorConnection, + slotReport); + + verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class)); + + TaskManagerSlot slot = slotManager.getSlot(slotId); + + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); + } } /** - * Tests that a new slot appeared in SlotReport, but we have no pending request + * Tests that freeing a slot will correctly reset the slot and mark it as a free slot */ @Test - public void testNewlyAppearedFreeSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); + public void testFreeSlot() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceID resourceID = ResourceID.generate(); + final JobID jobId = new JobID(); + final SlotID slotId = new SlotID(resourceID, 0); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + // accept an incoming slot request + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); + final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway); - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, 
slotManager.getFreeSlotCount()); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId); + final SlotReport slotReport = new SlotReport(slotStatus); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + slotManager.registerTaskManager( + taskExecutorConnection, + slotReport); + + TaskManagerSlot slot = slotManager.getSlot(slotId); + + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); + + // this should be ignored since the allocation id does not match + slotManager.freeSlot(slotId, new AllocationID()); + + assertTrue(slot.isAllocated()); + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); + + slotManager.freeSlot(slotId, allocationId); + + assertTrue(slot.isFree()); + assertNull(slot.getAllocationId()); + } } /** - * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests + * Tests that a second pending slot request is detected as a duplicate if the allocation ids are + * the same. 
*/ @Test - public void testNewlyAppearedFreeSlotNotMatchPendingRequests() { - TestingSlotManager slotManager = new TestingSlotManager(); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE)); - assertEquals(1, slotManager.getPendingRequestCount()); - - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE); - SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertFalse(slotManager.isAllocated(slotId)); + public void testDuplicatePendingSlotRequest() throws Exception { + + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); + final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); + final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); + final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + assertTrue(slotManager.registerSlotRequest(slotRequest1)); + assertFalse(slotManager.registerSlotRequest(slotRequest2)); + } + + // check that we have only called the resource allocation only for the first slot request, + // since the second request is a duplicate + verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class)); } /** - * Tests that a new slot appeared in SlotReport, and it's been reported using by some job + * Tests that if we have 
received a slot report with some allocated slots, then we don't accept + * slot requests with allocated allocation ids. */ @Test - public void testNewlyAppearedInUseSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); + public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotID slotId = new SlotID(ResourceID.generate(), 0); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId); + final SlotReport slotReport = new SlotReport(slotStatus); - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()); - SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + slotManager.registerTaskManager(taskManagerConnection, slotReport); + + assertFalse(slotManager.registerSlotRequest(slotRequest)); + } } /** - * Tests that we had a slot in-use and is freed again subsequently. 
+ * Tests that duplicate slot requests (requests with an already registered allocation id) are + * also detected after a pending slot request has been fulfilled but not yet freed. */ @Test - public void testExistingInUseSlotUpdateStatus() { - TestingSlotManager slotManager = new TestingSlotManager(); + public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); + final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); + final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); + final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + any(AllocationID.class), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final SlotID slotId = new SlotID(ResourceID.generate(), 0); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1); + final SlotReport slotReport = new SlotReport(slotStatus); - SlotID slotId = SlotID.generate(); - SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID()); - SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus)); - slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + 
slotManager.registerTaskManager(taskManagerConnection, slotReport); + assertTrue(slotManager.registerSlotRequest(slotRequest1)); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertTrue(slotManager.isAllocated(slotId)); + TaskManagerSlot slot = slotManager.getSlot(slotId); - // slot is freed again - slotManager.notifySlotAvailable(slotId.getResourceID(), slotId); + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertFalse(slotManager.isAllocated(slotId)); + assertFalse(slotManager.registerSlotRequest(slotRequest2)); + } + + // check that no resource allocation has been triggered: the first slot request was fulfilled + // by the registered slot and the second request was rejected as a duplicate + verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); } /** - * Tests multiple slot requests with one slots. + * Tests that an already registered allocation id can be reused after the initial slot request + * has been freed. 
*/ @Test - public void testMultipleSlotRequestsWithOneSlot() { - TestingSlotManager slotManager = new TestingSlotManager(); - final AllocationID allocationID = new AllocationID(); + public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); + final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); + final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); + final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + any(AllocationID.class), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final SlotID slotId = new SlotID(ResourceID.generate(), 0); + final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2)); + final SlotReport slotReport = new SlotReport(slotStatus); - SlotRequest request1 = new SlotRequest(new JobID(), allocationID, DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request1); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + slotManager.registerTaskManager(taskManagerConnection, slotReport); + assertTrue(slotManager.registerSlotRequest(slotRequest1)); - final ResourceID resourceID = ResourceID.generate(); - final SlotStatus slotStatus = new SlotStatus(new SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE); - final SlotReport slotReport = new 
SlotReport(slotStatus); - slotManager.registerTaskExecutor(resourceID, taskExecutorRegistration, slotReport); + TaskManagerSlot slot = slotManager.getSlot(slotId); - // another request pending - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request2); + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(1, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(allocationID)); - assertTrue(slotManager.isAllocated(request1.getAllocationId())); + slotManager.freeSlot(slotId, allocationId); - // but slot is reported empty in a report in the meantime which shouldn't affect the state - slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID()); + // check that the slot has been freed + assertTrue(slot.isFree()); + assertNull(slot.getAllocationId()); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slotStatus.getSlotID())); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); + assertTrue(slotManager.registerSlotRequest(slotRequest2)); - // but slot is reported empty in a report in the meantime which shouldn't affect the state - slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID()); + assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId()); + } - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); + // check that we have only called the resource allocation only for the first slot request, + // since the second request is a duplicate + 
verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); } /** - * Tests that we did some allocation but failed / rejected by TaskManager, request will retry + * Tests that the slot manager ignores slot reports of unknown origin (not registered + * task managers). */ @Test - public void testSlotAllocationFailedAtTaskManager() { - TestingSlotManager slotManager = new TestingSlotManager(); - ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); - slotManager.addFreeSlot(slot); + public void testReceivingUnknownSlotReport() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); + final InstanceID unknownInstanceID = new InstanceID(); + final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0); + final ResourceProfile unknownResourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus unknownSlotStatus = new SlotStatus(unknownSlotId, unknownResourceProfile); + final SlotReport unknownSlotReport = new SlotReport(unknownSlotStatus); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertTrue(slotManager.isAllocated(slot.getSlotId())); + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + // check that we don't have any slots registered + assertTrue(0 == slotManager.getNumberRegisteredSlots()); - slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId()); + // this should not update anything since the instance id is not known to the slot manager + assertFalse(slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport)); - assertEquals(1, slotManager.getAllocatedSlotCount()); 
- assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); + assertTrue(0 == slotManager.getNumberRegisteredSlots()); + } } - /** - * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request - * This can only occur after reconnect of the TaskExecutor. + * Tests that slots are updated with respect to the latest incoming slot report. This means that + * slots for which no report has been received will be removed and those for which a report was + * received are updated accordingly. */ @Test - public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() { - TestingSlotManager slotManager = new TestingSlotManager(); - final SlotID slotID = SlotID.generate(); - SlotStatus slot = new SlotStatus(slotID, DEFAULT_TESTING_PROFILE); - SlotReport slotReport = new SlotReport(slot); - slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport); - - SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - - // slot is set empty by a reconnect of the TaskExecutor - slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport); - - assertEquals(0, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - - // another request takes the slot - SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE); - slotManager.requestSlot(request2); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - 
assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); - - // original request should be retried - slotManager.handleSlotRequestFailedAtTaskManager(request, slotID); - - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - assertFalse(slotManager.isAllocated(request.getAllocationId())); - assertTrue(slotManager.isAllocated(request2.getAllocationId())); + public void testUpdateSlotReport() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + + final ResourceID resourceId = ResourceID.generate(); + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final SlotID slotId3 = new SlotID(resourceId, 2); + + + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + + final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId); + final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile); + + final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); + final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus3)); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + // check that we don't have any slots registered + assertTrue(0 == 
slotManager.getNumberRegisteredSlots()); + + slotManager.registerTaskManager(taskManagerConnection, slotReport1); + + TaskManagerSlot slot = slotManager.getSlot(slotId2); + + assertTrue(2 == slotManager.getNumberRegisteredSlots()); + + assertTrue(slot.isFree()); + + assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2)); + + assertTrue(2 == slotManager.getNumberRegisteredSlots()); + + // the slot manager should have removed slotId1 + assertNull(slotManager.getSlot(slotId1)); + + assertNotNull(slotManager.getSlot(slotId3)); + + // slotId2 should have been allocated for allocationId + assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId()); + } } + /** + * Tests that idle task managers time out after the configured timeout. A timed out task manager + * will be removed from the slot manager and the resource manager will be notified about the + * timeout. + */ @Test - public void testNotifyTaskManagerFailure() { - TestingSlotManager slotManager = new TestingSlotManager(); + public void testTaskManagerTimeout() throws Exception { + final long tmTimeout = 50L; - ResourceID resource1 = ResourceID.generate(); - ResourceID resource2 = ResourceID.generate(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final UUID leaderId = UUID.randomUUID(); - ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); - ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); - ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); - ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration); + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway); - slotManager.addFreeSlot(slot11); - slotManager.addFreeSlot(slot21); + final SlotID slotId = new SlotID(ResourceID.generate(), 0); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); - slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE)); + final Executor mainThreadExecutor = mock(Executor.class); - assertEquals(2, slotManager.getAllocatedSlotCount()); - assertEquals(0, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); + try (SlotManager slotManager = new SlotManager( + TestingUtils.defaultScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + Time.milliseconds(tmTimeout))) { - slotManager.addFreeSlot(slot12); - slotManager.addFreeSlot(slot22); + slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); - assertEquals(2, slotManager.getAllocatedSlotCount()); - assertEquals(2, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); + slotManager.registerTaskManager(taskManagerConnection, slotReport); - slotManager.notifyTaskManagerFailure(resource2); + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); + verify(mainThreadExecutor, timeout(tmTimeout * 10L)).execute(runnableArgumentCaptor.capture()); - // notify an not exist resource failure - slotManager.notifyTaskManagerFailure(ResourceID.generate()); + // the only runnable being executed by the main thread executor should be the timeout runnable + Runnable 
timeoutRunnable = runnableArgumentCaptor.getValue(); - assertEquals(1, slotManager.getAllocatedSlotCount()); - assertEquals(1, slotManager.getFreeSlotCount()); - assertEquals(0, slotManager.getPendingRequestCount()); - } + timeoutRunnable.run(); - // ------------------------------------------------------------------------ - // testing utilities - // ------------------------------------------------------------------------ - - private void directlyProvideFreeSlots( - final SlotManager slotManager, - final ResourceProfile resourceProfile, - final int freeSlotNum) - { - for (int i = 0; i < freeSlotNum; ++i) { - slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorRegistration)); + verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); } } - // ------------------------------------------------------------------------ - // testing classes - // ------------------------------------------------------------------------ + /** + * Tests that slot requests time out after the specified request timeout. If a slot request + * times out, then the request is cancelled, removed from the slot manager and the resource + * manager is notified about the failed allocation. 
+ */ + @Test + public void testSlotRequestTimeout() throws Exception { + final long allocationTimeout = 50L; + + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final UUID leaderId = UUID.randomUUID(); + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - private static class TestingSlotManager extends SlotManager { + final Executor mainThreadExecutor = mock(Executor.class); - private static TestingRmServices testingRmServices = new TestingRmServices(); + try (SlotManager slotManager = new SlotManager( + TestingUtils.defaultScheduledExecutor(), + TestingUtils.infiniteTime(), + Time.milliseconds(allocationTimeout), + TestingUtils.infiniteTime())) { - TestingSlotManager() { - super(testingRmServices); - testingRmServices.allocatedContainers.clear(); + slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions); + + assertTrue(slotManager.registerSlotRequest(slotRequest)); + + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + + verify(mainThreadExecutor, timeout(allocationTimeout * 10L)).execute(runnableArgumentCaptor.capture()); + + // the only runnable being executed by the main thread executor should be the timeout runnable + Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + + timeoutRunnable.run(); + + verify(resourceManagerActions, times(1)).notifyAllocationFailure( + eq(jobId), + eq(allocationId), + any(TimeoutException.class)); } + } - /** - * Choose slot randomly if it matches requirement - * - * @param request The slot request - * @param freeSlots All slots which can be used - * @return The chosen slot or null if cannot find a match - */ - @Override - protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> 
freeSlots) { - for (ResourceSlot slot : freeSlots.values()) { - if (slot.isMatchingRequirement(request.getResourceProfile())) { - return slot; - } + /** + * Tests that a slot request is retried if it times out on the task manager side + */ + @Test + @SuppressWarnings("unchecked") + public void testTaskManagerSlotRequestTimeoutHandling() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); + final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>(); + final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>(); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + eq(allocationId), + anyString(), + any(UUID.class), + any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final ResourceID resourceId = ResourceID.generate(); + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); + + try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) { + + slotManager.registerTaskManager(taskManagerConnection, slotReport); + + slotManager.registerSlotRequest(slotRequest); + + ArgumentCaptor<SlotID> 
slotIdCaptor = ArgumentCaptor.forClass(SlotID.class); + + verify(taskExecutorGateway, times(1)).requestSlot( + slotIdCaptor.capture(), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + TaskManagerSlot failedSlot = slotManager.getSlot(slotIdCaptor.getValue()); + + // let the first attempt fail --> this should trigger a second attempt + slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception.")); + + verify(taskExecutorGateway, times(2)).requestSlot( + slotIdCaptor.capture(), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + // the second attempt succeeds + slotRequestFuture2.complete(Acknowledge.get()); + + TaskManagerSlot slot = slotManager.getSlot(slotIdCaptor.getValue()); + + assertTrue(slot.isAllocated()); + assertEquals(allocationId, slot.getAllocationId()); + + if (!failedSlot.getSlotId().equals(slot.getSlotId())) { + assertTrue(failedSlot.isFree()); } - return null; } + } - /** - * Choose request randomly if offered slot can match its requirement - * - * @param offeredSlot The free slot - * @param pendingRequests All the pending slot requests - * @return The chosen request's AllocationID or null if cannot find a match - */ - @Override - protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, - Map<AllocationID, SlotRequest> pendingRequests) - { - for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) { - if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) { - return pendingRequest.getValue(); - } - } - return null; + /** + * Tests that pending slot requests are rejected if a slot report with a different allocation + * is received. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testSlotReportWhileActiveSlotRequest() throws Exception { + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); + final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>(); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + any(JobID.class), + eq(allocationId), + anyString(), + any(UUID.class), + any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final ResourceID resourceId = ResourceID.generate(); + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final SlotID slotId3 = new SlotID(resourceId, 2); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); + + // we have to manually trigger the future call backs to simulate the main thread executor behaviour + final Executor mainThreadExecutorMock = mock(Executor.class); + + try (SlotManager slotManager = new SlotManager( + TestingUtils.defaultScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime())) { + + slotManager.start(leaderId, mainThreadExecutorMock, resourceManagerActions); + + slotManager.registerTaskManager(taskManagerConnection, 
slotReport); + + slotManager.registerSlotRequest(slotRequest); + + ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class); + + verify(taskExecutorGateway, times(1)).requestSlot( + slotIdCaptor.capture(), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID()); + final SlotStatus newSlotStatus2 = new SlotStatus(slotId3, resourceProfile); + final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2)); + + // this should remove the unused slot, replacing it with slotId3 and retry the pending slot request + slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport); + + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture()); + + Runnable requestFailureRunnable = runnableArgumentCaptor.getValue(); + + requestFailureRunnable.run(); + + verify(taskExecutorGateway, times(2)).requestSlot( + slotIdCaptor.capture(), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + verify(mainThreadExecutorMock, times(2)).execute(runnableArgumentCaptor.capture()); + Runnable requestSuccessRunnable = runnableArgumentCaptor.getValue(); + + requestSuccessRunnable.run(); + + final SlotID requestedSlotId = slotIdCaptor.getValue(); + + assertEquals(slotId3, requestedSlotId); + + TaskManagerSlot slot = slotManager.getSlot(requestedSlotId); + + assertTrue(slot.isAllocated()); + assertEquals(allocationId, slot.getAllocationId()); } + } + + /** + * Tests that formerly used task managers can again timeout after all of their slots have + * been freed. 
+ */ + @Test + public void testTimeoutForUnusedTaskManager() throws Exception { + final long taskManagerTimeout = 123456L; + + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); + + final ResourceID resourceId = ResourceID.generate(); + + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); + + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + any(SlotID.class), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get())); + + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); + + final SlotID slotId1 = new SlotID(resourceId, 0); + final SlotID slotId2 = new SlotID(resourceId, 1); + final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile); + final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile); + final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2)); + + try (SlotManager slotManager = new SlotManager( + scheduledExecutor, + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) { + + slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions); + + slotManager.registerSlotRequest(slotRequest); + + slotManager.registerTaskManager(taskManagerConnection, initialSlotReport); + + ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); + + verify(taskExecutorGateway).requestSlot( + slotIdArgumentCaptor.capture(), + eq(jobId), + 
eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID())); + + SlotID slotId = slotIdArgumentCaptor.getValue(); + TaskManagerSlot slot = slotManager.getSlot(slotId); + + assertTrue(slot.isAllocated()); + assertEquals(allocationId, slot.getAllocationId()); + + slotManager.freeSlot(slotId, allocationId); + + assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID())); + + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + + // filter out the schedule call for the task manager which will be registered using the + // taskManagerTimeout value + verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS)); + + Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); - List<ResourceProfile> getAllocatedContainers() { - return testingRmServices.allocatedContainers; + timeoutRunnable.run(); + + verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID())); } + } + /** + * Tests that the slot manager re-registers a timeout for a rejected slot request. 
+ */ + @Test + public void testTimeoutForRejectedSlotRequest() throws Exception { - private static class TestingRmServices implements ResourceManagerServices { + final long slotRequestTimeout = 1337L; + final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class); - private final UUID leaderID; + final ResourceID resourceId = ResourceID.generate(); + final SlotID slotId = new SlotID(resourceId, 0); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); + final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); + final SlotReport slotReport = new SlotReport(slotStatus); - private final List<ResourceProfile> allocatedContainers; + final UUID leaderId = UUID.randomUUID(); + final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - public TestingRmServices() { - this.leaderID = UUID.randomUUID(); - this.allocatedContainers = new LinkedList<>(); - } + final JobID jobId = new JobID(); + final AllocationID allocationId = new AllocationID(); + final AllocationID allocationId2 = new AllocationID(); + final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - @Override - public UUID getLeaderID() { - return leaderID; - } + CompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<>(); - @Override - public void allocateResource(ResourceProfile resourceProfile) { - allocatedContainers.add(resourceProfile); - } + final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + when(taskExecutorGateway.requestSlot( + eq(slotId), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class))).thenReturn(requestFuture); - @Override - public Executor getAsyncExecutor() { - return Mockito.mock(Executor.class); - } + final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); - @Override - public Executor getMainThreadExecutor() { - return Mockito.mock(Executor.class); - } + 
try (SlotManager slotManager = new SlotManager( + scheduledExecutor, + TestingUtils.infiniteTime(), + Time.milliseconds(slotRequestTimeout), + TestingUtils.infiniteTime())) { + + slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions); + + slotManager.registerTaskManager(taskManagerConnection, slotReport); + slotManager.registerSlotRequest(slotRequest); + + verify(taskExecutorGateway).requestSlot( + eq(slotId), + eq(jobId), + eq(allocationId), + anyString(), + eq(leaderId), + any(Time.class)); + + requestFuture.completeExceptionally(new SlotOccupiedException("Slot is already occupied", allocationId2)); + + ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS)); + + Runnable timeoutRunnable = runnableArgumentCaptor.getValue(); + + timeoutRunnable.run(); + + verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), eq(allocationId), any(Exception.class)); + + TaskManagerSlot slot = slotManager.getSlot(slotId); + + assertTrue(slot.isAllocated()); + assertEquals(allocationId2, slot.getAllocationId()); } } + + private SlotManager createSlotManager(UUID leaderId, ResourceManagerActions resourceManagerActions) { + SlotManager slotManager = new SlotManager( + TestingUtils.defaultScheduledExecutor(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime()); + + slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions); + + return slotManager; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java index 37690b5..a72969e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java @@ -19,47 +19,29 @@ package org.apache.flink.runtime.resourcemanager.slotmanager; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.clusterframework.FlinkResourceManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; -import org.apache.flink.runtime.concurrent.Future; -import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; -import org.apache.flink.runtime.heartbeat.HeartbeatServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; -import org.apache.flink.runtime.jobmaster.JobMasterGateway; -import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; -import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; -import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.registration.RegistrationResponse; -import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; -import org.apache.flink.runtime.resourcemanager.ResourceManager; -import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; -import org.apache.flink.runtime.resourcemanager.ResourceManagerServices; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; +import 
org.apache.flink.runtime.concurrent.impl.FlinkFuture; import org.apache.flink.runtime.resourcemanager.SlotRequest; -import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; -import org.apache.flink.runtime.resourcemanager.TestingSlotManager; -import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply; -import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration; -import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.TestingSerialRpcService; +import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.SlotStatus; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; @@ -70,128 +52,75 @@ import static org.mockito.Mockito.verify; public class SlotProtocolTest extends TestLogger { - private static TestingSerialRpcService testRpcService; + private static final long timeout = 10000L; + + + private static ScheduledExecutorService scheduledExecutorService; @BeforeClass public static void beforeClass() { - testRpcService = new TestingSerialRpcService(); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1); } @AfterClass public static void afterClass() { - testRpcService.stopService(); 
- testRpcService = null; - } - - @Before - public void beforeTest(){ - testRpcService.clearGateways(); + Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService); } /** * Tests whether - * 1) SlotRequest is routed to the SlotManager - * 2) SlotRequest is confirmed - * 3) SlotRequest leads to a container allocation - * 4) Slot becomes available and TaskExecutor gets a SlotRequest + * 1) SlotManager accepts a slot request + * 2) SlotRequest leads to a container allocation + * 3) Slot becomes available and TaskExecutor gets a SlotRequest */ @Test public void testSlotsUnavailableRequest() throws Exception { - final String rmAddress = "/rm1"; - final String jmAddress = "/jm1"; final JobID jobID = new JobID(); - final ResourceID rmResourceId = new ResourceID(rmAddress); final ResourceID jmResourceId = new ResourceID(jmAddress); - testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); - - final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices(); final UUID rmLeaderID = UUID.randomUUID(); - final UUID jmLeaderID = UUID.randomUUID(); - TestingLeaderElectionService rmLeaderElectionService = - configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); - - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( - testingHaServices, - testRpcService.getScheduledExecutor(), - Time.seconds(5L)); - - final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); - - final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); - - SpiedResourceManager resourceManager = - new SpiedResourceManager( - rmResourceId, - testRpcService, - resourceManagerConfiguration, - testingHaServices, - heartbeatServices, - slotManagerFactory, - mock(MetricRegistry.class), - jobLeaderIdService, - 
mock(FatalErrorHandler.class)); - resourceManager.start(); - rmLeaderElectionService.isLeader(rmLeaderID); - - Future<RegistrationResponse> registrationFuture = - resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID); - try { - registrationFuture.get(5, TimeUnit.SECONDS); - } catch (Exception e) { - Assert.fail("JobManager registration Future didn't become ready."); - } - final SlotManager slotManager = slotManagerFactory.slotManager; + try (SlotManager slotManager = new SlotManager( + new ScheduledExecutorServiceAdapter(scheduledExecutorService), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime())) { - final AllocationID allocationID = new AllocationID(); - final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); - RMSlotRequestReply slotRequestReply = - resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); + slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); - // 1) SlotRequest is routed to the SlotManager - verify(slotManager).requestSlot(slotRequest); + final AllocationID allocationID = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); + final String targetAddress = "foobar"; - // 2) SlotRequest is confirmed - Assert.assertEquals( - slotRequestReply.getAllocationID(), - allocationID); + SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, targetAddress); - // 3) SlotRequest leads to a container allocation - Assert.assertEquals(1, resourceManager.startNewWorkerCalled); + slotManager.registerSlotRequest(slotRequest); - Assert.assertFalse(slotManager.isAllocated(allocationID)); + verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile())); - // slot becomes available - 
final String tmAddress = "/tm1"; - TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - Mockito - .when( + // slot becomes available + TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + Mockito.when( taskExecutorGateway .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); - testRpcService.registerGateway(tmAddress, taskExecutorGateway); - - final ResourceID resourceID = ResourceID.generate(); - final SlotID slotID = new SlotID(resourceID, 0); - - final SlotStatus slotStatus = - new SlotStatus(slotID, resourceProfile); - final SlotReport slotReport = - new SlotReport(Collections.singletonList(slotStatus)); - // register slot at SlotManager - slotManager.registerTaskExecutor( - resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport); - - // 4) Slot becomes available and TaskExecutor gets a SlotRequest - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); + .thenReturn(mock(FlinkFuture.class)); + + final ResourceID resourceID = ResourceID.generate(); + final SlotID slotID = new SlotID(resourceID, 0); + + final SlotStatus slotStatus = + new SlotStatus(slotID, resourceProfile); + final SlotReport slotReport = + new SlotReport(Collections.singletonList(slotStatus)); + // register slot at SlotManager + slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport); + + // 4) Slot becomes available and TaskExecutor gets a SlotRequest + verify(taskExecutorGateway, timeout(5000L)) + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); + } } /** @@ -203,159 +132,48 @@ public class SlotProtocolTest extends TestLogger { */ @Test public void testSlotAvailableRequest() throws Exception { - 
final String rmAddress = "/rm1"; - final String jmAddress = "/jm1"; - final String tmAddress = "/tm1"; final JobID jobID = new JobID(); - final ResourceID rmResourceId = new ResourceID(rmAddress); final ResourceID jmResourceId = new ResourceID(jmAddress); - testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); - - final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices(); final UUID rmLeaderID = UUID.randomUUID(); - final UUID jmLeaderID = UUID.randomUUID(); - TestingLeaderElectionService rmLeaderElectionService = - configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID); TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); Mockito.when( taskExecutorGateway .requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class))) - .thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>()); - testRpcService.registerGateway(tmAddress, taskExecutorGateway); - - ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( - Time.seconds(5L), - Time.seconds(5L)); - - JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( - testingHaServices, - testRpcService.getScheduledExecutor(), - Time.seconds(5L)); - - TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory(); - - HeartbeatServices heartbeatServices = mock(HeartbeatServices.class); - - ResourceManager<ResourceID> resourceManager = - Mockito.spy(new StandaloneResourceManager( - testRpcService, - FlinkResourceManager.RESOURCE_MANAGER_NAME, - rmResourceId, - resourceManagerConfiguration, - testingHaServices, - heartbeatServices, - slotManagerFactory, - mock(MetricRegistry.class), - jobLeaderIdService, - mock(FatalErrorHandler.class))); - resourceManager.start(); - rmLeaderElectionService.isLeader(rmLeaderID); - - Thread.sleep(1000); - - Future<RegistrationResponse> registrationFuture = - 
resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID); - try { - registrationFuture.get(5L, TimeUnit.SECONDS); - } catch (Exception e) { - Assert.fail("JobManager registration Future didn't become ready."); - } - - final SlotManager slotManager = slotManagerFactory.slotManager; + .thenReturn(mock(FlinkFuture.class)); - final ResourceID resourceID = ResourceID.generate(); - final AllocationID allocationID = new AllocationID(); - final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); - final SlotID slotID = new SlotID(resourceID, 0); + try (SlotManager slotManager = new SlotManager( + new ScheduledExecutorServiceAdapter(scheduledExecutorService), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), + TestingUtils.infiniteTime())) { - final SlotStatus slotStatus = - new SlotStatus(slotID, resourceProfile); - final SlotReport slotReport = - new SlotReport(Collections.singletonList(slotStatus)); - // register slot at SlotManager - slotManager.registerTaskExecutor( - resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport); + ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class); - SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile); - RMSlotRequestReply slotRequestReply = - resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest); + slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions); - // 1) a SlotRequest is routed to the SlotManager - verify(slotManager).requestSlot(slotRequest); + final ResourceID resourceID = ResourceID.generate(); + final AllocationID allocationID = new AllocationID(); + final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100); + final SlotID slotID = new SlotID(resourceID, 0); - // 2) a SlotRequest is confirmed - Assert.assertEquals( - slotRequestReply.getAllocationID(), - allocationID); - - // 3) a SlotRequest leads to an allocation of a registered slot - 
Assert.assertTrue(slotManager.isAllocated(slotID)); - Assert.assertTrue(slotManager.isAllocated(allocationID)); - - // 4) a SlotRequest is routed to the TaskExecutor - verify(taskExecutorGateway, timeout(5000)) - .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); - } - - private static TestingLeaderElectionService configureHA( - TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) { - final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - testingHA.setResourceManagerLeaderElectionService(rmLeaderElectionService); - final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(rmAddress, rmID); - testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - - final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService(); - testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService); - final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID); - testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService); - - return rmLeaderElectionService; - } - - private static class SpiedResourceManager extends StandaloneResourceManager { - - private int startNewWorkerCalled = 0; - - public SpiedResourceManager( - ResourceID resourceId, - RpcService rpcService, - ResourceManagerConfiguration resourceManagerConfiguration, - HighAvailabilityServices highAvailabilityServices, - HeartbeatServices heartbeatServices, - SlotManagerFactory slotManagerFactory, - MetricRegistry metricRegistry, - JobLeaderIdService jobLeaderIdService, - FatalErrorHandler fatalErrorHandler) { - super( - rpcService, - FlinkResourceManager.RESOURCE_MANAGER_NAME, - resourceId, - resourceManagerConfiguration, - highAvailabilityServices, - heartbeatServices, - slotManagerFactory, - 
metricRegistry, - jobLeaderIdService, - fatalErrorHandler); - } - - - @Override - public void startNewWorker(ResourceProfile resourceProfile) { - startNewWorkerCalled++; - } - } + final SlotStatus slotStatus = + new SlotStatus(slotID, resourceProfile); + final SlotReport slotReport = + new SlotReport(Collections.singletonList(slotStatus)); + // register slot at SlotManager + slotManager.registerTaskManager( + new TaskExecutorConnection(taskExecutorGateway), slotReport); - private static class TestingSlotManagerFactory implements SlotManagerFactory { + final String targetAddress = "foobar"; - private SlotManager slotManager; + SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, targetAddress); + slotManager.registerSlotRequest(slotRequest); - @Override - public SlotManager create(ResourceManagerServices rmServices) { - this.slotManager = Mockito.spy(new TestingSlotManager(rmServices)); - return this.slotManager; + // a SlotRequest is routed to the TaskExecutor + verify(taskExecutorGateway, timeout(5000)) + .requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class)); } } }