Repository: flink
Updated Branches:
  refs/heads/master 7758571ae -> 477d1c5d4


http://git-wip-us.apache.org/repos/asf/flink/blob/477d1c5d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 0c79c4e..47a4090 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -46,7 +47,6 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskExecutionStateListener;
@@ -76,6 +76,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.PriorityQueue;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -188,15 +189,14 @@ public class StreamTaskTest {
                
                ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
+               PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+               Executor executor = mock(Executor.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                when(network.getResultPartitionManager()).thenReturn(partitionManager);
                when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
-               JobManagerCommunicationFactory jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
-               when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
-
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), "Job Name", new JobVertexID(), new ExecutionAttemptID(),
                                new SerializedValue<>(new ExecutionConfig()),
@@ -215,7 +215,6 @@ public class StreamTaskTest {
                        mock(MemoryManager.class),
                        mock(IOManager.class),
                        network,
-                       jobManagerCommunicationFactory,
                        mock(BroadcastVariableManager.class),
                        mock(TaskManagerConnection.class),
                        mock(InputSplitProvider.class),
@@ -223,7 +222,10 @@ public class StreamTaskTest {
                        libCache,
                        mock(FileCache.class),
                        new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
-                       mock(TaskMetricGroup.class));
+                       mock(TaskMetricGroup.class),
+                       consumableNotifier,
+                       partitionStateChecker,
+                       executor);
        }
        
        // ------------------------------------------------------------------------

Reply via email to