This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4aa3ad  [FLINK-11723][network] Remove KvState related components from NetworkEnvironment
c4aa3ad is described below

commit c4aa3ad5f706aedc12f884905a20d96de3977d31
Author: zhijiang <wangzhijiang...@aliyun.com>
AuthorDate: Fri Mar 29 17:04:24 2019 +0800

    [FLINK-11723][network] Remove KvState related components from NetworkEnvironment
---
 .../runtime/io/network/NetworkEnvironment.java     |  98 +----------
 .../flink/runtime/taskexecutor/KvStateService.java | 194 +++++++++++++++++++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  15 +-
 .../runtime/taskexecutor/TaskManagerServices.java  |  61 ++-----
 .../org/apache/flink/runtime/taskmanager/Task.java |   8 +-
 .../flink/runtime/taskmanager/TaskManager.scala    |  19 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |   8 +-
 .../taskexecutor/TaskManagerServicesBuilder.java   |   9 +
 .../runtime/taskmanager/TaskAsyncCallTest.java     |   6 +-
 .../apache/flink/runtime/taskmanager/TaskTest.java |  16 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   5 +-
 .../runtime/testingUtils/TestingTaskManager.scala  |   6 +-
 .../StreamNetworkBenchmarkEnvironment.java         |   4 -
 .../tasks/InterruptSensitiveRestoreTest.java       |   7 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java      |   7 +-
 17 files changed, 298 insertions(+), 175 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d3cb897..1c0b6da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,19 +19,12 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.query.KvStateClientProxy;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.ExceptionUtils;
