This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit db98eabf76fe35201b18a85fee9dbc13e2dd3379
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Feb 20 10:11:59 2019 +0100

    [hotfix][tests] Remove mocking from ExecutionGraphSchedulingTest
---
 .../ExecutionGraphSchedulingTest.java              | 57 ++++++++------------
 .../executiongraph/ExecutionGraphSuspendTest.java  | 38 -------------
 .../InteractionsCountingTaskManagerGateway.java    | 63 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 73 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 90adf09..a7c6947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
@@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -58,7 +56,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
-import org.mockito.verification.Timeout;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -80,12 +77,6 @@ import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -105,7 +96,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
        //  Tests
        // ------------------------------------------------------------------------
 
-       
        /**
         * Tests that with scheduling futures and pipelined deployment, the target vertex will
         * not deploy its task before the source vertex does.
@@ -143,8 +133,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
                //  set up two TaskManager gateways and slots
 
-               final TaskManagerGateway gatewaySource = createTaskManager();
-               final TaskManagerGateway gatewayTarget = createTaskManager();
+               final InteractionsCountingTaskManagerGateway gatewaySource = createTaskManager();
+               final InteractionsCountingTaskManagerGateway gatewayTarget = createTaskManager();
 
                final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId);
                final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId);
@@ -160,15 +150,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                // that should not cause a deployment or deployment related failure
                targetFuture.complete(targetSlot);
 
-               verify(gatewayTarget, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               assertThat(gatewayTarget.getSubmitTaskCount(), is(0));
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // now supply the source slot
                sourceFuture.complete(sourceSlot);
 
                // by now, all deployments should have happened
-               verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
-               verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               assertThat(gatewaySource.getSubmitTaskCount(), is(1));
+               assertThat(gatewayTarget.getSubmitTaskCount(), is(1));
 
                assertEquals(JobStatus.RUNNING, eg.getState());
        }
@@ -207,8 +197,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                //
                //  Create the slots, futures, and the slot provider
 
-               final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism];
-               final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism];
+               final InteractionsCountingTaskManagerGateway[] sourceTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
+               final InteractionsCountingTaskManagerGateway[] targetTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism];
 
                final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism];
                final SimpleSlot[] targetSlots = new SimpleSlot[parallelism];
@@ -264,11 +254,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                //
                //  verify that all deployments have happened
 
-               for (TaskManagerGateway gateway : sourceTaskManagers) {
-                       verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               for (InteractionsCountingTaskManagerGateway gateway : sourceTaskManagers) {
+                       assertThat(gateway.getSubmitTaskCount(), is(1));
                }
-               for (TaskManagerGateway gateway : targetTaskManagers) {
-                       verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               for (InteractionsCountingTaskManagerGateway gateway : targetTaskManagers) {
+                       assertThat(gateway.getSubmitTaskCount(), is(1));
                }
        }
 
@@ -299,7 +289,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                //
                //  Create the slots, futures, and the slot provider
 
-               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final InteractionsCountingTaskManagerGateway taskManager = createTaskManager();
                final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism);
                final TestingSlotOwner slotOwner = new TestingSlotOwner();
                slotOwner.setReturnAllocatedSlotConsumer(
@@ -355,7 +345,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                }
 
                // no deployment calls must have happened
-               verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               assertThat(taskManager.getSubmitTaskCount(), is(0));
 
                // all completed futures must have been returns
                for (int i = 0; i < parallelism; i += 2) {
@@ -387,7 +377,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                slotOwner.setReturnAllocatedSlotConsumer(
                        (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId()));
 
-               final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
+               final InteractionsCountingTaskManagerGateway taskManager = createTaskManager();
                final SimpleSlot[] slots = new SimpleSlot[parallelism];
                @SuppressWarnings({"unchecked", "rawtypes"})
                final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism];
@@ -428,12 +418,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                }
 
                //  verify that no deployments have happened
-               verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               assertThat(taskManager.getSubmitTaskCount(), is(0));
        }
 
        /**
         * Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph}
-        * if it gets concurrently cancelled
+        * if it gets concurrently cancelled.
         */
        @Test
        public void testSchedulingOperationCancellationWhenCancel() throws Exception {
@@ -604,7 +594,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
        }
 
        private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) {
-               return createSlot(taskManager, jobId, mock(SlotOwner.class));
+               return createSlot(taskManager, jobId, new TestingSlotOwner());
        }
 
        private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) {
@@ -639,20 +629,17 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
                        slotOwner);
        }
 
-       private static TaskManagerGateway createTaskManager() {
-               TaskManagerGateway tm = mock(TaskManagerGateway.class);
-               when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)))
-                               .thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
-               return tm;
+       private static InteractionsCountingTaskManagerGateway createTaskManager() {
+               return new InteractionsCountingTaskManagerGateway();
        }
 
-       private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) {
+       private static void verifyNothingDeployed(ExecutionGraph eg, InteractionsCountingTaskManagerGateway[] taskManagers) {
                // job should still be running
                assertEquals(JobStatus.RUNNING, eg.getState());
 
                // none of the TaskManager should have gotten a deployment call, yet
-               for (TaskManagerGateway gateway : taskManagers) {
-                       verify(gateway, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+               for (InteractionsCountingTaskManagerGateway gateway : taskManagers) {
+                       assertThat(gateway.getSubmitTaskCount(), is(0));
                }
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
index 4d9d436..d167a64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java
@@ -19,26 +19,19 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
-import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -306,35 +299,4 @@ public class ExecutionGraphSuspendTest extends TestLogger {
                return simpleTestGraph;
        }
 
-       private static class InteractionsCountingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
-
-               private final AtomicInteger cancelTaskCount = new AtomicInteger(0);
-
-               private final AtomicInteger submitTaskCount = new AtomicInteger(0);
-
-               @Override
-               public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
-                       cancelTaskCount.incrementAndGet();
-                       return CompletableFuture.completedFuture(Acknowledge.get());
-               }
-
-               @Override
-               public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-                       submitTaskCount.incrementAndGet();
-                       return CompletableFuture.completedFuture(Acknowledge.get());
-               }
-
-               private void resetCounts() {
-                       cancelTaskCount.set(0);
-                       submitTaskCount.set(0);
-               }
-
-               private int getCancelTaskCount() {
-                       return cancelTaskCount.get();
-               }
-
-               private int getInteractionsCount() {
-                       return cancelTaskCount.get() + submitTaskCount.get();
-               }
-       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java
new file mode 100644
index 0000000..feab301
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+class InteractionsCountingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
+
+       private final AtomicInteger cancelTaskCount = new AtomicInteger(0);
+
+       private final AtomicInteger submitTaskCount = new AtomicInteger(0);
+
+       @Override
+       public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+               cancelTaskCount.incrementAndGet();
+               return CompletableFuture.completedFuture(Acknowledge.get());
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
+               submitTaskCount.incrementAndGet();
+               return CompletableFuture.completedFuture(Acknowledge.get());
+       }
+
+       void resetCounts() {
+               cancelTaskCount.set(0);
+               submitTaskCount.set(0);
+       }
+
+       int getCancelTaskCount() {
+               return cancelTaskCount.get();
+       }
+
+       int getSubmitTaskCount() {
+               return submitTaskCount.get();
+       }
+
+       int getInteractionsCount() {
+               return cancelTaskCount.get() + submitTaskCount.get();
+       }
+}

Reply via email to