This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit db98eabf76fe35201b18a85fee9dbc13e2dd3379 Author: Till Rohrmann <trohrm...@apache.org> AuthorDate: Wed Feb 20 10:11:59 2019 +0100 [hotfix][tests] Remove mocking from ExecutionGraphSchedulingTest --- .../ExecutionGraphSchedulingTest.java | 57 ++++++++------------ .../executiongraph/ExecutionGraphSuspendTest.java | 38 ------------- .../InteractionsCountingTaskManagerGateway.java | 63 ++++++++++++++++++++++ 3 files changed, 85 insertions(+), 73 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 90adf09..a7c6947 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; @@ -49,7 +48,6 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -58,7 +56,6 @@ import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Test; -import org.mockito.verification.Timeout; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -80,12 +77,6 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * Tests for the scheduling of the execution graph. This tests that @@ -105,7 +96,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // Tests // ------------------------------------------------------------------------ - /** * Tests that with scheduling futures and pipelined deployment, the target vertex will * not deploy its task before the source vertex does. @@ -143,8 +133,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // set up two TaskManager gateways and slots - final TaskManagerGateway gatewaySource = createTaskManager(); - final TaskManagerGateway gatewayTarget = createTaskManager(); + final InteractionsCountingTaskManagerGateway gatewaySource = createTaskManager(); + final InteractionsCountingTaskManagerGateway gatewayTarget = createTaskManager(); final SimpleSlot sourceSlot = createSlot(gatewaySource, jobId); final SimpleSlot targetSlot = createSlot(gatewayTarget, jobId); @@ -160,15 +150,15 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // that should not cause a deployment or deployment related failure targetFuture.complete(targetSlot); - verify(gatewayTarget, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + assertThat(gatewayTarget.getSubmitTaskCount(), is(0)); assertEquals(JobStatus.RUNNING, eg.getState()); // now supply the source slot sourceFuture.complete(sourceSlot); // by now, all deployments should have happened - verify(gatewaySource, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); - verify(gatewayTarget, timeout(1000)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + assertThat(gatewaySource.getSubmitTaskCount(), is(1)); + assertThat(gatewayTarget.getSubmitTaskCount(), is(1)); assertEquals(JobStatus.RUNNING, eg.getState()); } @@ -207,8 +197,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // // Create the slots, futures, and the slot provider - final TaskManagerGateway[] sourceTaskManagers = new TaskManagerGateway[parallelism]; - final TaskManagerGateway[] targetTaskManagers = new TaskManagerGateway[parallelism]; + final InteractionsCountingTaskManagerGateway[] sourceTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism]; + final InteractionsCountingTaskManagerGateway[] targetTaskManagers = new InteractionsCountingTaskManagerGateway[parallelism]; final SimpleSlot[] sourceSlots = new SimpleSlot[parallelism]; final SimpleSlot[] targetSlots = new SimpleSlot[parallelism]; @@ -264,11 +254,11 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // // verify that all deployments have happened - for (TaskManagerGateway gateway : sourceTaskManagers) { - verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + for (InteractionsCountingTaskManagerGateway gateway : sourceTaskManagers) { + assertThat(gateway.getSubmitTaskCount(), is(1)); } - for (TaskManagerGateway gateway : targetTaskManagers) { - verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + for (InteractionsCountingTaskManagerGateway gateway : targetTaskManagers) { + assertThat(gateway.getSubmitTaskCount(), is(1)); } } @@ -299,7 +289,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // // Create the slots, futures, and the slot provider - final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final InteractionsCountingTaskManagerGateway taskManager = createTaskManager(); final BlockingQueue<AllocationID> returnedSlots = new ArrayBlockingQueue<>(parallelism); final TestingSlotOwner slotOwner = new TestingSlotOwner(); slotOwner.setReturnAllocatedSlotConsumer( @@ -355,7 +345,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } // no deployment calls must have happened - verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + assertThat(taskManager.getSubmitTaskCount(), is(0)); // all completed futures must have been returns for (int i = 0; i < parallelism; i += 2) { @@ -387,7 +377,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { slotOwner.setReturnAllocatedSlotConsumer( (LogicalSlot logicalSlot) -> returnedSlots.offer(logicalSlot.getAllocationId())); - final TaskManagerGateway taskManager = mock(TaskManagerGateway.class); + final InteractionsCountingTaskManagerGateway taskManager = createTaskManager(); final SimpleSlot[] slots = new SimpleSlot[parallelism]; @SuppressWarnings({"unchecked", "rawtypes"}) final CompletableFuture<LogicalSlot>[] slotFutures = new CompletableFuture[parallelism]; @@ -428,12 +418,12 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } // verify that no deployments have happened - verify(taskManager, times(0)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + assertThat(taskManager.getSubmitTaskCount(), is(0)); } /** * Tests that an ongoing scheduling operation does not fail the {@link ExecutionGraph} - * if it gets concurrently cancelled + * if it gets concurrently cancelled. */ @Test public void testSchedulingOperationCancellationWhenCancel() throws Exception { @@ -604,7 +594,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger { } private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) { - return createSlot(taskManager, jobId, mock(SlotOwner.class)); + return createSlot(taskManager, jobId, new TestingSlotOwner()); } private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId, SlotOwner slotOwner) { @@ -639,20 +629,17 @@ public class ExecutionGraphSchedulingTest extends TestLogger { slotOwner); } - private static TaskManagerGateway createTaskManager() { - TaskManagerGateway tm = mock(TaskManagerGateway.class); - when(tm.submitTask(any(TaskDeploymentDescriptor.class), any(Time.class))) - .thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); - return tm; + private static InteractionsCountingTaskManagerGateway createTaskManager() { + return new InteractionsCountingTaskManagerGateway(); } - private static void verifyNothingDeployed(ExecutionGraph eg, TaskManagerGateway[] taskManagers) { + private static void verifyNothingDeployed(ExecutionGraph eg, InteractionsCountingTaskManagerGateway[] taskManagers) { // job should still be running assertEquals(JobStatus.RUNNING, eg.getState()); // none of the TaskManager should have gotten a deployment call, yet - for (TaskManagerGateway gateway : taskManagers) { - verify(gateway, new Timeout(50, times(0))).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + for (InteractionsCountingTaskManagerGateway gateway : taskManagers) { + assertThat(gateway.getSubmitTaskCount(), is(0)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index 4d9d436..d167a64 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -19,26 +19,19 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.TestLogger; import org.junit.Test; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; - import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -306,35 +299,4 @@ public class ExecutionGraphSuspendTest extends TestLogger { return simpleTestGraph; } - private static class InteractionsCountingTaskManagerGateway extends SimpleAckingTaskManagerGateway { - - private final AtomicInteger cancelTaskCount = new AtomicInteger(0); - - private final AtomicInteger submitTaskCount = new AtomicInteger(0); - - @Override - public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { - cancelTaskCount.incrementAndGet(); - return CompletableFuture.completedFuture(Acknowledge.get()); - } - - @Override - public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) { - submitTaskCount.incrementAndGet(); - return CompletableFuture.completedFuture(Acknowledge.get()); - } - - private void resetCounts() { - cancelTaskCount.set(0); - submitTaskCount.set(0); - } - - private int getCancelTaskCount() { - return cancelTaskCount.get(); - } - - private int getInteractionsCount() { - return cancelTaskCount.get() + submitTaskCount.get(); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java new file mode 100644 index 0000000..feab301 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/InteractionsCountingTaskManagerGateway.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.messages.Acknowledge; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +class InteractionsCountingTaskManagerGateway extends SimpleAckingTaskManagerGateway { + + private final AtomicInteger cancelTaskCount = new AtomicInteger(0); + + private final AtomicInteger submitTaskCount = new AtomicInteger(0); + + @Override + public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) { + cancelTaskCount.incrementAndGet(); + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + @Override + public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) { + submitTaskCount.incrementAndGet(); + return CompletableFuture.completedFuture(Acknowledge.get()); + } + + void resetCounts() { + cancelTaskCount.set(0); + submitTaskCount.set(0); + } + + int getCancelTaskCount() { + return cancelTaskCount.get(); + } + + int getSubmitTaskCount() { + return submitTaskCount.get(); + } + + int getInteractionsCount() { + return cancelTaskCount.get() + submitTaskCount.get(); + } +}