This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c95396c05839447a75af6020896ed4733d1c5a7
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Sep 21 17:16:32 2018 +0200

    [hotfix] Remove mocking from SlotManagerTest
---
 .../slotmanager/SlotManagerTest.java               | 160 +++++++++------------
 1 file changed, 64 insertions(+), 96 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 854d27c..33a696a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -88,9 +88,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -107,9 +105,9 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testTaskManagerRegistration() throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                final ResourceID resourceId = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -139,14 +137,12 @@ public class SlotManagerTest extends TestLogger {
                final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
                final JobID jobId = new JobID();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       any(Time.class))).thenReturn(new CompletableFuture<>());
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               assertThat(tuple5.f4, 
is(equalTo(resourceManagerId)));
+                               return new CompletableFuture<>();
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final ResourceID resourceId = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceId, taskExecutorGateway);
@@ -232,8 +228,11 @@ public class SlotManagerTest extends TestLogger {
                        resourceProfile,
                        "localhost");
 
-               ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
-               doThrow(new ResourceManagerException("Test 
exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+               ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceFunction(value -> {
+                               throw new ResourceManagerException("Test 
exception");
+                       })
+                       .build();
 
                try (SlotManager slotManager = 
createSlotManager(resourceManagerId, resourceManagerActions)) {
 
@@ -264,19 +263,17 @@ public class SlotManagerTest extends TestLogger {
                        resourceProfile,
                        targetAddress);
 
-               ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                try (SlotManager slotManager = 
createSlotManager(resourceManagerId, resourceManagerActions)) {
-
+                       final CompletableFuture<Tuple5<SlotID, JobID, 
AllocationID, String, ResourceManagerId>> requestFuture = new 
CompletableFuture<>();
                        // accept an incoming slot request
-                       final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-                       when(taskExecutorGateway.requestSlot(
-                               eq(slotId),
-                               eq(jobId),
-                               eq(allocationId),
-                               anyString(),
-                               eq(resourceManagerId),
-                               
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+                       final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                               .setRequestSlotFunction(tuple5 -> {
+                                       
requestFuture.complete(Tuple5.of(tuple5.f0, tuple5.f1, tuple5.f2, tuple5.f3, 
tuple5.f4));
+                                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+                               })
+                               .createTestingTaskExecutorGateway();
 
                        final TaskExecutorConnection taskExecutorConnection = 
new TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -289,7 +286,7 @@ public class SlotManagerTest extends TestLogger {
 
                        assertTrue("The slot request should be accepted", 
slotManager.registerSlotRequest(slotRequest));
 
-                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, 
resourceManagerId))));
 
                        TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -309,14 +306,9 @@ public class SlotManagerTest extends TestLogger {
                final SlotID slotId = new SlotID(resourceID, 0);
                final AllocationID allocationId = new AllocationID();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       any(Time.class))).thenReturn(new CompletableFuture<>());
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       
.setRequestSlotFunction(slotIDJobIDAllocationIDStringResourceManagerIdTuple5 -> 
new CompletableFuture<>())
+                       .createTestingTaskExecutorGateway();
 
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotStatus slotStatus = new SlotStatus(slotId, 
resourceProfile);
@@ -369,15 +361,14 @@ public class SlotManagerTest extends TestLogger {
                        .setAllocateResourceConsumer(ignored -> 
numberAllocateResourceCalls.incrementAndGet())
                        .build();
 
+               final CompletableFuture<Tuple5<SlotID, JobID, AllocationID, 
String, ResourceManagerId>> requestFuture = new CompletableFuture<>();
                // accept an incoming slot request
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       eq(slotId),
-                       eq(jobId),
-                       eq(allocationId),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestFuture.complete(Tuple5.of(tuple5.f0, 
tuple5.f1, tuple5.f2, tuple5.f3, tuple5.f4));
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final TaskExecutorConnection taskExecutorConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
 
@@ -394,7 +385,7 @@ public class SlotManagerTest extends TestLogger {
                                taskExecutorConnection,
                                slotReport);
 
-                       verify(taskExecutorGateway).requestSlot(eq(slotId), 
eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), 
any(Time.class));
+                       assertThat(requestFuture.get(), 
is(equalTo(Tuple5.of(slotId, jobId, allocationId, targetAddress, 
resourceManagerId))));
 
                        TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -510,21 +501,17 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() 
throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger allocateResourceCalls = new 
AtomicInteger(0);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(resourceProfile -> 
allocateResourceCalls.incrementAndGet())
+                       .build();
                final AllocationID allocationId = new AllocationID();
                final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
                final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
                final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
                final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
                final ResourceID resourceID = ResourceID.generate();
 
@@ -547,7 +534,7 @@ public class SlotManagerTest extends TestLogger {
 
                // check that we have only called the resource allocation only 
for the first slot request,
                // since the second request is a duplicate
-               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
+               assertThat(allocateResourceCalls.get(), is(0));
        }
 
        /**
@@ -557,21 +544,17 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() 
throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final AtomicInteger allocateResourceCalls = new 
AtomicInteger(0);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setAllocateResourceConsumer(resourceProfile -> 
allocateResourceCalls.incrementAndGet())
+                       .build();
                final AllocationID allocationId = new AllocationID();
                final ResourceProfile resourceProfile1 = new 
ResourceProfile(1.0, 2);
                final ResourceProfile resourceProfile2 = new 
ResourceProfile(2.0, 1);
                final SlotRequest slotRequest1 = new SlotRequest(new JobID(), 
allocationId, resourceProfile1, "foobar");
                final SlotRequest slotRequest2 = new SlotRequest(new JobID(), 
allocationId, resourceProfile2, "barfoo");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       any(JobID.class),
-                       any(AllocationID.class),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
 
                final ResourceID resourceID = ResourceID.generate();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
@@ -601,7 +584,7 @@ public class SlotManagerTest extends TestLogger {
 
                // check that we have only called the resource allocation only 
for the first slot request,
                // since the second request is a duplicate
-               verify(resourceManagerActions, 
never()).allocateResource(any(ResourceProfile.class));
+               assertThat(allocateResourceCalls.get(), is(0));
        }
 
        /**
@@ -637,7 +620,7 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testUpdateSlotReport() throws Exception {
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder().build();
 
                final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
@@ -691,13 +674,16 @@ public class SlotManagerTest extends TestLogger {
         */
        @Test
        public void testTaskManagerTimeout() throws Exception {
-               final long tmTimeout = 500L;
+               final long tmTimeout = 10L;
 
-               final ResourceActions resourceManagerActions = 
mock(ResourceActions.class);
+               final CompletableFuture<InstanceID> releaseFuture = new 
CompletableFuture<>();
+               final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
+                       .setReleaseResourceConsumer((instanceID, e) -> 
releaseFuture.complete(instanceID))
+                       .build();
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final ResourceID resourceID = ResourceID.generate();
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceID, taskExecutorGateway);
 
                final SlotID slotId = new SlotID(resourceID, 0);
@@ -715,15 +701,9 @@ public class SlotManagerTest extends TestLogger {
 
                        slotManager.start(resourceManagerId, 
mainThreadExecutor, resourceManagerActions);
 
-                       mainThreadExecutor.execute(new Runnable() {
-                               @Override
-                               public void run() {
-                                       
slotManager.registerTaskManager(taskManagerConnection, slotReport);
-                               }
-                       });
+                       mainThreadExecutor.execute(() -> 
slotManager.registerTaskManager(taskManagerConnection, slotReport));
 
-                       verify(resourceManagerActions, timeout(100L * 
tmTimeout).times(1))
-                               
.releaseResource(eq(taskManagerConnection.getInstanceID()), 
any(Exception.class));
+                       assertThat(releaseFuture.get(), 
is(equalTo(taskManagerConnection.getInstanceID())));
                }
        }
 
@@ -976,13 +956,12 @@ public class SlotManagerTest extends TestLogger {
        @Test
        public void testTimeoutForUnusedTaskManager() throws Exception {
                final long taskManagerTimeout = 50L;
-               final long verifyTimeout = taskManagerTimeout * 10L;
 
-               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final CompletableFuture<InstanceID> releasedResourceFuture = 
new CompletableFuture<>();
                final ResourceActions resourceManagerActions = new 
TestingResourceActionsBuilder()
                        .setReleaseResourceConsumer((instanceID, e) -> 
releasedResourceFuture.complete(instanceID))
                        .build();
+               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final ScheduledExecutor scheduledExecutor = 
TestingUtils.defaultScheduledExecutor();
 
                final ResourceID resourceId = ResourceID.generate();
@@ -992,14 +971,13 @@ public class SlotManagerTest extends TestLogger {
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 1);
                final SlotRequest slotRequest = new SlotRequest(jobId, 
allocationId, resourceProfile, "foobar");
 
-               final TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               when(taskExecutorGateway.requestSlot(
-                       any(SlotID.class),
-                       eq(jobId),
-                       eq(allocationId),
-                       anyString(),
-                       eq(resourceManagerId),
-                       
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               final CompletableFuture<SlotID> requestedSlotFuture = new 
CompletableFuture<>();
+               final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                       .setRequestSlotFunction(tuple5 -> {
+                               requestedSlotFuture.complete(tuple5.f0);
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .createTestingTaskExecutorGateway();
 
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(resourceId, taskExecutorGateway);
 
@@ -1028,17 +1006,9 @@ public class SlotManagerTest extends TestLogger {
                                        }
                                },
                                mainThreadExecutor)
-                       .thenAccept((Object value) -> 
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
-
-                       ArgumentCaptor<SlotID> slotIdArgumentCaptor = 
ArgumentCaptor.forClass(SlotID.class);
+                       .thenRun(() -> 
slotManager.registerTaskManager(taskManagerConnection, initialSlotReport));
 
-                       verify(taskExecutorGateway, 
timeout(verifyTimeout)).requestSlot(
-                               slotIdArgumentCaptor.capture(),
-                               eq(jobId),
-                               eq(allocationId),
-                               anyString(),
-                               eq(resourceManagerId),
-                               any(Time.class));
+                       final SlotID slotId = requestedSlotFuture.get();
 
                        CompletableFuture<Boolean> idleFuture = 
CompletableFuture.supplyAsync(
                                () -> 
slotManager.isTaskManagerIdle(taskManagerConnection.getInstanceID()),
@@ -1047,8 +1017,6 @@ public class SlotManagerTest extends TestLogger {
                        // check that the TaskManager is not idle
                        assertFalse(idleFuture.get());
 
-                       final SlotID slotId = slotIdArgumentCaptor.getValue();
-
                        CompletableFuture<TaskManagerSlot> slotFuture = 
CompletableFuture.supplyAsync(
                                () -> slotManager.getSlot(slotId),
                                mainThreadExecutor);

Reply via email to