This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0c95396c05839447a75af6020896ed4733d1c5a7 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Sep 21 17:16:32 2018 +0200 [hotfix] Remove mocking from SlotManagerTest --- .../slotmanager/SlotManagerTest.java | 160 +++++++++------------ 1 file changed, 64 insertions(+), 96 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 854d27c..33a696a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -88,9 +88,7 @@ import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger { @Test public void testTaskManagerRegistration() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -139,14 +137,12 @@ public class SlotManagerTest extends TestLogger { final ResourceActions resourceManagerActions = mock(ResourceActions.class); final JobID jobId = new JobID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + assertThat(tuple5.f4, is(equalTo(resourceManagerId))); + return new CompletableFuture<>(); + }) + .createTestingTaskExecutorGateway(); final ResourceID resourceId = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -232,8 +228,11 @@ public class SlotManagerTest extends TestLogger { resourceProfile, "localhost"); - ResourceActions resourceManagerActions = mock(ResourceActions.class); - doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class)); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceFunction(value -> { + throw new ResourceManagerException("Test exception"); + }) + .build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { @@ -264,19 +263,17 @@ public class SlotManagerTest extends TestLogger { resourceProfile, targetAddress); - ResourceActions resourceManagerActions = mock(ResourceActions.class); + ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - + final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -289,7 +286,7 @@ public class SlotManagerTest extends TestLogger { assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest)); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -309,14 +306,9 @@ public class SlotManagerTest extends TestLogger { final SlotID slotId = new SlotID(resourceID, 0); final AllocationID allocationId = new AllocationID(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(new CompletableFuture<>()); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> new CompletableFuture<>()) + .createTestingTaskExecutorGateway(); final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile); @@ -369,15 +361,14 @@ public class SlotManagerTest extends TestLogger { .setAllocateResourceConsumer(ignored -> numberAllocateResourceCalls.incrementAndGet()) .build(); + final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, String, ResourceManagerId>> requestFuture = new CompletableFuture<>(); // accept an incoming slot request - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - eq(slotId), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4)); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -394,7 +385,7 @@ public class SlotManagerTest extends TestLogger { taskExecutorConnection, slotReport); - verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class)); + assertThat(requestFuture.get(), is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, resourceManagerId)))); TaskManagerSlot slot = slotManager.getSlot(slotId); @@ -510,21 +501,17 @@ public class SlotManagerTest extends TestLogger { @Test public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); @@ -547,7 +534,7 @@ public class SlotManagerTest extends TestLogger { // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -557,21 +544,17 @@ public class SlotManagerTest extends TestLogger { @Test public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final AtomicInteger allocateResourceCalls = new AtomicInteger(0); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setAllocateResourceConsumer(resourceProfile -> allocateResourceCalls.incrementAndGet()) + .build(); final AllocationID allocationId = new AllocationID(); final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2); final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1); final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar"); final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - any(JobID.class), - any(AllocationID.class), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final ResourceID resourceID = ResourceID.generate(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); @@ -601,7 +584,7 @@ public class SlotManagerTest extends TestLogger { // check that we have only called the resource allocation only for the first slot request, // since the second request is a duplicate - verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class)); + assertThat(allocateResourceCalls.get(), is(0)); } /** @@ -637,7 +620,7 @@ public class SlotManagerTest extends TestLogger { @Test public void testUpdateSlotReport() throws Exception { final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder().build(); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); @@ -691,13 +674,16 @@ public class SlotManagerTest extends TestLogger { */ @Test public void testTaskManagerTimeout() throws Exception { - final long tmTimeout = 500L; + final long tmTimeout = 10L; - final ResourceActions resourceManagerActions = mock(ResourceActions.class); + final CompletableFuture<InstanceID> releaseFuture = new CompletableFuture<>(); + final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() + .setReleaseResourceConsumer((instanceID, e) -> releaseFuture.complete(instanceID)) + .build(); final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ResourceID resourceID = ResourceID.generate(); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceID, taskExecutorGateway); final SlotID slotId = new SlotID(resourceID, 0); @@ -715,15 +701,9 @@ public class SlotManagerTest extends TestLogger { slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions); - mainThreadExecutor.execute(new Runnable() { - @Override - public void run() { - slotManager.registerTaskManager(taskManagerConnection, slotReport); - } - }); + mainThreadExecutor.execute(() -> slotManager.registerTaskManager(taskManagerConnection, slotReport)); - verify(resourceManagerActions, timeout(100L * tmTimeout).times(1)) - .releaseResource(eq(taskManagerConnection.getInstanceID()), any(Exception.class)); + assertThat(releaseFuture.get(), is(equalTo(taskManagerConnection.getInstanceID()))); } } @@ -976,13 +956,12 @@ public class SlotManagerTest extends TestLogger { @Test public void testTimeoutForUnusedTaskManager() throws Exception { final long taskManagerTimeout = 50L; - final long verifyTimeout = taskManagerTimeout * 10L; - final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final CompletableFuture<InstanceID> releasedResourceFuture = new CompletableFuture<>(); final ResourceActions resourceManagerActions = new TestingResourceActionsBuilder() .setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)) .build(); + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor(); final ResourceID resourceId = ResourceID.generate(); @@ -992,14 +971,13 @@ public class SlotManagerTest extends TestLogger { final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1); final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar"); - final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class); - when(taskExecutorGateway.requestSlot( - any(SlotID.class), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + final CompletableFuture<SlotID> requestedSlotFuture = new CompletableFuture<>(); + final TaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setRequestSlotFunction(tuple5 -> { + requestedSlotFuture.complete(tuple5.f0); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(resourceId, taskExecutorGateway); @@ -1028,17 +1006,9 @@ public class SlotManagerTest extends TestLogger { } }, mainThreadExecutor) - .thenAccept((Object value) -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - - ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class); + .thenRun(() -> slotManager.registerTaskManager(taskManagerConnection, initialSlotReport)); - verify(taskExecutorGateway, timeout(verifyTimeout)).requestSlot( - slotIdArgumentCaptor.capture(), - eq(jobId), - eq(allocationId), - anyString(), - eq(resourceManagerId), - any(Time.class)); + final SlotID slotId = requestedSlotFuture.get(); CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync( () -> slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()), @@ -1047,8 +1017,6 @@ public class SlotManagerTest extends TestLogger { // check that the TaskManager is not idle assertFalse(idleFuture.get()); - final SlotID slotId = slotIdArgumentCaptor.getValue(); - CompletableFuture<TaskManagerSlot> slotFuture = CompletableFuture.supplyAsync( () -> slotManager.getSlot(slotId), mainThreadExecutor);
