[FLINK-4455] [FLINK-4424] [networkenv] Make NetworkEnvironment independent of 
ActorGateway and JobManager association

Makes the NetworkEnvironment independent of the JobManager association. This 
means that the
NetworkEnvironment, and with it the ConnectionManager, is started before the 
TaskManager actor
is executed. Furthermore, the ConnectionManager keeps running even in case of a 
JobManager
disassociation. In the wake of remodelling this behaviour, the 
PartitionStateChecker and
the ResultPartitionConsumableNotifier which depend on the JobManager 
association were moved
out of the NetworkEnvironment. They are now contained in the SlotEnvironment 
which will be
set up when the TaskManager connects to a JobManager. The SlotEnvironment 
contains all
information related to the associated JobManager. Since all slots are 
implicitly associated
with the JobManager which is the leader, we only create one SlotEnvironment 
which is shared
by all Tasks.

Introduce SlotEnvironment to accommodate the PartitionStateChecker and 
ResultPartitionConsumableNotifier

Remove the PartitionStateChecker and the ResultPartitionConsumableNotifier from 
the
NetworkEnvironment. Start the NetworkEnvironment when the TaskManager 
components are
created. Keep the NetworkEnvironment running even when the JobManager is 
disassociated.

Fix CassandraConnectorITCase

Remove ExecutionContext from TaskManager; rename SlotEnvironment to 
JobManagerConnection

Introduce JobManagerCommunicationFactory to generate job manager specific 
communication components

This closes #2449.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78f2a158
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78f2a158
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78f2a158

Branch: refs/heads/master
Commit: 78f2a15867734055a9712ea3a27f54d9bed3e43b
Parents: bc9d523
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Aug 31 09:33:46 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri Sep 2 15:05:21 2016 +0200

----------------------------------------------------------------------
 .../instance/InstanceConnectionInfo.java        |   7 +-
 .../flink/runtime/io/network/ConnectionID.java  |   2 +
 .../runtime/io/network/ConnectionManager.java   |   2 +
 .../io/network/LocalConnectionManager.java      |   5 +
 .../runtime/io/network/NetworkEnvironment.java  | 514 ++++---------------
 .../runtime/io/network/netty/NettyConfig.java   |   2 +-
 .../network/netty/NettyConnectionManager.java   |   9 +
 .../runtime/io/network/netty/NettyServer.java   |  10 +
 .../io/network/partition/ResultPartition.java   |  25 +-
 .../partition/ResultPartitionManager.java       |   2 +
 .../partition/consumer/InputChannel.java        |   8 +-
 .../partition/consumer/LocalInputChannel.java   |   8 +-
 .../partition/consumer/RemoteInputChannel.java  |   8 +-
 .../partition/consumer/SingleInputGate.java     |  32 +-
 .../partition/consumer/UnknownInputChannel.java |  17 +-
 .../flink/runtime/query/KvStateRegistry.java    |   7 +
 ...orGatewayJobManagerCommunicationFactory.java |  61 +++
 .../ActorGatewayKvStateRegistryListener.java    |  82 +++
 .../ActorGatewayPartitionStateChecker.java      |  59 +++
 ...atewayResultPartitionConsumableNotifier.java |  82 +++
 .../JobManagerCommunicationFactory.java         |  47 ++
 .../apache/flink/runtime/taskmanager/Task.java  |  28 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +
 .../NetworkEnvironmentConfiguration.scala       |   3 +-
 .../flink/runtime/taskmanager/TaskManager.scala | 152 ++++--
 .../testingUtils/TestingTaskManagerLike.scala   |   2 +-
 .../io/network/NetworkEnvironmentTest.java      | 189 ++-----
 .../partition/consumer/InputChannelTest.java    |  15 +-
 .../consumer/LocalInputChannelTest.java         |  15 +-
 .../consumer/RemoteInputChannelTest.java        |  15 +-
 .../partition/consumer/SingleInputGateTest.java |  35 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  29 +-
 ...askManagerComponentsStartupShutdownTest.java |  46 +-
 .../TaskManagerRegistrationTest.java            |  69 ---
 .../taskmanager/TaskManagerStartupTest.java     |  42 +-
 .../runtime/taskmanager/TaskManagerTest.java    |   3 -
 .../flink/runtime/taskmanager/TaskStopTest.java |  18 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  25 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  30 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  35 +-
 .../test/checkpointing/RescalingITCase.java     |  14 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   6 +-
 42 files changed, 932 insertions(+), 834 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
index eb87292..2830f04 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/InstanceConnectionInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.instance;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
@@ -36,7 +37,7 @@ import org.slf4j.LoggerFactory;
  * for data exchange. This class also contains utilities to work with the
  * TaskManager's host name, which is used to localize work assignments.
  */
