Repository: flink
Updated Branches:
  refs/heads/master bc9d52391 -> 78f2a1586


http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index 4597e3b..9f39de1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,39 +19,34 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.DummyActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import 
org.apache.flink.runtime.taskmanager.ActorGatewayResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.NetUtils;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import scala.Some;
-import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
-import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
@@ -61,100 +56,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class NetworkEnvironmentTest {
-
-       @Test
-       public void testAssociateDisassociate() {
-               final int BUFFER_SIZE = 1024;
-               final int NUM_BUFFERS = 20;
-
-               final int port;
-               try {
-                       port = NetUtils.getAvailablePort();
-               }
-               catch (Throwable t) {
-                       // ignore
-                       return;
-               }
-
-               try {
-                       NettyConfig nettyConf = new 
NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new 
Configuration());
-                       NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
-                                       NUM_BUFFERS, BUFFER_SIZE, 
MemoryType.HEAP,
-                                       IOManager.IOMode.SYNC, 0, 0, 0, new 
Some<>(nettyConf),
-                                       new Tuple2<>(0, 0));
-
-                       NetworkEnvironment env = new NetworkEnvironment(
-                               TestingUtils.defaultExecutionContext(),
-                               new FiniteDuration(30, TimeUnit.SECONDS),
-                               config,
-                               new 
InstanceConnectionInfo(InetAddress.getLocalHost(), port));
-
-                       assertFalse(env.isShutdown());
-                       assertFalse(env.isAssociated());
-
-                       // pool must be started already
-                       assertNotNull(env.getNetworkBufferPool());
-                       assertEquals(NUM_BUFFERS, 
env.getNetworkBufferPool().getTotalNumberOfMemorySegments());
-
-                       // others components are still shut down
-                       assertNull(env.getConnectionManager());
-                       assertNull(env.getPartitionConsumableNotifier());
-                       assertNull(env.getTaskEventDispatcher());
-                       assertNull(env.getPartitionManager());
-
-                       // associate the environment with some mock actors
-                       env.associateWithTaskManagerAndJobManager(
-                                       DummyActorGateway.INSTANCE,
-                                       DummyActorGateway.INSTANCE);
-
-                       assertNotNull(env.getConnectionManager());
-                       assertNotNull(env.getPartitionConsumableNotifier());
-                       assertNotNull(env.getTaskEventDispatcher());
-                       assertNotNull(env.getPartitionManager());
-
-                       // allocate some buffer pool
-                       BufferPool localPool = 
env.getNetworkBufferPool().createBufferPool(10, false);
-                       assertNotNull(localPool);
-
-                       // disassociate
-                       env.disassociate();
-
-                       assertNull(env.getConnectionManager());
-                       assertNull(env.getPartitionConsumableNotifier());
-                       assertNull(env.getTaskEventDispatcher());
-                       assertNull(env.getPartitionManager());
-
-                       assertNotNull(env.getNetworkBufferPool());
-                       assertTrue(localPool.isDestroyed());
-
-                       // associate once again
-                       env.associateWithTaskManagerAndJobManager(
-                                       DummyActorGateway.INSTANCE,
-                                       DummyActorGateway.INSTANCE
-                       );
-
-                       assertNotNull(env.getConnectionManager());
-                       assertNotNull(env.getPartitionConsumableNotifier());
-                       assertNotNull(env.getTaskEventDispatcher());
-                       assertNotNull(env.getPartitionManager());
-
-                       // shutdown for good
-                       env.shutdown();
-
-                       assertTrue(env.isShutdown());
-                       assertFalse(env.isAssociated());
-                       assertNull(env.getConnectionManager());
-                       assertNull(env.getPartitionConsumableNotifier());
-                       assertNull(env.getTaskEventDispatcher());
-                       assertNull(env.getPartitionManager());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-
        /**
         * Registers a task with an eager and non-eager partition at the network
         * environment and verifies that there is exactly on schedule or update
@@ -164,45 +65,61 @@ public class NetworkEnvironmentTest {
        @SuppressWarnings("unchecked")
        public void testEagerlyDeployConsumers() throws Exception {
                // Mock job manager => expected interactions will be verified
-               ActorGateway jobManager = mock(ActorGateway.class);
+               final ActorGateway jobManager = mock(ActorGateway.class);
                when(jobManager.ask(anyObject(), any(FiniteDuration.class)))
                                .thenReturn(new 
Promise.DefaultPromise<>().future());
 
                // Network environment setup
                NetworkEnvironmentConfiguration config = new 
NetworkEnvironmentConfiguration(
-                               20,
-                               1024,
-                               MemoryType.HEAP,
-                               IOManager.IOMode.SYNC,
-                               0,
-                               0,
-                               0,
-                               Some.<NettyConfig>empty(),
-                               new Tuple2<>(0, 0));
+                       20,
+                       1024,
+                       MemoryType.HEAP,
+                       IOManager.IOMode.SYNC,
+                       0,
+                       0,
+                       0,
+                       Some.<NettyConfig>empty(),
+                       0,
+                       0);
 
                NetworkEnvironment env = new NetworkEnvironment(
-                               TestingUtils.defaultExecutionContext(),
-                               new FiniteDuration(30, TimeUnit.SECONDS),
-                               config,
-                               new 
InstanceConnectionInfo(InetAddress.getLocalHost(), 12232));
-
-               // Associate the environment with the mock actors
-               env.associateWithTaskManagerAndJobManager(
-                               jobManager,
-                               DummyActorGateway.INSTANCE);
+                       new NetworkBufferPool(config.numNetworkBuffers(), 
config.networkBufferSize(), config.memoryType()),
+                       new LocalConnectionManager(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new KvStateRegistry(),
+                       null,
+                       config.ioMode(),
+                       config.partitionRequestInitialBackoff(),
+                       config.partitinRequestMaxBackoff());
+
+               env.start();
+
+               JobManagerCommunicationFactory jobManagerCommunicationFactory = 
mock(JobManagerCommunicationFactory.class);
+
+               
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenAnswer(new
 Answer<ResultPartitionConsumableNotifier>() {
+                       @Override
+                       public ResultPartitionConsumableNotifier 
answer(InvocationOnMock invocation) throws Throwable {
+                               return new 
ActorGatewayResultPartitionConsumableNotifier(
+                                       TestingUtils.defaultExecutionContext(),
+                                       jobManager,
+                                       (Task)invocation.getArguments()[0],
+                                       new FiniteDuration(30, 
TimeUnit.SECONDS));
+                       }
+               });
 
                // Register mock task
                JobID jobId = new JobID();
+               Task mockTask = mock(Task.class);
 
                ResultPartition[] partitions = new ResultPartition[2];
-               partitions[0] = createPartition("p1", jobId, true, env);
-               partitions[1] = createPartition("p2", jobId, false, env);
+               partitions[0] = createPartition(mockTask, "p1", jobId, true, 
env, jobManagerCommunicationFactory);
+               partitions[1] = createPartition(mockTask, "p2", jobId, false, 
env, jobManagerCommunicationFactory);
 
                ResultPartitionWriter[] writers = new ResultPartitionWriter[2];
                writers[0] = new ResultPartitionWriter(partitions[0]);
                writers[1] = new ResultPartitionWriter(partitions[1]);
 
-               Task mockTask = mock(Task.class);
                when(mockTask.getAllInputGates()).thenReturn(new 
SingleInputGate[0]);
                when(mockTask.getAllWriters()).thenReturn(writers);
                when(mockTask.getProducedPartitions()).thenReturn(partitions);
@@ -221,10 +138,12 @@ public class NetworkEnvironmentTest {
         * Helper to create a mock result partition.
         */
        private static ResultPartition createPartition(
-                       String name,
-                       JobID jobId,
-                       boolean eagerlyDeployConsumers,
-                       NetworkEnvironment env) {
+               Task owningTask,
+               String name,
+               JobID jobId,
+               boolean eagerlyDeployConsumers,
+               NetworkEnvironment env,
+               JobManagerCommunicationFactory jobManagerCommunicationFactory) {
 
                return new ResultPartition(
                                name,
@@ -233,8 +152,8 @@ public class NetworkEnvironmentTest {
                                ResultPartitionType.PIPELINED,
                                eagerlyDeployConsumers,
                                1,
-                               env.getPartitionManager(),
-                               env.getPartitionConsumableNotifier(),
+                               env.getResultPartitionManager(),
+                               
jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(owningTask),
                                mock(IOManager.class),
                                env.getDefaultIOMode());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
index 0868398..8884b29 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.junit.Test;
-import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -103,10 +102,11 @@ public class InputChannelTest {
 
        private InputChannel createInputChannel(int initialBackoff, int 
maxBackoff) {
                return new MockInputChannel(
-                               mock(SingleInputGate.class),
-                               0,
-                               new ResultPartitionID(),
-                               new Tuple2<Integer, Integer>(initialBackoff, 
maxBackoff));
+                       mock(SingleInputGate.class),
+                       0,
+                       new ResultPartitionID(),
+                       initialBackoff,
+                       maxBackoff);
        }
 
        // 
---------------------------------------------------------------------------------------------
@@ -117,9 +117,10 @@ public class InputChannelTest {
                                SingleInputGate inputGate,
                                int channelIndex,
                                ResultPartitionID partitionId,
-                               Tuple2<Integer, Integer> initialAndMaxBackoff) {
+                               int initialBackoff,
+                               int maxBackoff) {
 
-                       super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, new SimpleCounter());
+                       super(inputGate, channelIndex, partitionId, 
initialBackoff, maxBackoff, new SimpleCounter());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index f91a4ba..18d9073 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -267,13 +267,14 @@ public class LocalInputChannelTest {
                        throws IOException, InterruptedException {
 
                return new LocalInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               partitionManager,
-                               mock(TaskEventDispatcher.class),
-                               initialAndMaxRequestBackoff,
-                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       partitionManager,
+                       mock(TaskEventDispatcher.class),
+                       initialAndMaxRequestBackoff._1(),
+                       initialAndMaxRequestBackoff._2(),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9eb49ef..9a79ff8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -299,12 +299,13 @@ public class RemoteInputChannelTest {
                                .thenReturn(partitionRequestClient);
 
                return new RemoteInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               mock(ConnectionID.class),
-                               connectionManager,
-                               initialAndMaxRequestBackoff,
-                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       mock(ConnectionID.class),
+                       connectionManager,
+                       initialAndMaxRequestBackoff._1(),
+                       initialAndMaxRequestBackoff._2(),
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 05427a1..f55fee5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -128,7 +128,7 @@ public class SingleInputGateTest {
                // Unknown
                ResultPartitionID unknownPartitionId = new 
ResultPartitionID(new IntermediateResultPartitionID(), new 
ExecutionAttemptID());
 
-               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+               InputChannel unknown = new UnknownInputChannel(inputGate, 1, 
unknownPartitionId, partitionManager, taskEventDispatcher, 
mock(ConnectionManager.class), 0, 0, new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                // Set channels
                inputGate.setInputChannel(localPartitionId.getPartitionId(), 
local);
@@ -174,13 +174,15 @@ public class SingleInputGateTest {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
                InputChannel unknown = new UnknownInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               partitionManager,
-                               new TaskEventDispatcher(),
-                               new LocalConnectionManager(),
-                               new Tuple2<Integer, Integer>(0, 0), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       partitionManager,
+                       new TaskEventDispatcher(),
+                       new LocalConnectionManager(),
+                       0,
+                       0,
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 
@@ -213,14 +215,15 @@ public class SingleInputGateTest {
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                InputChannel unknown = new UnknownInputChannel(
-                               inputGate,
-                               0,
-                               new ResultPartitionID(),
-                               new ResultPartitionManager(),
-                               new TaskEventDispatcher(),
-                               new LocalConnectionManager(),
-                               new Tuple2<Integer, Integer>(0, 0),
-                               new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       inputGate,
+                       0,
+                       new ResultPartitionID(),
+                       new ResultPartitionManager(),
+                       new TaskEventDispatcher(),
+                       new LocalConnectionManager(),
+                       0,
+                       0,
+                       new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                inputGate.setInputChannel(unknown.partitionId.getPartitionId(), 
unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index ab4ca3b..9501c7c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -150,12 +150,14 @@ public class TaskAsyncCallTest {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
-               
when(networkEnvironment.getPartitionManager()).thenReturn(partitionManager);
-               
when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+               
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
                
when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
+               JobManagerCommunicationFactory jobManagerCommunicationFactory = 
mock(JobManagerCommunicationFactory.class);
+               
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
+
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
                                new SerializedValue<>(new ExecutionConfig()),
@@ -170,17 +172,18 @@ public class TaskAsyncCallTest {
 
                ActorGateway taskManagerGateway = DummyActorGateway.INSTANCE;
                return new Task(tdd,
-                               mock(MemoryManager.class),
-                               mock(IOManager.class),
-                               networkEnvironment,
-                               mock(BroadcastVariableManager.class),
-                               taskManagerGateway,
-                               DummyActorGateway.INSTANCE,
-                               new FiniteDuration(60, TimeUnit.SECONDS),
-                               libCache,
-                               mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
-                               mock(TaskMetricGroup.class));
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       networkEnvironment,
+                       jobManagerCommunicationFactory,
+                       mock(BroadcastVariableManager.class),
+                       taskManagerGateway,
+                       DummyActorGateway.INSTANCE,
+                       new FiniteDuration(60, TimeUnit.SECONDS),
+                       libCache,
+                       mock(FileCache.class),
+                       new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
+                       mock(TaskMetricGroup.class));
        }
 
        public static class CheckpointsInOrderInvokable extends 
AbstractInvokable implements StatefulTask {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 147a3e0..3371c49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -35,8 +35,12 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceConnectionInfo;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -44,11 +48,11 @@ import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.junit.Test;
 import scala.Option;
-import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.net.InetAddress;
@@ -99,32 +103,40 @@ public class TaskManagerComponentsStartupShutdownTest {
 
                        final NetworkEnvironmentConfiguration netConf = new 
NetworkEnvironmentConfiguration(
                                        32, BUFFER_SIZE, MemoryType.HEAP, 
IOManager.IOMode.SYNC, 0, 0, 0,
-                                       Option.<NettyConfig>empty(), new 
Tuple2<Integer, Integer>(0, 0));
+                                       Option.<NettyConfig>empty(), 0, 0);
 
                        final InstanceConnectionInfo connectionInfo = new 
InstanceConnectionInfo(InetAddress.getLocalHost(), 10000);
 
                        final MemoryManager memManager = new MemoryManager(32 * 
BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
                        final IOManager ioManager = new IOManagerAsync(TMP_DIR);
                        final NetworkEnvironment network = new 
NetworkEnvironment(
-                               TestingUtils.defaultExecutionContext(),
-                               timeout,
-                               netConf,
-                               connectionInfo);
+                               new 
NetworkBufferPool(netConf.numNetworkBuffers(), netConf.networkBufferSize(), 
netConf.memoryType()),
+                               new LocalConnectionManager(),
+                               new ResultPartitionManager(),
+                               new TaskEventDispatcher(),
+                               new KvStateRegistry(),
+                               null,
+                               netConf.ioMode(),
+                               netConf.partitionRequestInitialBackoff(),
+                               netConf.partitinRequestMaxBackoff());
+
+                       network.start();
+
                        final int numberOfSlots = 1;
 
                        LeaderRetrievalService leaderRetrievalService = new 
StandaloneLeaderRetrievalService(jobManager.path().toString());
 
                        // create the task manager
                        final Props tmProps = Props.create(
-                                       TaskManager.class,
-                                       tmConfig,
-                                       ResourceID.generate(),
-                                       connectionInfo,
-                                       memManager,
-                                       ioManager,
-                                       network,
-                                       numberOfSlots,
-                                       leaderRetrievalService);
+                               TaskManager.class,
+                               tmConfig,
+                               ResourceID.generate(),
+                               connectionInfo,
+                               memManager,
+                               ioManager,
+                               network,
+                               numberOfSlots,
+                               leaderRetrievalService);
 
                        final ActorRef taskManager = 
actorSystem.actorOf(tmProps);
 
@@ -142,9 +154,6 @@ public class TaskManagerComponentsStartupShutdownTest {
                                };
                        }};
 
-                       // the components should now all be initialized
-                       assertTrue(network.isAssociated());
-
                        // shut down all actors and the actor system
                        // Kill the Task down the JobManager
                        taskManager.tell(Kill.getInstance(), 
ActorRef.noSender());
@@ -156,7 +165,6 @@ public class TaskManagerComponentsStartupShutdownTest {
                        actorSystem = null;
 
                        // now that the TaskManager is shut down, the 
components should be shut down as well
-                       assertFalse(network.isAssociated());
                        assertTrue(network.isShutdown());
                        assertTrue(ioManager.isProperlyShutDown());
                        assertTrue(memManager.isShutdown());

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
index e23aba7..53fa7c1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.java
@@ -28,12 +28,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
@@ -47,15 +45,11 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import scala.Option;
-import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.ServerSocket;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -548,69 +542,6 @@ public class TaskManagerRegistrationTest extends 
TestLogger {
                }};
        }
 
-
-       @Test
-       public void testStartupWhenNetworkStackFailsToInitialize() {
-
-               ServerSocket blocker = null;
-
-               try {
-                       blocker = new ServerSocket(0, 50, 
InetAddress.getByName("localhost"));
-
-                       final Configuration cfg = new Configuration();
-                       
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
-                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
blocker.getLocalPort());
-                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
-
-                       new JavaTestKit(actorSystem) {{
-                               ActorRef taskManager = null;
-                               ActorRef jobManager = null;
-                               ActorRef resourceManager = null;
-
-                               try {
-                                       // a simple JobManager
-                                       jobManager = startJobManager(config);
-
-                                       resourceManager = 
startResourceManager(config, jobManager);
-
-                                       // start a task manager with a 
configuration that provides a blocked port
-                                       taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
-                                                       cfg, 
ResourceID.generate(), actorSystem, "localhost",
-                                                       NONE_STRING, // no 
actor name -> random
-                                                       new 
Some<LeaderRetrievalService>(new 
StandaloneLeaderRetrievalService(jobManager.path().toString())),
-                                                       false, // init network 
stack !!!
-                                                       TaskManager.class);
-
-                                       watch(taskManager);
-
-                                       expectTerminated(timeout, taskManager);
-                               }
-                               catch (Exception e) {
-                                       e.printStackTrace();
-                                       fail(e.getMessage());
-                               } finally {
-                                       stopActor(taskManager);
-                                       stopActor(jobManager);
-                               }
-                       }};
-               }
-               catch (Exception e) {
-                       // does not work, skip test
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       if (blocker != null) {
-                               try {
-                                       blocker.close();
-                               }
-                               catch (IOException e) {
-                                       // ignore, best effort
-                               }
-                       }
-               }
-       }
-
        @Test
        public void testCheckForValidRegistrationSessionIDs() {
                new JavaTestKit(actorSystem) {{

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 149df6e..686de76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -25,9 +25,10 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.util.StartupUtils;
-import org.junit.Before;
 import org.junit.Test;
+import scala.Option;
 
 import java.io.File;
 import java.io.IOException;
@@ -179,4 +180,43 @@ public class TaskManagerStartupTest {
                        fail(e.getMessage());
                }
        }
+
+       /**
+        * Tests that the task manager start-up fails if the network stack 
cannot be initialized.
+        * @throws Exception
+        */
+       @Test(expected = IOException.class)
+       public void testStartupWhenNetworkStackFailsToInitialize() throws 
Exception {
+
+               ServerSocket blocker = null;
+
+               try {
+                       blocker = new ServerSocket(0, 50, 
InetAddress.getByName("localhost"));
+
+                       final Configuration cfg = new Configuration();
+                       
cfg.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, "localhost");
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 
blocker.getLocalPort());
+                       
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
+
+                       TaskManager.startTaskManagerComponentsAndActor(
+                               cfg,
+                               ResourceID.generate(),
+                               null,
+                               "localhost",
+                               Option.<String>empty(),
+                               Option.<LeaderRetrievalService>empty(),
+                               false,
+                               TaskManager.class);
+               }
+               finally {
+                       if (blocker != null) {
+                               try {
+                                       blocker.close();
+                               }
+                               catch (IOException e) {
+                                       // ignore, best effort
+                               }
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 54cd7c6..0e53673 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -990,11 +990,8 @@ public class TaskManagerTest extends TestLogger {
 
                                jobManager = new AkkaActorGateway(jm, 
leaderSessionID);
 
-                               final int dataPort = 
NetUtils.getAvailablePort();
                                final Configuration config = new 
Configuration();
 
-                               
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort);
-
                                taskManager = TestingUtils.createTaskManager(
                                                system,
                                                jobManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index fea21be..cfa7fb6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -66,10 +66,20 @@ public class TaskStopTest {
                
when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
                when(tddMock.getInvokableClassName()).thenReturn("className");
 
-               task = new Task(tddMock, mock(MemoryManager.class), 
mock(IOManager.class), mock(NetworkEnvironment.class),
-                               mock(BroadcastVariableManager.class), 
mock(ActorGateway.class), mock(ActorGateway.class),
-                               mock(FiniteDuration.class), 
mock(LibraryCacheManager.class), mock(FileCache.class),
-                               mock(TaskManagerRuntimeInfo.class), 
mock(TaskMetricGroup.class));
+               task = new Task(
+                       tddMock,
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       mock(NetworkEnvironment.class),
+                       mock(JobManagerCommunicationFactory.class),
+                       mock(BroadcastVariableManager.class),
+                       mock(ActorGateway.class),
+                       mock(ActorGateway.class),
+                       mock(FiniteDuration.class),
+                       mock(LibraryCacheManager.class),
+                       mock(FileCache.class),
+                       mock(TaskManagerRuntimeInfo.class),
+                       mock(TaskMetricGroup.class));
                Field f = task.getClass().getDeclaredField("invokable");
                f.setAccessible(true);
                f.set(task, taskMock);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index f145b48..9e8f8f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -244,12 +244,14 @@ public class TaskTest {
                        ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                        ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                        NetworkEnvironment network = 
mock(NetworkEnvironment.class);
-                       
when(network.getPartitionManager()).thenReturn(partitionManager);
-                       
when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+                       
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                        
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                        doThrow(new 
RuntimeException("buffers")).when(network).registerTask(any(Task.class));
+
+                       JobManagerCommunicationFactory 
jobManagerCommunicationFactory = mock(JobManagerCommunicationFactory.class);
+                       
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
                        
-                       Task task = createTask(TestInvokableCorrect.class, 
libCache, network);
+                       Task task = createTask(TestInvokableCorrect.class, 
libCache, network, jobManagerCommunicationFactory);
 
                        task.registerExecutionListener(listenerGateway);
 
@@ -598,18 +600,22 @@ public class TaskTest {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
-               
when(network.getPartitionManager()).thenReturn(partitionManager);
-               
when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+               JobManagerCommunicationFactory jobManagerCommunicationFactory = 
mock(JobManagerCommunicationFactory.class);
+               
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
+
+               
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
                
-               return createTask(invokable, libCache, network);
+               return createTask(invokable, libCache, network, 
jobManagerCommunicationFactory);
        }
        
-       private Task createTask(Class<? extends AbstractInvokable> invokable,
-                                                       LibraryCacheManager 
libCache,
-                                                       NetworkEnvironment 
networkEnvironment) {
+       private Task createTask(
+               Class<? extends AbstractInvokable> invokable,
+               LibraryCacheManager libCache,
+               NetworkEnvironment networkEnvironment,
+               JobManagerCommunicationFactory jobManagerCommunicationFactory) {
                
                TaskDeploymentDescriptor tdd = 
createTaskDeploymentDescriptor(invokable);
                
@@ -618,6 +624,7 @@ public class TaskTest {
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                networkEnvironment,
+                       jobManagerCommunicationFactory,
                                mock(BroadcastVariableManager.class),
                                taskManagerGateway,
                                jobManagerGateway,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index f8b4063..e1c9407 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -41,13 +41,12 @@ import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -148,19 +147,20 @@ public class InterruptSensitiveRestoreTest {
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
                return new Task(
-                               tdd,
-                               mock(MemoryManager.class),
-                               mock(IOManager.class),
-                               networkEnvironment,
-                               mock(BroadcastVariableManager.class),
-                               mock(ActorGateway.class),
-                               mock(ActorGateway.class),
-                               new FiniteDuration(10, TimeUnit.SECONDS),
-                               new FallbackLibraryCacheManager(),
-                               new FileCache(new Configuration()),
-                               new TaskManagerRuntimeInfo(
-                                               "localhost", new 
Configuration(), EnvironmentInformation.getTemporaryFileDirectory()),
-                               new UnregisteredTaskMetricsGroup());
+                       tdd,
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       networkEnvironment,
+                       mock(JobManagerCommunicationFactory.class),
+                       mock(BroadcastVariableManager.class),
+                       mock(ActorGateway.class),
+                       mock(ActorGateway.class),
+                       new FiniteDuration(10, TimeUnit.SECONDS),
+                       new FallbackLibraryCacheManager(),
+                       new FileCache(new Configuration()),
+                       new TaskManagerRuntimeInfo(
+                                       "localhost", new Configuration(), 
EnvironmentInformation.getTemporaryFileDirectory()),
+                       new UnregisteredTaskMetricsGroup());
                
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 408b5b1..0a9d2fa 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.taskmanager.JobManagerCommunicationFactory;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -81,9 +82,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class StreamTaskTest {
 
@@ -228,12 +226,14 @@ public class StreamTaskTest {
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
                NetworkEnvironment network = mock(NetworkEnvironment.class);
-               
when(network.getPartitionManager()).thenReturn(partitionManager);
-               
when(network.getPartitionConsumableNotifier()).thenReturn(consumableNotifier);
+               
when(network.getResultPartitionManager()).thenReturn(partitionManager);
                
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
                when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
+               JobManagerCommunicationFactory jobManagerCommunicationFactory = 
mock(JobManagerCommunicationFactory.class);
+               
when(jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(any(Task.class))).thenReturn(consumableNotifier);
+
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                                new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
                                new SerializedValue<>(new ExecutionConfig()),
@@ -248,18 +248,19 @@ public class StreamTaskTest {
                                0);
 
                return new Task(
-                               tdd,
-                               mock(MemoryManager.class),
-                               mock(IOManager.class),
-                               network,
-                               mock(BroadcastVariableManager.class),
-                               new DummyGateway(),
-                               new DummyGateway(),
-                               new FiniteDuration(60, TimeUnit.SECONDS),
-                               libCache,
-                               mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
-                               mock(TaskMetricGroup.class));
+                       tdd,
+                       mock(MemoryManager.class),
+                       mock(IOManager.class),
+                       network,
+                       jobManagerCommunicationFactory,
+                       mock(BroadcastVariableManager.class),
+                       new DummyGateway(),
+                       new DummyGateway(),
+                       new FiniteDuration(60, TimeUnit.SECONDS),
+                       libCache,
+                       mock(FileCache.class),
+                       new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
+                       mock(TaskMetricGroup.class));
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 82dbd1f..8915bff 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -233,13 +233,19 @@ public class RescalingITCase extends TestLogger {
 
                        cluster.submitJobDetached(jobGraph);
 
-                       Future<Object> allTasksRunning = jobManager.ask(new 
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobID), 
deadline.timeLeft());
+                       Object savepointResponse = null;
 
-                       Await.ready(allTasksRunning, deadline.timeLeft());
+                       // we might be too early for taking a savepoint if the 
operators have not been started yet
+                       while (deadline.hasTimeLeft()) {
 
-                       Future<Object> savepointPathFuture = jobManager.ask(new 
JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
+                               Future<Object> savepointPathFuture = 
jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), 
deadline.timeLeft());
+
+                               savepointResponse = 
Await.result(savepointPathFuture, deadline.timeLeft());
 
-                       Object savepointResponse = 
Await.result(savepointPathFuture, deadline.timeLeft());
+                               if (savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess) {
+                                       break;
+                               }
+                       }
 
                        assertTrue(savepointResponse instanceof 
JobManagerMessages.TriggerSavepointSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index 8b39f52..107801d 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -18,17 +18,13 @@
 
 package org.apache.flink.yarn
 
-import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.instance.InstanceConnectionInfo
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.taskmanager.{TaskManagerConfiguration, 
TaskManager}
-import org.apache.flink.runtime.util.ProcessShutDownThread
-
-import scala.concurrent.duration._
+import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration}
 
 /** An extension of the TaskManager that listens for additional YARN related
   * messages.

Reply via email to