Repository: flink Updated Branches: refs/heads/master 7758571ae -> 477d1c5d4
http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 0c79c4e..47a4090 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.filecache.FileCache; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -46,7 +47,6 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.taskmanager.CheckpointResponder; -import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener; @@ -76,6 +76,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.PriorityQueue; import java.util.UUID; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import static org.mockito.Matchers.any; @@ -188,15 +189,14 @@ public class StreamTaskTest { ResultPartitionManager partitionManager = mock(ResultPartitionManager.class); ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class); + PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class); + Executor executor = mock(Executor.class); NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(partitionManager); when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC); when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) .thenReturn(mock(TaskKvStateRegistry.class)); - JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class); - when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier); - TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(), new SerializedValue<>(new ExecutionConfig()), @@ -215,7 +215,6 @@ public class StreamTaskTest { mock(MemoryManager.class), mock(IOManager.class), network, - jobManagerCommunicationFactory, mock(BroadcastVariableManager.class), mock(TaskManagerConnection.class), mock(InputSplitProvider.class), @@ -223,7 +222,10 @@ public class StreamTaskTest { libCache, mock(FileCache.class), new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), - mock(TaskMetricGroup.class)); + mock(TaskMetricGroup.class), + consumableNotifier, + partitionStateChecker, + executor); } // ------------------------------------------------------------------------