-public class InstanceConnectionInfo implements IOReadableWritable, 
Comparable<InstanceConnectionInfo>, java.io.Serializable {
+public class InstanceConnectionInfo implements IOReadableWritable, 
Comparable<InstanceConnectionInfo>, Serializable {
 
        private static final long serialVersionUID = -8254407801276350716L;
        
@@ -77,7 +78,9 @@ public class InstanceConnectionInfo implements 
IOReadableWritable, Comparable<In
                if (inetAddress == null) {
                        throw new IllegalArgumentException("Argument 
inetAddress must not be null");
                }
-               if (dataPort <= 0) {
+
+               // -1 indicates a local instance connection info
+               if (dataPort != -1 && dataPort <= 0) {
                        throw new IllegalArgumentException("Argument dataPort 
must be greater than zero");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index c15e72e..0569dae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -37,6 +37,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ConnectionID implements Serializable {
 
+       private static final long serialVersionUID = -8068626194818666857L;
+
        private final InetSocketAddress address;
 
        private final int connectionIndex;

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index 2f535fe..02deb9d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -46,6 +46,8 @@ public interface ConnectionManager {
 
        int getNumberOfActiveConnections();
 
+       int getDataPort();
+
        void shutdown() throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 410f8ab..4f51a56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -48,5 +48,10 @@ public class LocalConnectionManager implements 
ConnectionManager {
        }
 
        @Override
+       public int getDataPort() {
+               return -1;
+       }
+
+       @Override
        public void shutdown() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 844bc2d..b221ec7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -18,60 +18,34 @@
 
 package org.apache.flink.runtime.io.network;
 
-import akka.dispatch.OnFailure;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.NettyConfig;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
-import org.apache.flink.runtime.messages.TaskMessages.FailTask;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateMessage;
 import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateRegistryListener;
-import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
-import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 
-import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Network I/O components of each {@link TaskManager} instance. The network 
environment contains
  * the data structures that keep track of all intermediate results and all 
data exchanges.
- *
- * When initialized, the NetworkEnvironment will allocate the network buffer 
pool.
- * All other components (netty, intermediate result managers, ...) are only 
created once the
- * environment is "associated" with a TaskManager and JobManager. This happens 
as soon as the
- * TaskManager actor gets created and registers itself at the JobManager.
  */
 public class NetworkEnvironment {
 
@@ -79,69 +53,61 @@ public class NetworkEnvironment {
 
        private final Object lock = new Object();
 
-       private final NetworkEnvironmentConfiguration configuration;
-
-       private final FiniteDuration jobManagerTimeout;
-
        private final NetworkBufferPool networkBufferPool;
 
-       private ConnectionManager connectionManager;
+       private final ConnectionManager connectionManager;
 
-       private ResultPartitionManager partitionManager;
+       private final ResultPartitionManager resultPartitionManager;
 
-       private TaskEventDispatcher taskEventDispatcher;
-
-       private ResultPartitionConsumableNotifier partitionConsumableNotifier;
-
-       private PartitionStateChecker partitionStateChecker;
+       private final TaskEventDispatcher taskEventDispatcher;
 
        /** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
-       private KvStateServer kvStateServer;
+       private final KvStateServer kvStateServer;
 
        /** Registry for {@link org.apache.flink.runtime.state.KvState} 
instances. */
-       private KvStateRegistry kvStateRegistry;
+       private final KvStateRegistry kvStateRegistry;
 
-       private boolean isShutdown;
+       private final IOManager.IOMode defaultIOMode;
 
-       /**
-        * ExecutionEnvironment which is used to execute remote calls with the
-        * {@link JobManagerResultPartitionConsumableNotifier}
-        */
-       private final ExecutionContext executionContext;
+       private final int partitionRequestInitialBackoff;
 
-       private final InstanceConnectionInfo connectionInfo;
+       private final int partitionRequestMaxBackoff;
+
+       private boolean isShutdown;
 
-       /**
-        * Initializes all network I/O components.
-        */
        public NetworkEnvironment(
-                       ExecutionContext executionContext,
-                       FiniteDuration jobManagerTimeout,
-                       NetworkEnvironmentConfiguration config,
-                       InstanceConnectionInfo connectionInfo) throws 
IOException {
-
-               this.executionContext = executionContext;
-               this.configuration = checkNotNull(config);
-               this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
-               this.connectionInfo = checkNotNull(connectionInfo);
-
-               // create the network buffers - this is the operation most 
likely to fail upon
-               // mis-configuration, so we do this first
-               try {
-                       networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(),
-                                       config.networkBufferSize(), 
config.memoryType());
-               }
-               catch (Throwable t) {
-                       throw new IOException("Cannot allocate network buffer 
pool: " + t.getMessage(), t);
-               }
+               NetworkBufferPool networkBufferPool,
+               ConnectionManager connectionManager,
+               ResultPartitionManager resultPartitionManager,
+               TaskEventDispatcher taskEventDispatcher,
+               KvStateRegistry kvStateRegistry,
+               KvStateServer kvStateServer,
+               IOMode defaultIOMode,
+               int partitionRequestInitialBackoff,
+               int partitionRequestMaxBackoff) {
+
+               this.networkBufferPool = checkNotNull(networkBufferPool);
+               this.connectionManager = checkNotNull(connectionManager);
+               this.resultPartitionManager = 
checkNotNull(resultPartitionManager);
+               this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
+               this.kvStateRegistry = checkNotNull(kvStateRegistry);
+
+               this.kvStateServer = kvStateServer;
+
+               this.defaultIOMode = defaultIOMode;
+
+               this.partitionRequestInitialBackoff = 
partitionRequestInitialBackoff;
+               this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
+
+               isShutdown = false;
        }
 
        // 
--------------------------------------------------------------------------------------------
        //  Properties
        // 
--------------------------------------------------------------------------------------------
 
-       public ResultPartitionManager getPartitionManager() {
-               return partitionManager;
+       public ResultPartitionManager getResultPartitionManager() {
+               return resultPartitionManager;
        }
 
        public TaskEventDispatcher getTaskEventDispatcher() {
@@ -157,187 +123,27 @@ public class NetworkEnvironment {
        }
 
        public IOMode getDefaultIOMode() {
-               return configuration.ioMode();
+               return defaultIOMode;
        }
 
-       public ResultPartitionConsumableNotifier 
getPartitionConsumableNotifier() {
-               return partitionConsumableNotifier;
+       public int getPartitionRequestInitialBackoff() {
+               return partitionRequestInitialBackoff;
        }
 
-       public PartitionStateChecker getPartitionStateChecker() {
-               return partitionStateChecker;
+       public int getPartitionRequestMaxBackoff() {
+               return partitionRequestMaxBackoff;
        }
 
-       public Tuple2<Integer, Integer> 
getPartitionRequestInitialAndMaxBackoff() {
-               return configuration.partitionRequestInitialAndMaxBackoff();
+       public KvStateRegistry getKvStateRegistry() {
+               return kvStateRegistry;
        }
 
-       public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, 
JobVertexID jobVertexId) {
-               return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+       public KvStateServer getKvStateServer() {
+               return kvStateServer;
        }
 
-       // 
--------------------------------------------------------------------------------------------
-       //  Association / Disassociation with JobManager / TaskManager
-       // 
--------------------------------------------------------------------------------------------
-
-       public boolean isAssociated() {
-               return partitionConsumableNotifier != null;
-       }
-
-       /**
-        * This associates the network environment with a TaskManager and 
JobManager.
-        * This will actually start the network components.
-        *
-        * @param jobManagerGateway Gateway to the JobManager.
-        * @param taskManagerGateway Gateway to the TaskManager.
-        *
-        * @throws IOException Thrown if the network subsystem (Netty) cannot 
be properly started.
-        */
-       public void associateWithTaskManagerAndJobManager(
-                       ActorGateway jobManagerGateway,
-                       ActorGateway taskManagerGateway) throws IOException
-       {
-               checkNotNull(jobManagerGateway);
-               checkNotNull(taskManagerGateway);
-
-               synchronized (lock) {
-                       if (isShutdown) {
-                               throw new IllegalStateException("environment is 
shut down");
-                       }
-
-                       if (this.partitionConsumableNotifier == null &&
-                               this.partitionManager == null &&
-                               this.taskEventDispatcher == null &&
-                               this.connectionManager == null &&
-                               this.kvStateRegistry == null &&
-                               this.kvStateServer == null)
-                       {
-                               // good, not currently associated. start the 
individual components
-
-                               LOG.debug("Starting result partition manager 
and network connection manager");
-                               this.partitionManager = new 
ResultPartitionManager();
-                               this.taskEventDispatcher = new 
TaskEventDispatcher();
-                               this.partitionConsumableNotifier = new 
JobManagerResultPartitionConsumableNotifier(
-                                       executionContext,
-                                       jobManagerGateway,
-                                       taskManagerGateway,
-                                       jobManagerTimeout);
-
-                               this.partitionStateChecker = new 
JobManagerPartitionStateChecker(
-                                               jobManagerGateway, 
taskManagerGateway);
-
-                               // -----  Network connections  -----
-                               final Option<NettyConfig> nettyConfig = 
configuration.nettyConfig();
-                               connectionManager = nettyConfig.isDefined() ? 
new NettyConnectionManager(nettyConfig.get())
-                                                                               
                                        : new LocalConnectionManager();
-
-                               try {
-                                       LOG.debug("Starting network connection 
manager");
-                                       
connectionManager.start(partitionManager, taskEventDispatcher, 
networkBufferPool);
-                               }
-                               catch (Throwable t) {
-                                       throw new IOException("Failed to 
instantiate network connection manager: " + t.getMessage(), t);
-                               }
-
-                               try {
-                                       kvStateRegistry = new KvStateRegistry();
-
-                                       if (nettyConfig.isDefined()) {
-                                               int numNetworkThreads = 
configuration.queryServerNetworkThreads();
-                                               if (numNetworkThreads == 0) {
-                                                       numNetworkThreads = 
nettyConfig.get().getNumberOfSlots();
-                                               }
-
-                                               int numQueryThreads = 
configuration.queryServerNetworkThreads();
-                                               if (numQueryThreads == 0) {
-                                                       numQueryThreads = 
nettyConfig.get().getNumberOfSlots();
-                                               }
-
-                                               kvStateServer = new 
KvStateServer(
-                                                               
connectionInfo.address(),
-                                                               
configuration.queryServerPort(),
-                                                               
numNetworkThreads,
-                                                               numQueryThreads,
-                                                               kvStateRegistry,
-                                                               new 
DisabledKvStateRequestStats());
-
-                                               kvStateServer.start();
-
-                                               KvStateRegistryListener 
listener = new JobManagerKvStateRegistryListener(
-                                                               
jobManagerGateway,
-                                                               
kvStateServer.getAddress());
-
-                                               
kvStateRegistry.registerListener(listener);
-                                       }
-                               } catch (Throwable t) {
-                                       throw new IOException("Failed to 
instantiate KvState management components: "
-                                                       + t.getMessage(), t);
-                               }
-                       }
-                       else {
-                               throw new IllegalStateException(
-                                               "Network Environment is already 
associated with a JobManager/TaskManager");
-                       }
-               }
-       }
-
-       public void disassociate() throws IOException {
-               synchronized (lock) {
-                       if (!isAssociated()) {
-                               return;
-                       }
-
-                       LOG.debug("Disassociating NetworkEnvironment from 
TaskManager. Cleaning all intermediate results.");
-
-                       // Shut down KvStateRegistry
-                       kvStateRegistry = null;
-
-                       // Shut down KvStateServer
-                       if (kvStateServer != null) {
-                               try {
-                                       kvStateServer.shutDown();
-                               } catch (Throwable t) {
-                                       throw new IOException("Cannot shutdown 
KvStateNettyServer", t);
-                               }
-                               kvStateServer = null;
-                       }
-
-                       // terminate all network connections
-                       if (connectionManager != null) {
-                               try {
-                                       LOG.debug("Shutting down network 
connection manager");
-                                       connectionManager.shutdown();
-                                       connectionManager = null;
-                               }
-                               catch (Throwable t) {
-                                       throw new IOException("Cannot shutdown 
network connection manager", t);
-                               }
-                       }
-
-                       // shutdown all intermediate results
-                       if (partitionManager != null) {
-                               try {
-                                       LOG.debug("Shutting down intermediate 
result partition manager");
-                                       partitionManager.shutdown();
-                                       partitionManager = null;
-                               }
-                               catch (Throwable t) {
-                                       throw new IOException("Cannot shutdown 
partition manager", t);
-                               }
-                       }
-
-                       partitionConsumableNotifier = null;
-
-                       partitionStateChecker = null;
-
-                       if (taskEventDispatcher != null) {
-                               taskEventDispatcher.clearAll();
-                               taskEventDispatcher = null;
-                       }
-
-                       // make sure that the global buffer pool re-acquires 
all buffers
-                       networkBufferPool.destroyAllBufferPools();
-               }
+       public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, 
JobVertexID jobVertexId) {
+               return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -358,9 +164,6 @@ public class NetworkEnvironment {
                        if (isShutdown) {
                                throw new 
IllegalStateException("NetworkEnvironment is shut down");
                        }
-                       if (!isAssociated()) {
-                               throw new 
IllegalStateException("NetworkEnvironment is not associated with a 
TaskManager");
-                       }
 
                        for (int i = 0; i < producedPartitions.length; i++) {
                                final ResultPartition partition = 
producedPartitions[i];
@@ -373,17 +176,15 @@ public class NetworkEnvironment {
                                        bufferPool = 
networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), false);
                                        
partition.registerBufferPool(bufferPool);
 
-                                       
partitionManager.registerResultPartition(partition);
-                               }
-                               catch (Throwable t) {
+                                       
resultPartitionManager.registerResultPartition(partition);
+                               } catch (Throwable t) {
                                        if (bufferPool != null) {
                                                bufferPool.lazyDestroy();
                                        }
 
                                        if (t instanceof IOException) {
                                                throw (IOException) t;
-                                       }
-                                       else {
+                                       } else {
                                                throw new 
IOException(t.getMessage(), t);
                                        }
                                }
@@ -401,31 +202,18 @@ public class NetworkEnvironment {
                                try {
                                        bufferPool = 
networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), false);
                                        gate.setBufferPool(bufferPool);
-                               }
-                               catch (Throwable t) {
+                               } catch (Throwable t) {
                                        if (bufferPool != null) {
                                                bufferPool.lazyDestroy();
                                        }
 
                                        if (t instanceof IOException) {
                                                throw (IOException) t;
-                                       }
-                                       else {
+                                       } else {
                                                throw new 
IOException(t.getMessage(), t);
                                        }
                                }
                        }
-
-                       // Copy the reference to prevent races with concurrent 
shut downs
-                       jobManagerNotifier = partitionConsumableNotifier;
-               }
-
-               for (ResultPartition partition : producedPartitions) {
-                       // Eagerly notify consumers if required.
-                       if (partition.getEagerlyDeployConsumers()) {
-                               jobManagerNotifier.notifyPartitionConsumable(
-                                               partition.getJobId(), 
partition.getPartitionId());
-                       }
                }
        }
 
@@ -436,13 +224,13 @@ public class NetworkEnvironment {
                final ExecutionAttemptID executionId = task.getExecutionId();
 
                synchronized (lock) {
-                       if (isShutdown || !isAssociated()) {
+                       if (isShutdown) {
                                // no need to do anything when we are not 
operational
                                return;
                        }
 
                        if (task.isCanceledOrFailed()) {
-                               
partitionManager.releasePartitionsProducedBy(executionId, 
task.getFailureCause());
+                               
resultPartitionManager.releasePartitionsProducedBy(executionId, 
task.getFailureCause());
                        }
 
                        ResultPartitionWriter[] writers = task.getAllWriters();
@@ -476,6 +264,31 @@ public class NetworkEnvironment {
                }
        }
 
+       public void start() throws IOException {
+               synchronized (lock) {
+                       Preconditions.checkState(!isShutdown, "The 
NetworkEnvironment has already been shut down.");
+
+                       LOG.info("Starting the network environment and its 
components.");
+
+                       try {
+                               LOG.debug("Starting network connection 
manager");
+                               connectionManager.start(resultPartitionManager, 
taskEventDispatcher, networkBufferPool);
+                       }
+                       catch (IOException t) {
+                               throw new IOException("Failed to instantiate 
network connection manager.", t);
+                       }
+
+                       if (kvStateServer != null) {
+                               try {
+                                       LOG.debug("Starting the KvState 
server.");
+                                       kvStateServer.start();
+                               } catch (InterruptedException ie) {
+                                       throw new IOException("Failed to start 
the KvState server.", ie);
+                               }
+                       }
+               }
+       }
+
        /**
         * Tries to shut down all network I/O components.
         */
@@ -485,20 +298,45 @@ public class NetworkEnvironment {
                                return;
                        }
 
-                       // shut down all connections and free all intermediate 
result partitions
+                       LOG.info("Shutting down the network environment and its 
components.");
+
+                       if (kvStateServer != null) {
+                               try {
+                                       kvStateServer.shutDown();
+                               } catch (Throwable t) {
+                                       LOG.warn("Cannot shut down KvState 
server.", t);
+                               }
+                       }
+
+                       // terminate all network connections
                        try {
-                               disassociate();
+                               LOG.debug("Shutting down network connection 
manager");
+                               connectionManager.shutdown();
                        }
                        catch (Throwable t) {
-                               LOG.warn("Network services did not shut down 
properly: " + t.getMessage(), t);
+                               LOG.warn("Cannot shut down the network 
connection manager.", t);
                        }
 
+                       // shutdown all intermediate results
+                       try {
+                               LOG.debug("Shutting down intermediate result 
partition manager");
+                               resultPartitionManager.shutdown();
+                       }
+                       catch (Throwable t) {
+                               LOG.warn("Cannot shut down the result partition 
manager.", t);
+                       }
+
+                       taskEventDispatcher.clearAll();
+
+                       // make sure that the global buffer pool re-acquires 
all buffers
+                       networkBufferPool.destroyAllBufferPools();
+
                        // destroy the buffer pool
                        try {
                                networkBufferPool.destroy();
                        }
                        catch (Throwable t) {
-                               LOG.warn("Network buffer pool did not shut down 
properly: " + t.getMessage(), t);
+                               LOG.warn("Network buffer pool did not shut down 
properly.", t);
                        }
 
                        isShutdown = true;
@@ -506,138 +344,8 @@ public class NetworkEnvironment {
        }
 
        public boolean isShutdown() {
-               return isShutdown;
-       }
-
-       /**
-        * Notifies the job manager about consumable partitions.
-        */
-       private static class JobManagerResultPartitionConsumableNotifier 
implements ResultPartitionConsumableNotifier {
-
-               /**
-                * {@link ExecutionContext} which is used for the failure 
handler of {@link ScheduleOrUpdateConsumers}
-                * messages.
-                */
-               private final ExecutionContext executionContext;
-
-               private final ActorGateway jobManager;
-
-               private final ActorGateway taskManager;
-
-               private final FiniteDuration jobManagerMessageTimeout;
-
-               public JobManagerResultPartitionConsumableNotifier(
-                       ExecutionContext executionContext,
-                       ActorGateway jobManager,
-                       ActorGateway taskManager,
-                       FiniteDuration jobManagerMessageTimeout) {
-
-                       this.executionContext = executionContext;
-                       this.jobManager = jobManager;
-                       this.taskManager = taskManager;
-                       this.jobManagerMessageTimeout = 
jobManagerMessageTimeout;
-               }
-
-               @Override
-               public void notifyPartitionConsumable(JobID jobId, final 
ResultPartitionID partitionId) {
-
-                       final ScheduleOrUpdateConsumers msg = new 
ScheduleOrUpdateConsumers(jobId, partitionId);
-
-                       Future<Object> futureResponse = jobManager.ask(msg, 
jobManagerMessageTimeout);
-
-                       futureResponse.onFailure(new OnFailure() {
-                               @Override
-                               public void onFailure(Throwable failure) {
-                                       LOG.error("Could not schedule or update 
consumers at the JobManager.", failure);
-
-                                       // Fail task at the TaskManager
-                                       FailTask failMsg = new FailTask(
-                                                       
partitionId.getProducerId(),
-                                                       new 
RuntimeException("Could not notify JobManager to schedule or update consumers",
-                                                                       
failure));
-
-                                       taskManager.tell(failMsg);
-                               }
-                       }, executionContext);
-               }
-       }
-
-       private static class JobManagerPartitionStateChecker implements 
PartitionStateChecker {
-
-               private final ActorGateway jobManager;
-
-               private final ActorGateway taskManager;
-
-               public JobManagerPartitionStateChecker(ActorGateway jobManager, 
ActorGateway taskManager) {
-                       this.jobManager = jobManager;
-                       this.taskManager = taskManager;
-               }
-
-               @Override
-               public void triggerPartitionStateCheck(
-                               JobID jobId,
-                               ExecutionAttemptID executionAttemptID,
-                               IntermediateDataSetID resultId,
-                               ResultPartitionID partitionId) {
-
-                       RequestPartitionState msg = new RequestPartitionState(
-                                       jobId, partitionId, executionAttemptID, 
resultId);
-
-                       jobManager.tell(msg, taskManager);
-               }
-       }
-
-       /**
-        * Simple {@link KvStateRegistry} listener, which forwards 
registrations to
-        * the JobManager.
-        */
-       private static class JobManagerKvStateRegistryListener implements 
KvStateRegistryListener {
-
-               private ActorGateway jobManager;
-
-               private KvStateServerAddress kvStateServerAddress;
-
-               public JobManagerKvStateRegistryListener(
-                               ActorGateway jobManager,
-                               KvStateServerAddress kvStateServerAddress) {
-
-                       this.jobManager = 
Preconditions.checkNotNull(jobManager, "JobManager");
-                       this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
-               }
-
-               @Override
-               public void notifyKvStateRegistered(
-                               JobID jobId,
-                               JobVertexID jobVertexId,
-                               int keyGroupIndex,
-                               String registrationName,
-                               KvStateID kvStateId) {
-
-                       Object msg = new KvStateMessage.NotifyKvStateRegistered(
-                                       jobId,
-                                       jobVertexId,
-                                       keyGroupIndex,
-                                       registrationName,
-                                       kvStateId,
-                                       kvStateServerAddress);
-
-                       jobManager.tell(msg);
-               }
-
-               @Override
-               public void notifyKvStateUnregistered(
-                               JobID jobId,
-                               JobVertexID jobVertexId,
-                               int keyGroupIndex,
-                               String registrationName) {
-
-                       Object msg = new 
KvStateMessage.NotifyKvStateUnregistered(
-                                       jobId,
-                                       jobVertexId,
-                                       keyGroupIndex,
-                                       registrationName);
-
-                       jobManager.tell(msg);
+               synchronized (lock) {
+                       return isShutdown;
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
index 6806136..c178f2e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java
@@ -76,7 +76,7 @@ public class NettyConfig {
 
                this.serverAddress = checkNotNull(serverAddress);
 
-               checkArgument(serverPort > 0 && serverPort <= 65536, "Invalid 
port number.");
+               checkArgument(serverPort >= 0 && serverPort <= 65535, "Invalid port number.");
                this.serverPort = serverPort;
 
                checkArgument(memorySegmentSize > 0, "Invalid memory segment 
size.");

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index d278b3c..abee2a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -71,6 +71,15 @@ public class NettyConnectionManager implements 
ConnectionManager {
        }
 
        @Override
+       public int getDataPort() {
+               if (server != null && server.getLocalAddress() != null) {
+                       return server.getLocalAddress().getPort();
+               } else {
+                       return -1;
+               }
+       }
+
+       @Override
        public void shutdown() {
                client.shutdown();
                server.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index 036fe22..a93e90c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.ThreadFactory;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -50,8 +51,11 @@ class NettyServer {
 
        private ChannelFuture bindFuture;
 
+       private InetSocketAddress localAddress;
+
        NettyServer(NettyConfig config) {
                this.config = checkNotNull(config);
+               localAddress = null;
        }
 
        void init(final NettyProtocol protocol, NettyBufferPool 
nettyBufferPool) throws IOException {
@@ -128,6 +132,8 @@ class NettyServer {
 
                bindFuture = bootstrap.bind().syncUninterruptibly();
 
+               localAddress = (InetSocketAddress) 
bindFuture.channel().localAddress();
+
                long end = System.currentTimeMillis();
                LOG.info("Successful initialization (took {} ms). Listening on 
SocketAddress {}.", (end - start), 
bindFuture.channel().localAddress().toString());
        }
@@ -140,6 +146,10 @@ class NettyServer {
                return bootstrap;
        }
 
+       public InetSocketAddress getLocalAddress() {
+               return localAddress;
+       }
+
        void shutdown() {
                long start = System.currentTimeMillis();
                if (bindFuture != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 7c109f3..7bcdd31 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -92,7 +92,7 @@ public class ResultPartition implements BufferPoolOwner {
         * <p>If <code>true</code>, the consumers are deployed as soon as the
         * runtime result is registered at the result manager of the task 
manager.
         */
-       private final boolean eagerlyDeployConsumers;
+       private final boolean doEagerDeployment;
 
        /** The subpartitions of this partition. At least one. */
        private final ResultSubpartition[] subpartitions;
@@ -133,7 +133,7 @@ public class ResultPartition implements BufferPoolOwner {
                        JobID jobId,
                        ResultPartitionID partitionId,
                        ResultPartitionType partitionType,
-                       boolean eagerlyDeployConsumers,
+                       boolean doEagerDeployment,
                        int numberOfSubpartitions,
                        ResultPartitionManager partitionManager,
                        ResultPartitionConsumableNotifier 
partitionConsumableNotifier,
@@ -144,7 +144,7 @@ public class ResultPartition implements BufferPoolOwner {
                this.jobId = checkNotNull(jobId);
                this.partitionId = checkNotNull(partitionId);
                this.partitionType = checkNotNull(partitionType);
-               this.eagerlyDeployConsumers = eagerlyDeployConsumers;
+               this.doEagerDeployment = doEagerDeployment;
                this.subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
                this.partitionManager = checkNotNull(partitionManager);
                this.partitionConsumableNotifier = 
checkNotNull(partitionConsumableNotifier);
@@ -211,16 +211,6 @@ public class ResultPartition implements BufferPoolOwner {
                return subpartitions.length;
        }
 
-       /**
-        * Returns whether consumers should be deployed eagerly (as soon as they
-        * are registered at the result manager of the task manager).
-        *
-        * @return Whether consumers should be deployed eagerly
-        */
-       public boolean getEagerlyDeployConsumers() {
-               return eagerlyDeployConsumers;
-       }
-
        public BufferProvider getBufferProvider() {
                return bufferPool;
        }
@@ -357,6 +347,15 @@ public class ResultPartition implements BufferPoolOwner {
        }
 
        /**
+        * Deploys consumers if eager deployment is activated
+        */
+       public void deployConsumers() {
+               if (doEagerDeployment) {
+                       
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
+               }
+       }
+
+       /**
         * Releases buffers held by this result partition.
         *
         * <p> This is a callback from the buffer pool, which is registered for 
result partitions, which

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
index 9da3e14..6edae6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
@@ -58,6 +58,8 @@ public class ResultPartitionManager implements 
ResultPartitionProvider {
                                throw new IllegalStateException("Result 
partition already registered.");
                        }
 
+                       partition.deployConsumers();
+
                        LOG.debug("Registered {}.", partition);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
index 5d82903..35094e2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,13 +70,14 @@ public abstract class InputChannel {
                        SingleInputGate inputGate,
                        int channelIndex,
                        ResultPartitionID partitionId,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       int initialBackoff,
+                       int maxBackoff,
                        Counter numBytesIn) {
 
                checkArgument(channelIndex >= 0);
 
-               int initial = initialAndMaxBackoff._1();
-               int max = initialAndMaxBackoff._2();
+               int initial = initialBackoff;
+               int max = maxBackoff;
 
                checkArgument(initial >= 0 && initial <= max);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 6fcd2f9..a8aae7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -32,7 +32,6 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.Timer;
@@ -72,7 +71,7 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
                        IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, partitionManager, 
taskEventDispatcher,
-                               new Tuple2<Integer, Integer>(0, 0), metrics);
+                               0, 0, metrics);
        }
 
        LocalInputChannel(
@@ -81,10 +80,11 @@ public class LocalInputChannel extends InputChannel 
implements NotificationListe
                        ResultPartitionID partitionId,
                        ResultPartitionManager partitionManager,
                        TaskEventDispatcher taskEventDispatcher,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       int initialBackoff,
+                       int maxBackoff,
                        IOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInLocalCounter());
+               super(inputGate, channelIndex, partitionId, initialBackoff, 
maxBackoff, metrics.getNumBytesInLocalCounter());
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 1cd042c..a12d2a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Tuple2;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -85,7 +84,7 @@ public class RemoteInputChannel extends InputChannel {
                        IOMetricGroup metrics) {
 
                this(inputGate, channelIndex, partitionId, connectionId, 
connectionManager,
-                               new Tuple2<Integer, Integer>(0, 0), metrics);
+                               0, 0, metrics);
        }
 
        public RemoteInputChannel(
@@ -94,10 +93,11 @@ public class RemoteInputChannel extends InputChannel {
                        ResultPartitionID partitionId,
                        ConnectionID connectionId,
                        ConnectionManager connectionManager,
-                       Tuple2<Integer, Integer> initialAndMaxBackoff,
+                       int initialBackoff,
+                       int maxBackoff,
                        IOMetricGroup metrics) {
 
-               super(inputGate, channelIndex, partitionId, 
initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
+               super(inputGate, channelIndex, partitionId, initialBackoff, 
maxBackoff, metrics.getNumBytesInRemoteCounter());
 
                this.connectionId = checkNotNull(connectionId);
                this.connectionManager = checkNotNull(connectionManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 351181a..aaf8887 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -505,6 +505,7 @@ public class SingleInputGate implements InputGate {
                        ExecutionAttemptID executionId,
                        InputGateDeploymentDescriptor igdd,
                        NetworkEnvironment networkEnvironment,
+                       PartitionStateChecker partitionStateChecker,
                        IOMetricGroup metrics) {
 
                final IntermediateDataSetID consumedResultId = 
checkNotNull(igdd.getConsumedResultId());
@@ -516,7 +517,7 @@ public class SingleInputGate implements InputGate {
 
                final SingleInputGate inputGate = new SingleInputGate(
                                owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex,
-                               icdd.length, 
networkEnvironment.getPartitionStateChecker(), metrics);
+                               icdd.length, partitionStateChecker, metrics);
 
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
@@ -528,27 +529,30 @@ public class SingleInputGate implements InputGate {
 
                        if (partitionLocation.isLocal()) {
                                inputChannels[i] = new 
LocalInputChannel(inputGate, i, partitionId,
-                                               
networkEnvironment.getPartitionManager(),
-                                               
networkEnvironment.getTaskEventDispatcher(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-                                               metrics
+                                       
networkEnvironment.getResultPartitionManager(),
+                                       
networkEnvironment.getTaskEventDispatcher(),
+                                       
networkEnvironment.getPartitionRequestInitialBackoff(),
+                                       
networkEnvironment.getPartitionRequestMaxBackoff(),
+                                       metrics
                                );
                        }
                        else if (partitionLocation.isRemote()) {
                                inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
-                                               
partitionLocation.getConnectionId(),
-                                               
networkEnvironment.getConnectionManager(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-                                               metrics
+                                       partitionLocation.getConnectionId(),
+                                       
networkEnvironment.getConnectionManager(),
+                                       
networkEnvironment.getPartitionRequestInitialBackoff(),
+                                       
networkEnvironment.getPartitionRequestMaxBackoff(),
+                                       metrics
                                );
                        }
                        else if (partitionLocation.isUnknown()) {
                                inputChannels[i] = new 
UnknownInputChannel(inputGate, i, partitionId,
-                                               
networkEnvironment.getPartitionManager(),
-                                               
networkEnvironment.getTaskEventDispatcher(),
-                                               
networkEnvironment.getConnectionManager(),
-                                               
networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
-                                               metrics
+                                       
networkEnvironment.getResultPartitionManager(),
+                                       
networkEnvironment.getTaskEventDispatcher(),
+                                       
networkEnvironment.getConnectionManager(),
+                                       
networkEnvironment.getPartitionRequestInitialBackoff(),
+                                       
networkEnvironment.getPartitionRequestMaxBackoff(),
+                                       metrics
                                );
                        }
                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index cc91e83..27ecc70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.io.network.api.reader.BufferReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import scala.Tuple2;
 
 import java.io.IOException;
 
@@ -46,7 +45,9 @@ public class UnknownInputChannel extends InputChannel {
        private final ConnectionManager connectionManager;
 
        /** Initial and maximum backoff (in ms) after failed partition 
requests. */
-       private final Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff;
+       private final int initialBackoff;
+
+       private final int maxBackoff;
 
        private final IOMetricGroup metrics;
 
@@ -57,16 +58,18 @@ public class UnknownInputChannel extends InputChannel {
                        ResultPartitionManager partitionManager,
                        TaskEventDispatcher taskEventDispatcher,
                        ConnectionManager connectionManager,
-                       Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff,
+                       int initialBackoff,
+                       int maxBackoff,
                        IOMetricGroup metrics) {
 
-               super(gate, channelIndex, partitionId, 
partitionRequestInitialAndMaxBackoff, null);
+               super(gate, channelIndex, partitionId, initialBackoff, 
maxBackoff, null);
 
                this.partitionManager = checkNotNull(partitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
                this.connectionManager = checkNotNull(connectionManager);
-               this.partitionRequestInitialAndMaxBackoff = 
checkNotNull(partitionRequestInitialAndMaxBackoff);
                this.metrics = checkNotNull(metrics);
+               this.initialBackoff = initialBackoff;
+               this.maxBackoff = maxBackoff;
        }
 
        @Override
@@ -117,10 +120,10 @@ public class UnknownInputChannel extends InputChannel {
        // 
------------------------------------------------------------------------
 
        public RemoteInputChannel toRemoteInputChannel(ConnectionID 
producerAddress) {
-               return new RemoteInputChannel(inputGate, channelIndex, 
partitionId, checkNotNull(producerAddress), connectionManager, 
partitionRequestInitialAndMaxBackoff, metrics);
+               return new RemoteInputChannel(inputGate, channelIndex, 
partitionId, checkNotNull(producerAddress), connectionManager, initialBackoff, 
maxBackoff, metrics);
        }
 
        public LocalInputChannel toLocalInputChannel() {
-               return new LocalInputChannel(inputGate, channelIndex, 
partitionId, partitionManager, taskEventDispatcher, 
partitionRequestInitialAndMaxBackoff, metrics);
+               return new LocalInputChannel(inputGate, channelIndex, 
partitionId, partitionManager, taskEventDispatcher, initialBackoff, maxBackoff, 
metrics);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index 5213fe9..f19c123 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -59,6 +59,13 @@ public class KvStateRegistry {
        }
 
        /**
+        * Unregisters the listener from the registry.
+        */
+       public void unregisterListener() {
+               listener.set(null);
+       }
+
+       /**
         * Registers the KvState instance identified by the given 4-tuple of 
JobID,
         * JobVertexID, key group index, and registration name.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
new file mode 100644
index 0000000..4697c79
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayJobManagerCommunicationFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Factory implementation which generates {@link ActorGateway} based job 
manager communication
+ * components.
+ */
+public class ActorGatewayJobManagerCommunicationFactory implements 
JobManagerCommunicationFactory {
+       private final ExecutionContext executionContext;
+       private final ActorGateway jobManagerGateway;
+       private final ActorGateway taskManagerGateway;
+       private final FiniteDuration jobManagerMessageTimeout;
+
+       public ActorGatewayJobManagerCommunicationFactory(
+               ExecutionContext executionContext,
+               ActorGateway jobManagerGateway,
+               ActorGateway taskManagerGateway,
+               FiniteDuration jobManagerMessageTimeout) {
+
+               this.executionContext = 
Preconditions.checkNotNull(executionContext);
+               this.jobManagerGateway = 
Preconditions.checkNotNull(jobManagerGateway);
+               this.taskManagerGateway = 
Preconditions.checkNotNull(taskManagerGateway);
+               this.jobManagerMessageTimeout = 
Preconditions.checkNotNull(jobManagerMessageTimeout);
+       }
+
+       public PartitionStateChecker createPartitionStateChecker() {
+               return new ActorGatewayPartitionStateChecker(jobManagerGateway, 
taskManagerGateway);
+       }
+
+       public ResultPartitionConsumableNotifier 
createResultPartitionConsumableNotifier(Task owningTask) {
+               return new ActorGatewayResultPartitionConsumableNotifier(
+                       executionContext,
+                       jobManagerGateway,
+                       owningTask,
+                       jobManagerMessageTimeout);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
new file mode 100644
index 0000000..2d69938
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.KvStateRegistryListener;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * This implementation uses {@link ActorGateway} to forward key-value state 
notifications to the job
+ * manager. The notifications are wrapped in an actor message and sent to the 
given actor gateway.
+ */
+public class ActorGatewayKvStateRegistryListener implements 
KvStateRegistryListener {
+
+       private ActorGateway jobManager;
+
+       private KvStateServerAddress kvStateServerAddress;
+
+       public ActorGatewayKvStateRegistryListener(
+               ActorGateway jobManager,
+               KvStateServerAddress kvStateServerAddress) {
+
+               this.jobManager = Preconditions.checkNotNull(jobManager, 
"JobManager");
+               this.kvStateServerAddress = 
Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress");
+       }
+
+       @Override
+       public void notifyKvStateRegistered(
+               JobID jobId,
+               JobVertexID jobVertexId,
+               int keyGroupIndex,
+               String registrationName,
+               KvStateID kvStateId) {
+
+               Object msg = new KvStateMessage.NotifyKvStateRegistered(
+                       jobId,
+                       jobVertexId,
+                       keyGroupIndex,
+                       registrationName,
+                       kvStateId,
+                       kvStateServerAddress);
+
+               jobManager.tell(msg);
+       }
+
+       @Override
+       public void notifyKvStateUnregistered(
+               JobID jobId,
+               JobVertexID jobVertexId,
+               int keyGroupIndex,
+               String registrationName) {
+
+               Object msg = new KvStateMessage.NotifyKvStateUnregistered(
+                       jobId,
+                       jobVertexId,
+                       keyGroupIndex,
+                       registrationName);
+
+               jobManager.tell(msg);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
new file mode 100644
index 0000000..e7c6690
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+
+/**
+ * This implementation uses {@link ActorGateway} to trigger the partition 
state check at the job
+ * manager.
+ */
+public class ActorGatewayPartitionStateChecker implements 
PartitionStateChecker {
+
+       private final ActorGateway jobManager;
+
+       private final ActorGateway taskManager;
+
+       public ActorGatewayPartitionStateChecker(ActorGateway jobManager, 
ActorGateway taskManager) {
+               this.jobManager = jobManager;
+               this.taskManager = taskManager;
+       }
+
+       @Override
+       public void triggerPartitionStateCheck(
+               JobID jobId,
+               ExecutionAttemptID executionAttemptID,
+               IntermediateDataSetID resultId,
+               ResultPartitionID partitionId) {
+
+               JobManagerMessages.RequestPartitionState msg = new 
JobManagerMessages.RequestPartitionState(
+                       jobId,
+                       partitionId,
+                       executionAttemptID,
+                       resultId);
+
+               jobManager.tell(msg, taskManager);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
new file mode 100644
index 0000000..b91120b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayResultPartitionConsumableNotifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * This implementation uses {@link ActorGateway} to notify the job manager 
about consumable
+ * partitions.
+ */
+public class ActorGatewayResultPartitionConsumableNotifier implements 
ResultPartitionConsumableNotifier {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ActorGatewayResultPartitionConsumableNotifier.class);
+
+       /**
+        * {@link ExecutionContext} which is used for the failure handler of
+        * {@link JobManagerMessages.ScheduleOrUpdateConsumers} messages.
+        */
+       private final ExecutionContext executionContext;
+
+       private final ActorGateway jobManager;
+
+       private final Task owningTask;
+
+       private final FiniteDuration jobManagerMessageTimeout;
+
+       public ActorGatewayResultPartitionConsumableNotifier(
+               ExecutionContext executionContext,
+               ActorGateway jobManager,
+               Task owningTask,
+               FiniteDuration jobManagerMessageTimeout) {
+
+               this.executionContext = 
Preconditions.checkNotNull(executionContext);
+               this.jobManager = Preconditions.checkNotNull(jobManager);
+               this.owningTask = Preconditions.checkNotNull(owningTask);
+               this.jobManagerMessageTimeout = 
Preconditions.checkNotNull(jobManagerMessageTimeout);
+       }
+
+       @Override
+       public void notifyPartitionConsumable(JobID jobId, final 
ResultPartitionID partitionId) {
+
+               final JobManagerMessages.ScheduleOrUpdateConsumers msg = new 
JobManagerMessages.ScheduleOrUpdateConsumers(jobId, partitionId);
+
+               Future<Object> futureResponse = jobManager.ask(msg, 
jobManagerMessageTimeout);
+
+               futureResponse.onFailure(new OnFailure() {
+                       @Override
+                       public void onFailure(Throwable failure) {
+                               LOG.error("Could not schedule or update 
consumers at the JobManager.", failure);
+
+                               owningTask.failExternally(new 
RuntimeException("Could not notify JobManager to schedule or update consumers", 
failure));
+                       }
+               }, executionContext);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
new file mode 100644
index 0000000..64cfcb1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/JobManagerCommunicationFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+
+/**
+ * Factory to generate job manager specific communication components.
+ */
+public interface JobManagerCommunicationFactory {
+
+       /**
+        * Creates a {@link PartitionStateChecker} which communicates with the 
associated job manager of
+        * this instance.
+        *
+        * @return PartitionStateChecker which communicates with the associated 
job manager of this
+        *                      instance
+        */
+       PartitionStateChecker createPartitionStateChecker();
+
+       /**
+        * Creates a {@link ResultPartitionConsumableNotifier} which 
communicates with the associated
+        * job manager of this instance.
+        *
+        * @param owningTask Task which is associated with the 
ResultPartitionConsumableNotifier
+        * @return ResultPartitionConsumableNotifier which communicates with 
the associated job manager
+        *                      of this instance
+        */
+       ResultPartitionConsumableNotifier 
createResultPartitionConsumableNotifier(Task owningTask);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 73601c4..d09e03c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
@@ -91,10 +93,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * with the JobManager.
  *
  * <p>The Flink operators (implemented as subclasses of
- * {@link org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable} have only 
data
- * readers, -writers, and certain event callbacks. The task connects those to 
the
- * network stack and actor messages, and tracks the state of the execution and
- * handles exceptions.
+ * {@link AbstractInvokable}) have only data readers, -writers, and certain 
event callbacks.
+ * The task connects those to the network stack and actor messages, and tracks 
the state
+ * of the execution and handles exceptions.
  *
  * <p>Tasks have no knowledge about how they relate to other tasks, or whether 
they
  * are the first attempt to execute the task, or a repeated attempt. All of 
that
@@ -247,6 +248,7 @@ public class Task implements Runnable {
                                MemoryManager memManager,
                                IOManager ioManager,
                                NetworkEnvironment networkEnvironment,
+                               JobManagerCommunicationFactory 
jobManagerCommunicationFactory,
                                BroadcastVariableManager bcVarManager,
                                ActorGateway taskManagerActor,
                                ActorGateway jobManagerActor,
@@ -302,6 +304,9 @@ public class Task implements Runnable {
                this.producedPartitions = new 
ResultPartition[partitions.size()];
                this.writers = new ResultPartitionWriter[partitions.size()];
 
+               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier =
+                       
jobManagerCommunicationFactory.createResultPartitionConsumableNotifier(this);
+
                for (int i = 0; i < this.producedPartitions.length; i++) {
                        ResultPartitionDeploymentDescriptor desc = 
partitions.get(i);
                        ResultPartitionID partitionId = new 
ResultPartitionID(desc.getPartitionId(), executionId);
@@ -313,8 +318,8 @@ public class Task implements Runnable {
                                        desc.getPartitionType(),
                                        desc.getEagerlyDeployConsumers(),
                                        desc.getNumberOfSubpartitions(),
-                                       
networkEnvironment.getPartitionManager(),
-                                       
networkEnvironment.getPartitionConsumableNotifier(),
+                                       
networkEnvironment.getResultPartitionManager(),
+                                       resultPartitionConsumableNotifier,
                                        ioManager,
                                        networkEnvironment.getDefaultIOMode());
 
@@ -325,10 +330,17 @@ public class Task implements Runnable {
                this.inputGates = new 
SingleInputGate[consumedPartitions.size()];
                this.inputGatesById = new HashMap<IntermediateDataSetID, 
SingleInputGate>();
 
+               PartitionStateChecker partitionStateChecker = 
jobManagerCommunicationFactory.createPartitionStateChecker();
+
                for (int i = 0; i < this.inputGates.length; i++) {
                        SingleInputGate gate = SingleInputGate.create(
-                                       taskNameWithSubtaskAndId, jobId, 
executionId, consumedPartitions.get(i), networkEnvironment, 
-                                       metricGroup.getIOMetricGroup());
+                               taskNameWithSubtaskAndId,
+                               jobId,
+                               executionId,
+                               consumedPartitions.get(i),
+                               networkEnvironment,
+                               partitionStateChecker,
+                               metricGroup.getIOMetricGroup());
 
                        this.inputGates[i] = gate;
                        inputGatesById.put(gate.getConsumedResultId(), gate);

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 407fa01..0c62c69 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1472,6 +1472,9 @@ class JobManager(
         currentJobs.get(msg.getJobId) match {
           case Some((graph, _)) =>
             try {
+              log.debug(s"Lookup key-value state for job ${msg.getJobId} with 
registration " +
+                         s"name ${msg.getRegistrationName}.")
+
               val registry = graph.getKvStateLocationRegistry
               val location = 
registry.getKvStateLocation(msg.getRegistrationName)
               if (location == null) {
@@ -1493,6 +1496,9 @@ class JobManager(
         currentJobs.get(msg.getJobId) match {
           case Some((graph, _)) =>
             try {
+              log.debug(s"Key value state registered for job ${msg.getJobId} 
under " +
+                         s"name ${msg.getRegistrationName}.")
+
               graph.getKvStateLocationRegistry.notifyKvStateRegistered(
                 msg.getJobVertexId,
                 msg.getKeyGroupIndex,

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
index 0788d7c..893eaa8 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.scala
@@ -31,4 +31,5 @@ case class NetworkEnvironmentConfiguration(
   queryServerNetworkThreads: Int,
   queryServerQueryThreads: Int,
   nettyConfig: Option[NettyConfig] = None,
-  partitionRequestInitialAndMaxBackoff: (Integer, Integer) = (500, 3000))
+  partitionRequestInitialBackoff: Int = 500,
+  partitinRequestMaxBackoff: Int = 3000)

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 72ec2ac..3154826 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -53,8 +53,10 @@ import org.apache.flink.runtime.filecache.FileCache
 import org.apache.flink.runtime.instance.{AkkaActorGateway, 
HardwareDescription, InstanceConnectionInfo, InstanceID}
 import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
-import org.apache.flink.runtime.io.network.NetworkEnvironment
-import org.apache.flink.runtime.io.network.netty.NettyConfig
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
+import org.apache.flink.runtime.io.network.{LocalConnectionManager, 
NetworkEnvironment, TaskEventDispatcher}
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, 
NettyConnectionManager}
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -67,6 +69,8 @@ import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage,
 import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.process.ProcessReaper
+import org.apache.flink.runtime.query.KvStateRegistry
+import org.apache.flink.runtime.query.netty.{DisabledKvStateRequestStats, 
KvStateServer}
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.util._
@@ -76,7 +80,6 @@ import org.apache.flink.util.{MathUtils, NetUtils}
 import scala.collection.JavaConverters._
 import scala.concurrent._
 import scala.concurrent.duration._
-import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
 import scala.util.{Failure, Success}
 
@@ -193,6 +196,8 @@ class TaskManager(
   private var scheduledTaskManagerRegistration: Option[Cancellable] = None
   private var currentRegistrationRun: UUID = UUID.randomUUID()
 
+  private var jobManagerConnectionFactory: 
Option[JobManagerCommunicationFactory] = None
+
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -395,13 +400,11 @@ class TaskManager(
         // discards intermediate result partitions of a task execution on this 
TaskManager
         case FailIntermediateResultPartitions(executionID) =>
           log.info("Discarding the results produced by task execution " + 
executionID)
-          if (network.isAssociated) {
-            try {
-              
network.getPartitionManager.releasePartitionsProducedBy(executionID)
-            } catch {
-              case t: Throwable => killTaskManagerFatal(
-              "Fatal leak: Unable to release intermediate result partition 
data", t)
-            }
+          try {
+            
network.getResultPartitionManager.releasePartitionsProducedBy(executionID)
+          } catch {
+            case t: Throwable => killTaskManagerFatal(
+            "Fatal leak: Unable to release intermediate result partition 
data", t)
           }
 
         // notifies the TaskManager that the state of a task has changed.
@@ -916,25 +919,33 @@ class TaskManager(
       "starting network stack and library cache.")
 
     // sanity check that the JobManager dependent components are not set up 
currently
-    if (network.isAssociated || blobService.isDefined) {
+    if (jobManagerConnectionFactory.isDefined || blobService.isDefined) {
       throw new IllegalStateException("JobManager-specific components are 
already initialized.")
     }
 
     currentJobManager = Some(jobManager)
     instanceID = id
 
-    // start the network stack, now that we have the JobManager actor reference
-    try {
-      network.associateWithTaskManagerAndJobManager(
-        new AkkaActorGateway(jobManager, leaderSessionID.orNull),
-        new AkkaActorGateway(self, leaderSessionID.orNull)
-      )
-    }
-    catch {
-      case e: Exception =>
-        val message = "Could not start network environment."
-        log.error(message, e)
-        throw new RuntimeException(message, e)
+    val jobManagerGateway = new AkkaActorGateway(jobManager, 
leaderSessionID.orNull)
+    val taskmanagerGateway = new AkkaActorGateway(self, leaderSessionID.orNull)
+
+    jobManagerConnectionFactory = Some(
+      new ActorGatewayJobManagerCommunicationFactory(
+        context.dispatcher,
+        jobManagerGateway,
+        taskmanagerGateway,
+        config.timeout))
+
+
+    val kvStateServer = network.getKvStateServer()
+
+    if (kvStateServer != null) {
+      val kvStateRegistry = network.getKvStateRegistry()
+
+      kvStateRegistry.registerListener(
+        new ActorGatewayKvStateRegistryListener(
+          jobManagerGateway,
+          kvStateServer.getAddress))
     }
 
     // start a blob service, if a blob server is specified
@@ -1031,8 +1042,12 @@ class TaskManager(
     }
     blobService = None
 
-    // disassociate the network environment
-    network.disassociate()
+    // drop the JobManager-specific communication factory
+    jobManagerConnectionFactory = None
+
+    if (network.getKvStateRegistry != null) {
+      network.getKvStateRegistry.unregisterListener()
+    }
     
     // stop the metrics reporters
     metricsRegistry.shutdown()
@@ -1092,6 +1107,13 @@ class TaskManager(
         case None => throw new IllegalStateException("There is no valid 
library cache manager.")
       }
 
+      val jmFactory = jobManagerConnectionFactory match {
+        case Some(factory) => factory
+        case None =>
+          throw new IllegalStateException("TaskManager is not associated with 
a JobManager and, " +
+                                            "thus, the SlotEnvironment has not 
been initialized.")
+      }
+
       val slot = tdd.getTargetSlotNumber
       if (slot < 0 || slot >= numberOfSlots) {
         throw new IllegalArgumentException(s"Target slot $slot does not exist 
on TaskManager.")
@@ -1117,6 +1139,7 @@ class TaskManager(
         memoryManager,
         ioManager,
         network,
+        jmFactory,
         bcVarManager,
         selfGateway,
         jobManagerGateway,
@@ -1796,7 +1819,7 @@ object TaskManager {
 
     val (taskManagerConfig : TaskManagerConfiguration,      
       netConfig: NetworkEnvironmentConfiguration,
-      connectionInfo: InstanceConnectionInfo,
+      taskManagerAddress: InetSocketAddress,
       memType: MemoryType
     ) = parseTaskManagerConfiguration(
       configuration,
@@ -1806,14 +1829,64 @@ object TaskManager {
     // pre-start checks
     checkTempDirs(taskManagerConfig.tmpDirPaths)
 
-    val executionContext = ExecutionContext.fromExecutor(new ForkJoinPool())
+    val networkBufferPool = new NetworkBufferPool(
+      netConfig.numNetworkBuffers,
+      netConfig.networkBufferSize,
+      netConfig.memoryType)
+
+    val connectionManager = netConfig.nettyConfig match {
+      case Some(nettyConfig) => new NettyConnectionManager(nettyConfig)
+      case None => new LocalConnectionManager()
+    }
+
+    val resultPartitionManager = new ResultPartitionManager()
+    val taskEventDispatcher = new TaskEventDispatcher()
+
+    val kvStateRegistry = new KvStateRegistry()
+
+    val kvStateServer = netConfig.nettyConfig match {
+      case Some(nettyConfig) =>
+
+        val numNetworkThreads = if (netConfig.queryServerNetworkThreads == 0) {
+          nettyConfig.getNumberOfSlots
+        } else {
+          netConfig.queryServerNetworkThreads
+        }
+
+        val numQueryThreads = if (netConfig.queryServerQueryThreads == 0) {
+          nettyConfig.getNumberOfSlots
+        } else {
+          netConfig.queryServerQueryThreads
+        }
+
+        new KvStateServer(
+          taskManagerAddress.getAddress(),
+          netConfig.queryServerPort,
+          numNetworkThreads,
+          numQueryThreads,
+          kvStateRegistry,
+          new DisabledKvStateRequestStats())
+
+      case None => null
+    }
 
     // we start the network first, to make sure it can allocate its buffers 
first
     val network = new NetworkEnvironment(
-      executionContext,
-      taskManagerConfig.timeout,
-      netConfig,
-      connectionInfo)
+      networkBufferPool,
+      connectionManager,
+      resultPartitionManager,
+      taskEventDispatcher,
+      kvStateRegistry,
+      kvStateServer,
+      netConfig.ioMode,
+      netConfig.partitionRequestInitialBackoff,
+      netConfig.partitinRequestMaxBackoff)
+
+    network.start()
+
+    val connectionInfo = new InstanceConnectionInfo(
+      taskManagerAddress.getAddress(),
+      network.getConnectionManager().getDataPort())
 
     // computing the amount of memory to use depends on how much memory is 
available
     // it strictly needs to happen AFTER the network stack has been initialized
@@ -1991,7 +2064,7 @@ object TaskManager {
       localTaskManagerCommunication: Boolean)
     : (TaskManagerConfiguration,
      NetworkEnvironmentConfiguration,
-     InstanceConnectionInfo,
+     InetSocketAddress,
      MemoryType) = {
 
     // ------- read values from the config and check them ---------
@@ -2000,16 +2073,13 @@ object TaskManager {
     // ----> hosts / ports for communication and data exchange
 
     val dataport = 
configuration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
-      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT) match {
-      case 0 => NetUtils.getAvailablePort()
-      case x => x
-    }
+      ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT)
 
-    checkConfigParameter(dataport > 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
+    checkConfigParameter(dataport >= 0, dataport, 
ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
       "Leave config parameter empty or use 0 to let the system choose a port 
automatically.")
 
     val taskManagerAddress = InetAddress.getByName(taskManagerHostname)
-    val connectionInfo = new InstanceConnectionInfo(taskManagerAddress, 
dataport)
+    val taskManagerInetSocketAddress = new 
InetSocketAddress(taskManagerAddress, dataport)
 
     // ----> memory / network stack (shuffles/broadcasts), task slots, temp 
directories
 
@@ -2076,8 +2146,8 @@ object TaskManager {
     } else {
       Some(
         new NettyConfig(
-          connectionInfo.address(),
-          connectionInfo.dataPort(),
+          taskManagerInetSocketAddress.getAddress(),
+          taskManagerInetSocketAddress.getPort(),
           pageSize,
           slots,
           configuration)
@@ -2206,7 +2276,7 @@ object TaskManager {
       maxRegistrationPause,
       refusedRegistrationPause)
 
-    (taskManagerConfig, networkConfig, connectionInfo, memType)
+    (taskManagerConfig, networkConfig, taskManagerInetSocketAddress, memType)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/78f2a158/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
index b41db31..a6963fe 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingTaskManagerLike.scala
@@ -108,7 +108,7 @@ trait TestingTaskManagerLike extends FlinkActor {
       )
 
     case RequestNumActiveConnections =>
-      val numActive = if (network.isAssociated) {
+      val numActive = if (!network.isShutdown) {
         network.getConnectionManager.getNumberOfActiveConnections
       } else {
         0

Reply via email to