@@ -63,15 +56,6 @@ public class NetworkEnvironment {
 
        private final TaskEventDispatcher taskEventDispatcher;
 
-       /** Server for {@link InternalKvState} requests. */
-       private KvStateServer kvStateServer;
-
-       /** Proxy for the queryable state client. */
-       private KvStateClientProxy kvStateProxy;
-
-       /** Registry for {@link InternalKvState} instances. */
-       private final KvStateRegistry kvStateRegistry;
-
        private final int partitionRequestInitialBackoff;
 
        private final int partitionRequestMaxBackoff;
@@ -99,9 +83,6 @@ public class NetworkEnvironment {
                        new LocalConnectionManager(),
                        new ResultPartitionManager(),
                        new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
                        partitionRequestInitialBackoff,
                        partitionRequestMaxBackoff,
                        networkBuffersPerChannel,
@@ -110,27 +91,20 @@ public class NetworkEnvironment {
        }
 
        public NetworkEnvironment(
-               NetworkBufferPool networkBufferPool,
-               ConnectionManager connectionManager,
-               ResultPartitionManager resultPartitionManager,
-               TaskEventDispatcher taskEventDispatcher,
-               KvStateRegistry kvStateRegistry,
-               KvStateServer kvStateServer,
-               KvStateClientProxy kvStateClientProxy,
-               int partitionRequestInitialBackoff,
-               int partitionRequestMaxBackoff,
-               int networkBuffersPerChannel,
-               int extraNetworkBuffersPerGate,
-               boolean enableCreditBased) {
+                       NetworkBufferPool networkBufferPool,
+                       ConnectionManager connectionManager,
+                       ResultPartitionManager resultPartitionManager,
+                       TaskEventDispatcher taskEventDispatcher,
+                       int partitionRequestInitialBackoff,
+                       int partitionRequestMaxBackoff,
+                       int networkBuffersPerChannel,
+                       int extraNetworkBuffersPerGate,
+                       boolean enableCreditBased) {
 
                this.networkBufferPool = checkNotNull(networkBufferPool);
                this.connectionManager = checkNotNull(connectionManager);
                this.resultPartitionManager = 
checkNotNull(resultPartitionManager);
                this.taskEventDispatcher = checkNotNull(taskEventDispatcher);
-               this.kvStateRegistry = checkNotNull(kvStateRegistry);
-
-               this.kvStateServer = kvStateServer;
-               this.kvStateProxy = kvStateClientProxy;
 
                this.partitionRequestInitialBackoff = 
partitionRequestInitialBackoff;
                this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
@@ -174,22 +148,6 @@ public class NetworkEnvironment {
                return enableCreditBased;
        }
 
-       public KvStateRegistry getKvStateRegistry() {
-               return kvStateRegistry;
-       }
-
-       public KvStateServer getKvStateServer() {
-               return kvStateServer;
-       }
-
-       public KvStateClientProxy getKvStateProxy() {
-               return kvStateProxy;
-       }
-
-       public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, 
JobVertexID jobVertexId) {
-               return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Task operations
        // 
--------------------------------------------------------------------------------------------
@@ -326,26 +284,6 @@ public class NetworkEnvironment {
                        } catch (IOException t) {
                                throw new IOException("Failed to instantiate 
network connection manager.", t);
                        }
-
-                       if (kvStateServer != null) {
-                               try {
-                                       kvStateServer.start();
-                               } catch (Throwable ie) {
-                                       kvStateServer.shutdown();
-                                       kvStateServer = null;
-                                       LOG.error("Failed to start the 
Queryable State Data Server.", ie);
-                               }
-                       }
-
-                       if (kvStateProxy != null) {
-                               try {
-                                       kvStateProxy.start();
-                               } catch (Throwable ie) {
-                                       kvStateProxy.shutdown();
-                                       kvStateProxy = null;
-                                       LOG.error("Failed to start the 
Queryable State Client Proxy.", ie);
-                               }
-                       }
                }
        }
 
@@ -360,24 +298,6 @@ public class NetworkEnvironment {
 
                        LOG.info("Shutting down the network environment and its 
components.");
 
-                       if (kvStateProxy != null) {
-                               try {
-                                       LOG.debug("Shutting down Queryable 
State Client Proxy.");
-                                       kvStateProxy.shutdown();
-                               } catch (Throwable t) {
-                                       LOG.warn("Cannot shut down Queryable 
State Client Proxy.", t);
-                               }
-                       }
-
-                       if (kvStateServer != null) {
-                               try {
-                                       LOG.debug("Shutting down Queryable 
State Data Server.");
-                                       kvStateServer.shutdown();
-                               } catch (Throwable t) {
-                                       LOG.warn("Cannot shut down Queryable 
State Data Server.", t);
-                               }
-                       }
-
                        // terminate all network connections
                        try {
                                LOG.debug("Shutting down network connection 
manager");
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java
new file mode 100644
index 0000000..e283dc6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.QueryableStateUtils;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * KvState related components of each {@link TaskExecutor} instance. This 
service can
+ * create the kvState registration for a single task.
+ */
+public class KvStateService {
+       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateService.class);
+
+       private final Object lock = new Object();
+
+       /** Registry for {@link InternalKvState} instances. */
+       private final KvStateRegistry kvStateRegistry;
+
+       /** Server for {@link InternalKvState} requests. */
+       private KvStateServer kvStateServer;
+
+       /** Proxy for the queryable state client. */
+       private KvStateClientProxy kvStateClientProxy;
+
+       private boolean isShutdown;
+
+       public KvStateService(KvStateRegistry kvStateRegistry, KvStateServer 
kvStateServer, KvStateClientProxy kvStateClientProxy) {
+               this.kvStateRegistry = 
Preconditions.checkNotNull(kvStateRegistry);
+               this.kvStateServer = kvStateServer;
+               this.kvStateClientProxy = kvStateClientProxy;
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Getter/Setter
+       // 
--------------------------------------------------------------------------------------------
+
+       public KvStateRegistry getKvStateRegistry() {
+               return kvStateRegistry;
+       }
+
+       public KvStateServer getKvStateServer() {
+               return kvStateServer;
+       }
+
+       public KvStateClientProxy getKvStateClientProxy() {
+               return kvStateClientProxy;
+       }
+
+       public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, 
JobVertexID jobVertexId) {
+               return kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Start and shut down methods
+       // 
--------------------------------------------------------------------------------------------
+
+       public void start() {
+               synchronized (lock) {
+                       Preconditions.checkState(!isShutdown, "The 
KvStateService has already been shut down.");
+
+                       LOG.info("Starting the kvState service and its 
components.");
+
+                       if (kvStateServer != null) {
+                               try {
+                                       kvStateServer.start();
+                               } catch (Throwable ie) {
+                                       kvStateServer.shutdown();
+                                       kvStateServer = null;
+                                       LOG.error("Failed to start the 
Queryable State Data Server.", ie);
+                               }
+                       }
+
+                       if (kvStateClientProxy != null) {
+                               try {
+                                       kvStateClientProxy.start();
+                               } catch (Throwable ie) {
+                                       kvStateClientProxy.shutdown();
+                                       kvStateClientProxy = null;
+                                       LOG.error("Failed to start the 
Queryable State Client Proxy.", ie);
+                               }
+                       }
+               }
+       }
+
+       public void shutdown() {
+               synchronized (lock) {
+                       if (isShutdown) {
+                               return;
+                       }
+
+                       LOG.info("Shutting down the kvState service and its 
components.");
+
+                       if (kvStateClientProxy != null) {
+                               try {
+                                       LOG.debug("Shutting down Queryable 
State Client Proxy.");
+                                       kvStateClientProxy.shutdown();
+                               } catch (Throwable t) {
+                                       LOG.warn("Cannot shut down Queryable 
State Client Proxy.", t);
+                               }
+                       }
+
+                       if (kvStateServer != null) {
+                               try {
+                                       LOG.debug("Shutting down Queryable 
State Data Server.");
+                                       kvStateServer.shutdown();
+                               } catch (Throwable t) {
+                                       LOG.warn("Cannot shut down Queryable 
State Data Server.", t);
+                               }
+                       }
+
+                       isShutdown = true;
+               }
+       }
+
+       public boolean isShutdown() {
+               synchronized (lock) {
+                       return isShutdown;
+               }
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       //  Static factory methods for kvState service
+       // 
--------------------------------------------------------------------------------------------
+
+       /**
+        * Creates and returns the KvState service.
+        *
+        * @param taskManagerServicesConfiguration task manager configuration
+        * @return service for kvState related components
+        */
+       public static KvStateService 
fromConfiguration(TaskManagerServicesConfiguration 
taskManagerServicesConfiguration) {
+               KvStateRegistry kvStateRegistry = new KvStateRegistry();
+
+               QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
+
+               KvStateClientProxy kvClientProxy = null;
+               KvStateServer kvStateServer = null;
+
+               if (qsConfig != null) {
+                       int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyServerThreads();
+                       int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyQueryThreads();
+                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
+                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                               qsConfig.getProxyPortRange(),
+                               numProxyServerNetworkThreads,
+                               numProxyServerQueryThreads,
+                               new DisabledKvStateRequestStats());
+
+                       int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateServerThreads();
+                       int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
+                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateQueryThreads();
+                       kvStateServer = QueryableStateUtils.createKvStateServer(
+                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
+                               qsConfig.getStateServerPortRange(),
+                               numStateServerNetworkThreads,
+                               numStateServerQueryThreads,
+                               kvStateRegistry,
+                               new DisabledKvStateRequestStats());
+               }
+
+               return new KvStateService(kvStateRegistry, kvStateServer, 
kvClientProxy);
+       }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e75fab6..e9f7724 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -173,6 +173,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        /** The network component in the task manager. */
        private final NetworkEnvironment networkEnvironment;
 
+       /** The kvState registration service in the task manager. */
+       private final KvStateService kvStateService;
+
        // --------- job manager connections -----------
 
        private final Map<ResourceID, JobManagerConnection> 
jobManagerConnections;
@@ -238,6 +241,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.taskManagerLocation = 
taskExecutorServices.getTaskManagerLocation();
                this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
                this.networkEnvironment = 
taskExecutorServices.getNetworkEnvironment();
+               this.kvStateService = taskExecutorServices.getKvStateService();
                this.resourceManagerLeaderRetriever = 
haServices.getResourceManagerLeaderRetriever();
 
                this.jobManagerConnections = new HashMap<>(4);
@@ -526,6 +530,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                taskExecutorServices.getMemoryManager(),
                                taskExecutorServices.getIOManager(),
                                taskExecutorServices.getNetworkEnvironment(),
+                               taskExecutorServices.getKvStateService(),
                                
taskExecutorServices.getBroadcastVariableManager(),
                                taskStateManager,
                                taskManagerActions,
@@ -1261,13 +1266,13 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
        private void disassociateFromJobManager(JobManagerConnection 
jobManagerConnection, Exception cause) throws IOException {
                checkNotNull(jobManagerConnection);
 
-               final KvStateRegistry kvStateRegistry = 
networkEnvironment.getKvStateRegistry();
+               final KvStateRegistry kvStateRegistry = 
kvStateService.getKvStateRegistry();
 
                if (kvStateRegistry != null) {
                        
kvStateRegistry.unregisterListener(jobManagerConnection.getJobID());
                }
 
-               final KvStateClientProxy kvStateClientProxy = 
networkEnvironment.getKvStateProxy();
+               final KvStateClientProxy kvStateClientProxy = 
kvStateService.getKvStateClientProxy();
 
                if (kvStateClientProxy != null) {
                        
kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), 
null);
@@ -1279,8 +1284,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        private void registerQueryableState(JobID jobId, JobMasterGateway 
jobMasterGateway) {
-               final KvStateServer kvStateServer = 
networkEnvironment.getKvStateServer();
-               final KvStateRegistry kvStateRegistry = 
networkEnvironment.getKvStateRegistry();
+               final KvStateServer kvStateServer = 
kvStateService.getKvStateServer();
+               final KvStateRegistry kvStateRegistry = 
kvStateService.getKvStateRegistry();
 
                if (kvStateServer != null && kvStateRegistry != null) {
                        kvStateRegistry.registerListener(
@@ -1290,7 +1295,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                        kvStateServer.getServerAddress()));
                }
 
-               final KvStateClientProxy kvStateProxy = 
networkEnvironment.getKvStateProxy();
+               final KvStateClientProxy kvStateProxy = 
kvStateService.getKvStateClientProxy();
 
                if (kvStateProxy != null) {
                        kvStateProxy.updateKvStateLocationOracle(jobId, 
jobMasterGateway);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 8c566f6..83cb470 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
-import 
org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -40,10 +39,6 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.query.KvStateClientProxy;
-import org.apache.flink.runtime.query.KvStateRegistry;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
@@ -82,6 +77,7 @@ public class TaskManagerServices {
        private final MemoryManager memoryManager;
        private final IOManager ioManager;
        private final NetworkEnvironment networkEnvironment;
+       private final KvStateService kvStateService;
        private final BroadcastVariableManager broadcastVariableManager;
        private final TaskSlotTable taskSlotTable;
        private final JobManagerTable jobManagerTable;
@@ -93,6 +89,7 @@ public class TaskManagerServices {
                MemoryManager memoryManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
+               KvStateService kvStateService,
                BroadcastVariableManager broadcastVariableManager,
                TaskSlotTable taskSlotTable,
                JobManagerTable jobManagerTable,
@@ -103,6 +100,7 @@ public class TaskManagerServices {
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
                this.ioManager = Preconditions.checkNotNull(ioManager);
                this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
+               this.kvStateService = 
Preconditions.checkNotNull(kvStateService);
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
                this.jobManagerTable = 
Preconditions.checkNotNull(jobManagerTable);
@@ -126,6 +124,10 @@ public class TaskManagerServices {
                return networkEnvironment;
        }
 
+       public KvStateService getKvStateService() {
+               return kvStateService;
+       }
+
        public TaskManagerLocation getTaskManagerLocation() {
                return taskManagerLocation;
        }
@@ -186,6 +188,12 @@ public class TaskManagerServices {
                }
 
                try {
+                       kvStateService.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+               }
+
+               try {
                        taskSlotTable.stop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
@@ -230,6 +238,9 @@ public class TaskManagerServices {
                final NetworkEnvironment network = 
createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory);
                network.start();
 
+               final KvStateService kvStateService = 
KvStateService.fromConfiguration(taskManagerServicesConfiguration);
+               kvStateService.start();
+
                final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(
                        resourceID,
                        
taskManagerServicesConfiguration.getTaskManagerAddress(),
@@ -277,6 +288,7 @@ public class TaskManagerServices {
                        memoryManager,
                        ioManager,
                        network,
+                       kvStateService,
                        broadcastVariableManager,
                        taskSlotTable,
                        jobManagerTable,
@@ -415,51 +427,12 @@ public class TaskManagerServices {
                ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
 
-               KvStateRegistry kvStateRegistry = new KvStateRegistry();
-
-               QueryableStateConfiguration qsConfig = 
taskManagerServicesConfiguration.getQueryableStateConfig();
-
-               KvStateClientProxy kvClientProxy = null;
-               KvStateServer kvStateServer = null;
-
-               if (qsConfig != null) {
-                       int numProxyServerNetworkThreads = 
qsConfig.numProxyServerThreads() == 0 ?
-                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyServerThreads();
-
-                       int numProxyServerQueryThreads = 
qsConfig.numProxyQueryThreads() == 0 ?
-                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numProxyQueryThreads();
-
-                       kvClientProxy = 
QueryableStateUtils.createKvStateClientProxy(
-                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                               qsConfig.getProxyPortRange(),
-                               numProxyServerNetworkThreads,
-                               numProxyServerQueryThreads,
-                               new DisabledKvStateRequestStats());
-
-                       int numStateServerNetworkThreads = 
qsConfig.numStateServerThreads() == 0 ?
-                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateServerThreads();
-
-                       int numStateServerQueryThreads = 
qsConfig.numStateQueryThreads() == 0 ?
-                               
taskManagerServicesConfiguration.getNumberOfSlots() : 
qsConfig.numStateQueryThreads();
-
-                       kvStateServer = QueryableStateUtils.createKvStateServer(
-                               
taskManagerServicesConfiguration.getTaskManagerAddress(),
-                               qsConfig.getStateServerPortRange(),
-                               numStateServerNetworkThreads,
-                               numStateServerQueryThreads,
-                               kvStateRegistry,
-                               new DisabledKvStateRequestStats());
-               }
-
                // we start the network first, to make sure it can allocate its 
buffers first
                return new NetworkEnvironment(
                        networkBufferPool,
                        connectionManager,
                        resultPartitionManager,
                        taskEventDispatcher,
-                       kvStateRegistry,
-                       kvStateServer,
-                       kvClientProxy,
                        
networkEnvironmentConfiguration.partitionRequestInitialBackoff(),
                        
networkEnvironmentConfiguration.partitionRequestMaxBackoff(),
                        
networkEnvironmentConfiguration.networkBuffersPerChannel(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 698f8bf..e005b74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -69,6 +69,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -220,6 +221,9 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
        /** The gateway to the network stack, which handles inputs and produced 
results. */
        private final NetworkEnvironment network;
 
+       /** The service for kvState registration of this task. */
+       private final KvStateService kvStateService;
+
        /** The registry of this task which enables live reporting of 
accumulators. */
        private final AccumulatorRegistry accumulatorRegistry;
 
@@ -287,6 +291,7 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
                MemoryManager memManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
+               KvStateService kvStateService,
                BroadcastVariableManager bcVarManager,
                TaskStateManager taskStateManager,
                TaskManagerActions taskManagerActions,
@@ -348,6 +353,7 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
                this.libraryCache = Preconditions.checkNotNull(libraryCache);
                this.fileCache = Preconditions.checkNotNull(fileCache);
                this.network = Preconditions.checkNotNull(networkEnvironment);
+               this.kvStateService = 
Preconditions.checkNotNull(kvStateService);
                this.taskManagerConfig = 
Preconditions.checkNotNull(taskManagerConfig);
 
                this.metrics = metricGroup;
@@ -658,7 +664,7 @@ public class Task implements Runnable, TaskActions, 
CheckpointListener {
                        //  call the user code initialization methods
                        // 
----------------------------------------------------------------
 
-                       TaskKvStateRegistry kvStateRegistry = 
network.createKvStateTaskRegistry(jobId, getJobVertexId());
+                       TaskKvStateRegistry kvStateRegistry = 
kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId());
 
                        Environment env = new RuntimeEnvironment(
                                jobId,
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index e96e6f4..02f1753 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -68,7 +68,7 @@ import 
org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.state.{TaskExecutorLocalStateStoresManager, 
TaskStateManagerImpl}
-import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
+import org.apache.flink.runtime.taskexecutor.{KvStateService, TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
 import org.apache.flink.util.NetUtils
@@ -127,6 +127,7 @@ class TaskManager(
     protected val memoryManager: MemoryManager,
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
+    protected val kvStateService: KvStateService,
     protected val taskManagerLocalStateStoresManager: 
TaskExecutorLocalStateStoresManager,
     protected val numberOfSlots: Int,
     protected val highAvailabilityServices: HighAvailabilityServices,
@@ -941,10 +942,10 @@ class TaskManager(
         taskManagerConnection))
 
 
-    val kvStateServer = network.getKvStateServer()
+    val kvStateServer = kvStateService.getKvStateServer()
 
     if (kvStateServer != null) {
-      val kvStateRegistry = network.getKvStateRegistry()
+      val kvStateRegistry = kvStateService.getKvStateRegistry()
 
       kvStateRegistry.registerListener(
         HighAvailabilityServices.DEFAULT_JOB_ID,
@@ -953,7 +954,7 @@ class TaskManager(
           kvStateServer.getServerAddress))
     }
 
-    val proxy = network.getKvStateProxy
+    val proxy = kvStateService.getKvStateClientProxy
 
     if (proxy != null) {
       proxy.updateKvStateLocationOracle(
@@ -1071,11 +1072,11 @@ class TaskManager(
     // disassociate the slot environment
     connectionUtils = None
 
-    if (network.getKvStateRegistry != null) {
-      
network.getKvStateRegistry.unregisterListener(HighAvailabilityServices.DEFAULT_JOB_ID)
+    if (kvStateService.getKvStateRegistry != null) {
+      
kvStateService.getKvStateRegistry.unregisterListener(HighAvailabilityServices.DEFAULT_JOB_ID)
     }
 
-    val proxy = network.getKvStateProxy
+    val proxy = kvStateService.getKvStateClientProxy
 
     if (proxy != null) {
       // clear the key-value location oracle
@@ -1235,6 +1236,7 @@ class TaskManager(
         memoryManager,
         ioManager,
         network,
+        kvStateService,
         bcVarManager,
         taskStateManager,
         taskManagerConnection,
@@ -2031,6 +2033,7 @@ object TaskManager {
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
+      taskManagerServices.getKvStateService,
       taskManagerServices.getTaskManagerStateStore(),
       highAvailabilityServices,
       taskManagerMetricGroup)
@@ -2049,6 +2052,7 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
+    kvStateService: KvStateService,
     taskStateManager: TaskExecutorLocalStateStoresManager,
     highAvailabilityServices: HighAvailabilityServices,
     taskManagerMetricGroup: TaskManagerMetricGroup
@@ -2061,6 +2065,7 @@ object TaskManager {
       memoryManager,
       ioManager,
       networkEnvironment,
+      kvStateService,
       taskStateManager,
       taskManagerConfig.getNumberSlots(),
       highAvailabilityServices,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 5cdd782..ef4a2da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -71,7 +71,7 @@ import 
org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -267,11 +267,15 @@ public class TaskExecutorTest extends TestLogger {
                        true);
                networkEnvironment.start();
 
+               final KvStateService kvStateService = new KvStateService(new 
KvStateRegistry(), null, null);
+               kvStateService.start();
+
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskManagerLocation(taskManagerLocation)
                        .setMemoryManager(memoryManager)
                        .setIoManager(ioManager)
                        .setNetworkEnvironment(networkEnvironment)
+                       .setKvStateService(kvStateService)
                        .setTaskSlotTable(taskSlotTable)
                        .setJobLeaderService(jobLeaderService)
                        .setTaskStateManager(localStateStoresManager)
@@ -301,6 +305,7 @@ public class TaskExecutorTest extends TestLogger {
                assertThat(memoryManager.isShutdown(), is(true));
                assertThat(networkEnvironment.isShutdown(), is(true));
                assertThat(ioManager.isProperlyShutDown(), is(true));
+               assertThat(kvStateService.isShutdown(), is(true));
        }
 
        @Test
@@ -748,7 +753,6 @@ public class TaskExecutorTest extends TestLogger {
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
 
-               when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), 
eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                final TaskExecutorLocalStateStoresManager 
localStateStoresManager = createTaskExecutorLocalStateStoresManager();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index c9d3b0e..4f361b8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
@@ -41,6 +42,7 @@ public class TaskManagerServicesBuilder {
        private MemoryManager memoryManager;
        private IOManager ioManager;
        private NetworkEnvironment networkEnvironment;
+       private KvStateService kvStateService;
        private BroadcastVariableManager broadcastVariableManager;
        private TaskSlotTable taskSlotTable;
        private JobManagerTable jobManagerTable;
@@ -57,6 +59,7 @@ public class TaskManagerServicesBuilder {
                        false);
                ioManager = mock(IOManager.class);
                networkEnvironment = mock(NetworkEnvironment.class);
+               kvStateService = new KvStateService(new KvStateRegistry(), 
null, null);
                broadcastVariableManager = new BroadcastVariableManager();
                taskSlotTable = mock(TaskSlotTable.class);
                jobManagerTable = new JobManagerTable();
@@ -84,6 +87,11 @@ public class TaskManagerServicesBuilder {
                return this;
        }
 
+       public TaskManagerServicesBuilder setKvStateService(KvStateService 
kvStateService) {
+               this.kvStateService = kvStateService;
+               return this;
+       }
+
        public TaskManagerServicesBuilder 
setBroadcastVariableManager(BroadcastVariableManager broadcastVariableManager) {
                this.broadcastVariableManager = broadcastVariableManager;
                return this;
@@ -115,6 +123,7 @@ public class TaskManagerServicesBuilder {
                        memoryManager,
                        ioManager,
                        networkEnvironment,
+                       kvStateService,
                        broadcastVariableManager,
                        taskSlotTable,
                        jobManagerTable,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 353556e..ce3e1b4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -53,8 +53,9 @@ import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.util.SerializedValue;
@@ -224,8 +225,6 @@ public class TaskAsyncCallTest extends TestLogger {
                TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
                
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-               
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                               .thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
@@ -260,6 +259,7 @@ public class TaskAsyncCallTest extends TestLogger {
                        mock(MemoryManager.class),
                        mock(IOManager.class),
                        networkEnvironment,
+                       new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TestTaskStateManager(),
                        mock(TaskManagerActions.class),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 3447851..d526992 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -60,8 +60,9 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -575,8 +576,6 @@ public class TaskTest extends TestLogger {
                final ResultPartitionConsumableNotifier consumableNotifier = 
new NoOpResultPartitionConsumableNotifier();
                final NetworkEnvironment network = 
mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class));
-               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                       .thenReturn(mock(TaskKvStateRegistry.class));
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                // Test all branches of trigger partition state check
@@ -929,6 +928,7 @@ public class TaskTest extends TestLogger {
                private ResultPartitionConsumableNotifier consumableNotifier;
                private PartitionProducerStateChecker 
partitionProducerStateChecker;
                private NetworkEnvironment networkEnvironment;
+               private KvStateService kvStateService;
                private Executor executor;
                private Configuration taskManagerConfig;
                private ExecutionConfig executionConfig;
@@ -948,10 +948,10 @@ public class TaskTest extends TestLogger {
                        final TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                        networkEnvironment = mock(NetworkEnvironment.class);
                        
when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
-                       
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                               .thenReturn(mock(TaskKvStateRegistry.class));
                        
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
+                       kvStateService = new KvStateService(new 
KvStateRegistry(), null, null);
+
                        executor = TestingUtils.defaultExecutor();
 
                        taskManagerConfig = new Configuration();
@@ -990,6 +990,11 @@ public class TaskTest extends TestLogger {
                        return this;
                }
 
+               TaskBuilder setKvStateService(KvStateService kvStateService) {
+                       this.kvStateService = kvStateService;
+                       return this;
+               }
+
                TaskBuilder setExecutor(Executor executor) {
                        this.executor = executor;
                        return this;
@@ -1053,6 +1058,7 @@ public class TaskTest extends TestLogger {
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                networkEnvironment,
+                               kvStateService,
                                mock(BroadcastVariableManager.class),
                                new TestTaskStateManager(),
                                taskManagerActions,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 082005c..99f93e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -54,12 +54,13 @@ import 
org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
 import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -167,7 +168,6 @@ public class JvmExitOnFatalErrorTest {
                                final IOManager ioManager = new 
IOManagerAsync();
 
                                final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
-                               
when(networkEnvironment.createKvStateTaskRegistry(jid, 
jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class));
                                TaskEventDispatcher taskEventDispatcher = 
mock(TaskEventDispatcher.class);
                                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
@@ -208,6 +208,7 @@ public class JvmExitOnFatalErrorTest {
                                                memoryManager,
                                                ioManager,
                                                networkEnvironment,
+                                               new KvStateService(new 
KvStateRegistry(), null, null),
                                                new BroadcastVariableManager(),
                                                slotStateManager,
                                                new NoOpTaskManagerActions(),
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
index 5e60be9..b1e551b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager
-import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
+import org.apache.flink.runtime.taskexecutor.{KvStateService, 
TaskManagerConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
 import scala.language.postfixOps
@@ -39,6 +39,7 @@ class TestingTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    kvStateService: KvStateService,
     taskManagerStateStore: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
@@ -50,6 +51,7 @@ class TestingTaskManager(
     memoryManager,
     ioManager,
     network,
+    kvStateService,
     taskManagerStateStore,
     numberOfSlots,
     highAvailabilityServices,
@@ -62,6 +64,7 @@ class TestingTaskManager(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     network: NetworkEnvironment,
+    kvStateService: KvStateService,
     taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
@@ -73,6 +76,7 @@ class TestingTaskManager(
       memoryManager,
       ioManager,
       network,
+      kvStateService,
       taskManagerLocalStateStoresManager,
       numberOfSlots,
       highAvailabilityServices,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index e7b54ec..8dc0ac4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -218,9 +217,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends 
IOReadableWritable> {
                        nettyConnectionManager,
                        new ResultPartitionManager(),
                        new TaskEventDispatcher(),
-                       new KvStateRegistry(),
-                       null,
-                       null,
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(),
                        
TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(),
                        
TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 0c31184..d80db71 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -65,6 +65,7 @@ import 
org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -92,7 +93,6 @@ import java.util.concurrent.Executor;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -183,8 +183,6 @@ public class InterruptSensitiveRestoreTest {
 
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
-               
when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                               .thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                Collection<KeyedStateHandle> keyedStateFromBackend = 
Collections.emptyList();
@@ -275,6 +273,7 @@ public class InterruptSensitiveRestoreTest {
                        mock(MemoryManager.class),
                        mock(IOManager.class),
                        networkEnvironment,
+                       new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        taskStateManager,
                        mock(TaskManagerActions.class),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 54f4855..8a69fe9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -66,6 +67,7 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -149,7 +151,6 @@ public class StreamTaskTerminationTest extends TestLogger {
 
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                final NetworkEnvironment networkEnv = 
mock(NetworkEnvironment.class);
-               when(networkEnv.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class));
                
when(networkEnv.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                BlobCacheService blobService =
@@ -168,6 +169,7 @@ public class StreamTaskTerminationTest extends TestLogger {
                        new MemoryManager(32L * 1024L, 1),
                        new IOManagerAsync(),
                        networkEnv,
+                       new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        new TestTaskStateManager(),
                        mock(TaskManagerActions.class),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 372a9d8..30c460a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -64,7 +64,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStorage;
@@ -85,6 +85,7 @@ import org.apache.flink.runtime.state.TaskStateManagerImpl;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
@@ -901,8 +902,6 @@ public class StreamTaskTest extends TestLogger {
 
                NetworkEnvironment network = mock(NetworkEnvironment.class);
                
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                               .thenReturn(mock(TaskKvStateRegistry.class));
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                JobInformation jobInformation = new JobInformation(
@@ -934,6 +933,7 @@ public class StreamTaskTest extends TestLogger {
                        mock(MemoryManager.class),
                        mock(IOManager.class),
                        network,
+                       new KvStateService(new KvStateRegistry(), null, null),
                        mock(BroadcastVariableManager.class),
                        taskStateManager,
                        taskManagerActions,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 5c43564..ebfaac5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -54,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -71,6 +71,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.testutils.BackendForTestStream;
+import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -99,7 +100,6 @@ import java.util.concurrent.RunnableFuture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -224,10 +224,8 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                TestStreamTask.class.getName(),
                                taskConfig);
 
-               TaskKvStateRegistry mockKvRegistry = 
mock(TaskKvStateRegistry.class);
                TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
                NetworkEnvironment network = mock(NetworkEnvironment.class);
-               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class))).thenReturn(mockKvRegistry);
                
when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
 
                BlobCacheService blobService =
@@ -246,6 +244,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
                                mock(MemoryManager.class),
                                mock(IOManager.class),
                                network,
+                               new KvStateService(new KvStateRegistry(), null, 
null),
                                mock(BroadcastVariableManager.class),
                                new TestTaskStateManager(),
                                mock(TaskManagerActions.class),

Reply via email to