This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c4aa3ad [FLINK-11723][network] Remove KvState related components from NetworkEnvironment c4aa3ad is described below commit c4aa3ad5f706aedc12f884905a20d96de3977d31 Author: zhijiang <wangzhijiang...@aliyun.com> AuthorDate: Fri Mar 29 17:04:24 2019 +0800 [FLINK-11723][network] Remove KvState related components from NetworkEnvironment --- .../runtime/io/network/NetworkEnvironment.java | 98 +---------- .../flink/runtime/taskexecutor/KvStateService.java | 194 +++++++++++++++++++++ .../flink/runtime/taskexecutor/TaskExecutor.java | 15 +- .../runtime/taskexecutor/TaskManagerServices.java | 61 ++----- .../org/apache/flink/runtime/taskmanager/Task.java | 8 +- .../flink/runtime/taskmanager/TaskManager.scala | 19 +- .../runtime/taskexecutor/TaskExecutorTest.java | 8 +- .../taskexecutor/TaskManagerServicesBuilder.java | 9 + .../runtime/taskmanager/TaskAsyncCallTest.java | 6 +- .../apache/flink/runtime/taskmanager/TaskTest.java | 16 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 5 +- .../runtime/testingUtils/TestingTaskManager.scala | 6 +- .../StreamNetworkBenchmarkEnvironment.java | 4 - .../tasks/InterruptSensitiveRestoreTest.java | 7 +- .../runtime/tasks/StreamTaskTerminationTest.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 7 +- 17 files changed, 298 insertions(+), 175 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index d3cb897..1c0b6da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -19,19 +19,12 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.query.KvStateClientProxy; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.util.ExceptionUtils; @@ -63,15 +56,6 @@ public class NetworkEnvironment { private final TaskEventDispatcher taskEventDispatcher; - /** Server for {@link InternalKvState} requests. */ - private KvStateServer kvStateServer; - - /** Proxy for the queryable state client. */ - private KvStateClientProxy kvStateProxy; - - /** Registry for {@link InternalKvState} instances. */ - private final KvStateRegistry kvStateRegistry; - private final int partitionRequestInitialBackoff; private final int partitionRequestMaxBackoff; @@ -99,9 +83,6 @@ public class NetworkEnvironment { new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), - new KvStateRegistry(), - null, - null, partitionRequestInitialBackoff, partitionRequestMaxBackoff, networkBuffersPerChannel, @@ -110,27 +91,20 @@ public class NetworkEnvironment { } public NetworkEnvironment( - NetworkBufferPool networkBufferPool, - ConnectionManager connectionManager, - ResultPartitionManager resultPartitionManager, - TaskEventDispatcher taskEventDispatcher, - KvStateRegistry kvStateRegistry, - KvStateServer kvStateServer, - KvStateClientProxy kvStateClientProxy, - int partitionRequestInitialBackoff, - int partitionRequestMaxBackoff, - int networkBuffersPerChannel, - int extraNetworkBuffersPerGate, - boolean enableCreditBased) { + NetworkBufferPool networkBufferPool, + ConnectionManager connectionManager, + ResultPartitionManager resultPartitionManager, + TaskEventDispatcher taskEventDispatcher, + int partitionRequestInitialBackoff, + int partitionRequestMaxBackoff, + int networkBuffersPerChannel, + int extraNetworkBuffersPerGate, + boolean enableCreditBased) { this.networkBufferPool = checkNotNull(networkBufferPool); this.connectionManager = checkNotNull(connectionManager); this.resultPartitionManager = checkNotNull(resultPartitionManager); this.taskEventDispatcher = checkNotNull(taskEventDispatcher); - this.kvStateRegistry = checkNotNull(kvStateRegistry); - - this.kvStateServer = kvStateServer; - this.kvStateProxy = kvStateClientProxy; this.partitionRequestInitialBackoff = partitionRequestInitialBackoff; this.partitionRequestMaxBackoff = partitionRequestMaxBackoff; @@ -174,22 +148,6 @@ public class NetworkEnvironment { return enableCreditBased; } - public KvStateRegistry getKvStateRegistry() { - return kvStateRegistry; - } - - public KvStateServer getKvStateServer() { - return kvStateServer; - } - - public KvStateClientProxy getKvStateProxy() { - return kvStateProxy; - } - - public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) { - return kvStateRegistry.createTaskRegistry(jobId, jobVertexId); - } - // -------------------------------------------------------------------------------------------- // Task operations // -------------------------------------------------------------------------------------------- @@ -326,26 +284,6 @@ public class NetworkEnvironment { } catch (IOException t) { throw new IOException("Failed to instantiate network connection manager.", t); } - - if (kvStateServer != null) { - try { - kvStateServer.start(); - } catch (Throwable ie) { - kvStateServer.shutdown(); - kvStateServer = null; - LOG.error("Failed to start the Queryable State Data Server.", ie); - } - } - - if (kvStateProxy != null) { - try { - kvStateProxy.start(); - } catch (Throwable ie) { - kvStateProxy.shutdown(); - kvStateProxy = null; - LOG.error("Failed to start the Queryable State Client Proxy.", ie); - } - } } } @@ -360,24 +298,6 @@ public class NetworkEnvironment { LOG.info("Shutting down the network environment and its components."); - if (kvStateProxy != null) { - try { - LOG.debug("Shutting down Queryable State Client Proxy."); - kvStateProxy.shutdown(); - } catch (Throwable t) { - LOG.warn("Cannot shut down Queryable State Client Proxy.", t); - } - } - - if (kvStateServer != null) { - try { - LOG.debug("Shutting down Queryable State Data Server."); - kvStateServer.shutdown(); - } catch (Throwable t) { - LOG.warn("Cannot shut down Queryable State Data Server.", t); - } - } - // terminate all network connections try { LOG.debug("Shutting down network connection manager"); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java new file mode 100644 index 0000000..e283dc6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/KvStateService.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.query.KvStateClientProxy; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.KvStateServer; +import org.apache.flink.runtime.query.QueryableStateUtils; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * KvState related components of each {@link TaskExecutor} instance. This service can + * create the kvState registration for a single task. + */ +public class KvStateService { + private static final Logger LOG = LoggerFactory.getLogger(KvStateService.class); + + private final Object lock = new Object(); + + /** Registry for {@link InternalKvState} instances. */ + private final KvStateRegistry kvStateRegistry; + + /** Server for {@link InternalKvState} requests. */ + private KvStateServer kvStateServer; + + /** Proxy for the queryable state client. */ + private KvStateClientProxy kvStateClientProxy; + + private boolean isShutdown; + + public KvStateService(KvStateRegistry kvStateRegistry, KvStateServer kvStateServer, KvStateClientProxy kvStateClientProxy) { + this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry); + this.kvStateServer = kvStateServer; + this.kvStateClientProxy = kvStateClientProxy; + } + + // -------------------------------------------------------------------------------------------- + // Getter/Setter + // -------------------------------------------------------------------------------------------- + + public KvStateRegistry getKvStateRegistry() { + return kvStateRegistry; + } + + public KvStateServer getKvStateServer() { + return kvStateServer; + } + + public KvStateClientProxy getKvStateClientProxy() { + return kvStateClientProxy; + } + + public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) { + return kvStateRegistry.createTaskRegistry(jobId, jobVertexId); + } + + // -------------------------------------------------------------------------------------------- + // Start and shut down methods + // -------------------------------------------------------------------------------------------- + + public void start() { + synchronized (lock) { + Preconditions.checkState(!isShutdown, "The KvStateService has already been shut down."); + + LOG.info("Starting the kvState service and its components."); + + if (kvStateServer != null) { + try { + kvStateServer.start(); + } catch (Throwable ie) { + kvStateServer.shutdown(); + kvStateServer = null; + LOG.error("Failed to start the Queryable State Data Server.", ie); + } + } + + if (kvStateClientProxy != null) { + try { + kvStateClientProxy.start(); + } catch (Throwable ie) { + kvStateClientProxy.shutdown(); + kvStateClientProxy = null; + LOG.error("Failed to start the Queryable State Client Proxy.", ie); + } + } + } + } + + public void shutdown() { + synchronized (lock) { + if (isShutdown) { + return; + } + + LOG.info("Shutting down the kvState service and its components."); + + if (kvStateClientProxy != null) { + try { + LOG.debug("Shutting down Queryable State Client Proxy."); + kvStateClientProxy.shutdown(); + } catch (Throwable t) { + LOG.warn("Cannot shut down Queryable State Client Proxy.", t); + } + } + + if (kvStateServer != null) { + try { + LOG.debug("Shutting down Queryable State Data Server."); + kvStateServer.shutdown(); + } catch (Throwable t) { + LOG.warn("Cannot shut down Queryable State Data Server.", t); + } + } + + isShutdown = true; + } + } + + public boolean isShutdown() { + synchronized (lock) { + return isShutdown; + } + } + + // -------------------------------------------------------------------------------------------- + // Static factory methods for kvState service + // -------------------------------------------------------------------------------------------- + + /** + * Creates and returns the KvState service. + * + * @param taskManagerServicesConfiguration task manager configuration + * @return service for kvState related components + */ + public static KvStateService fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration) { + KvStateRegistry kvStateRegistry = new KvStateRegistry(); + + QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); + + KvStateClientProxy kvClientProxy = null; + KvStateServer kvStateServer = null; + + if (qsConfig != null) { + int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads(); + int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads(); + kvClientProxy = QueryableStateUtils.createKvStateClientProxy( + taskManagerServicesConfiguration.getTaskManagerAddress(), + qsConfig.getProxyPortRange(), + numProxyServerNetworkThreads, + numProxyServerQueryThreads, + new DisabledKvStateRequestStats()); + + int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads(); + int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? + taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads(); + kvStateServer = QueryableStateUtils.createKvStateServer( + taskManagerServicesConfiguration.getTaskManagerAddress(), + qsConfig.getStateServerPortRange(), + numStateServerNetworkThreads, + numStateServerQueryThreads, + kvStateRegistry, + new DisabledKvStateRequestStats()); + } + + return new KvStateService(kvStateRegistry, kvStateServer, kvClientProxy); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index e75fab6..e9f7724 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -173,6 +173,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { /** The network component in the task manager. */ private final NetworkEnvironment networkEnvironment; + /** The kvState registration service in the task manager. */ + private final KvStateService kvStateService; + // --------- job manager connections ----------- private final Map<ResourceID, JobManagerConnection> jobManagerConnections; @@ -238,6 +241,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { this.taskManagerLocation = taskExecutorServices.getTaskManagerLocation(); this.localStateStoresManager = taskExecutorServices.getTaskManagerStateStore(); this.networkEnvironment = taskExecutorServices.getNetworkEnvironment(); + this.kvStateService = taskExecutorServices.getKvStateService(); this.resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever(); this.jobManagerConnections = new HashMap<>(4); @@ -526,6 +530,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { taskExecutorServices.getMemoryManager(), taskExecutorServices.getIOManager(), taskExecutorServices.getNetworkEnvironment(), + taskExecutorServices.getKvStateService(), taskExecutorServices.getBroadcastVariableManager(), taskStateManager, taskManagerActions, @@ -1261,13 +1266,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception cause) throws IOException { checkNotNull(jobManagerConnection); - final KvStateRegistry kvStateRegistry = networkEnvironment.getKvStateRegistry(); + final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry(); if (kvStateRegistry != null) { kvStateRegistry.unregisterListener(jobManagerConnection.getJobID()); } - final KvStateClientProxy kvStateClientProxy = networkEnvironment.getKvStateProxy(); + final KvStateClientProxy kvStateClientProxy = kvStateService.getKvStateClientProxy(); if (kvStateClientProxy != null) { kvStateClientProxy.updateKvStateLocationOracle(jobManagerConnection.getJobID(), null); @@ -1279,8 +1284,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private void registerQueryableState(JobID jobId, JobMasterGateway jobMasterGateway) { - final KvStateServer kvStateServer = networkEnvironment.getKvStateServer(); - final KvStateRegistry kvStateRegistry = networkEnvironment.getKvStateRegistry(); + final KvStateServer kvStateServer = kvStateService.getKvStateServer(); + final KvStateRegistry kvStateRegistry = kvStateService.getKvStateRegistry(); if (kvStateServer != null && kvStateRegistry != null) { kvStateRegistry.registerListener( @@ -1290,7 +1295,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { kvStateServer.getServerAddress())); } - final KvStateClientProxy kvStateProxy = networkEnvironment.getKvStateProxy(); + final KvStateClientProxy kvStateProxy = kvStateService.getKvStateClientProxy(); if (kvStateProxy != null) { kvStateProxy.updateKvStateLocationOracle(jobId, jobMasterGateway); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 8c566f6..83cb470 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -40,10 +39,6 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.query.KvStateClientProxy; -import org.apache.flink.runtime.query.KvStateRegistry; -import org.apache.flink.runtime.query.KvStateServer; -import org.apache.flink.runtime.query.QueryableStateUtils; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; @@ -82,6 +77,7 @@ public class TaskManagerServices { private final MemoryManager memoryManager; private final IOManager ioManager; private final NetworkEnvironment networkEnvironment; + private final KvStateService kvStateService; private final BroadcastVariableManager broadcastVariableManager; private final TaskSlotTable taskSlotTable; private final JobManagerTable jobManagerTable; @@ -93,6 +89,7 @@ public class TaskManagerServices { MemoryManager memoryManager, IOManager ioManager, NetworkEnvironment networkEnvironment, + KvStateService kvStateService, BroadcastVariableManager broadcastVariableManager, TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, @@ -103,6 +100,7 @@ public class TaskManagerServices { this.memoryManager = Preconditions.checkNotNull(memoryManager); this.ioManager = Preconditions.checkNotNull(ioManager); this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment); + this.kvStateService = Preconditions.checkNotNull(kvStateService); this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager); this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable); this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable); @@ -126,6 +124,10 @@ public class TaskManagerServices { return networkEnvironment; } + public KvStateService getKvStateService() { + return kvStateService; + } + public TaskManagerLocation getTaskManagerLocation() { return taskManagerLocation; } @@ -186,6 +188,12 @@ public class TaskManagerServices { } try { + kvStateService.shutdown(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + try { taskSlotTable.stop(); } catch (Exception e) { exception = ExceptionUtils.firstOrSuppressed(e, exception); @@ -230,6 +238,9 @@ public class TaskManagerServices { final NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, maxJvmHeapMemory); network.start(); + final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration); + kvStateService.start(); + final TaskManagerLocation taskManagerLocation = new TaskManagerLocation( resourceID, taskManagerServicesConfiguration.getTaskManagerAddress(), @@ -277,6 +288,7 @@ public class TaskManagerServices { memoryManager, ioManager, network, + kvStateService, broadcastVariableManager, taskSlotTable, jobManagerTable, @@ -415,51 +427,12 @@ public class TaskManagerServices { ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); - KvStateRegistry kvStateRegistry = new KvStateRegistry(); - - QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig(); - - KvStateClientProxy kvClientProxy = null; - KvStateServer kvStateServer = null; - - if (qsConfig != null) { - int numProxyServerNetworkThreads = qsConfig.numProxyServerThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyServerThreads(); - - int numProxyServerQueryThreads = qsConfig.numProxyQueryThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numProxyQueryThreads(); - - kvClientProxy = QueryableStateUtils.createKvStateClientProxy( - taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.getProxyPortRange(), - numProxyServerNetworkThreads, - numProxyServerQueryThreads, - new DisabledKvStateRequestStats()); - - int numStateServerNetworkThreads = qsConfig.numStateServerThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateServerThreads(); - - int numStateServerQueryThreads = qsConfig.numStateQueryThreads() == 0 ? - taskManagerServicesConfiguration.getNumberOfSlots() : qsConfig.numStateQueryThreads(); - - kvStateServer = QueryableStateUtils.createKvStateServer( - taskManagerServicesConfiguration.getTaskManagerAddress(), - qsConfig.getStateServerPortRange(), - numStateServerNetworkThreads, - numStateServerQueryThreads, - kvStateRegistry, - new DisabledKvStateRequestStats()); - } - // we start the network first, to make sure it can allocate its buffers first return new NetworkEnvironment( networkBufferPool, connectionManager, resultPartitionManager, taskEventDispatcher, - kvStateRegistry, - kvStateServer, - kvClientProxy, networkEnvironmentConfiguration.partitionRequestInitialBackoff(), networkEnvironmentConfiguration.partitionRequestMaxBackoff(), networkEnvironmentConfiguration.networkBuffersPerChannel(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 698f8bf..e005b74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -69,6 +69,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.util.FatalExitExceptionHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -220,6 +221,9 @@ public class Task implements Runnable, TaskActions, CheckpointListener { /** The gateway to the network stack, which handles inputs and produced results. */ private final NetworkEnvironment network; + /** The service for kvState registration of this task. */ + private final KvStateService kvStateService; + /** The registry of this task which enables live reporting of accumulators. */ private final AccumulatorRegistry accumulatorRegistry; @@ -287,6 +291,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener { MemoryManager memManager, IOManager ioManager, NetworkEnvironment networkEnvironment, + KvStateService kvStateService, BroadcastVariableManager bcVarManager, TaskStateManager taskStateManager, TaskManagerActions taskManagerActions, @@ -348,6 +353,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener { this.libraryCache = Preconditions.checkNotNull(libraryCache); this.fileCache = Preconditions.checkNotNull(fileCache); this.network = Preconditions.checkNotNull(networkEnvironment); + this.kvStateService = Preconditions.checkNotNull(kvStateService); this.taskManagerConfig = Preconditions.checkNotNull(taskManagerConfig); this.metrics = metricGroup; @@ -658,7 +664,7 @@ public class Task implements Runnable, TaskActions, CheckpointListener { // call the user code initialization methods // ---------------------------------------------------------------- - TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId()); + TaskKvStateRegistry kvStateRegistry = kvStateService.createKvStateTaskRegistry(jobId, getJobVertexId()); Environment env = new RuntimeEnvironment( jobId, diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index e96e6f4..02f1753 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -68,7 +68,7 @@ import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegi import org.apache.flink.runtime.process.ProcessReaper import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils} import org.apache.flink.runtime.state.{TaskExecutorLocalStateStoresManager, TaskStateManagerImpl} -import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} +import org.apache.flink.runtime.taskexecutor.{KvStateService, TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration} import org.apache.flink.runtime.util._ import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages} import org.apache.flink.util.NetUtils @@ -127,6 +127,7 @@ class TaskManager( protected val memoryManager: MemoryManager, protected val ioManager: IOManager, protected val network: NetworkEnvironment, + protected val kvStateService: KvStateService, protected val taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, protected val numberOfSlots: Int, protected val highAvailabilityServices: HighAvailabilityServices, @@ -941,10 +942,10 @@ class TaskManager( taskManagerConnection)) - val kvStateServer = network.getKvStateServer() + val kvStateServer = kvStateService.getKvStateServer() if (kvStateServer != null) { - val kvStateRegistry = network.getKvStateRegistry() + val kvStateRegistry = kvStateService.getKvStateRegistry() kvStateRegistry.registerListener( HighAvailabilityServices.DEFAULT_JOB_ID, @@ -953,7 +954,7 @@ class TaskManager( kvStateServer.getServerAddress)) } - val proxy = network.getKvStateProxy + val proxy = kvStateService.getKvStateClientProxy if (proxy != null) { proxy.updateKvStateLocationOracle( @@ -1071,11 +1072,11 @@ class TaskManager( // disassociate the slot environment connectionUtils = None - if (network.getKvStateRegistry != null) { - network.getKvStateRegistry.unregisterListener(HighAvailabilityServices.DEFAULT_JOB_ID) + if (kvStateService.getKvStateRegistry != null) { + kvStateService.getKvStateRegistry.unregisterListener(HighAvailabilityServices.DEFAULT_JOB_ID) } - val proxy = network.getKvStateProxy + val proxy = kvStateService.getKvStateClientProxy if (proxy != null) { // clear the key-value location oracle @@ -1235,6 +1236,7 @@ class TaskManager( memoryManager, ioManager, network, + kvStateService, bcVarManager, taskStateManager, taskManagerConnection, @@ -2031,6 +2033,7 @@ object TaskManager { taskManagerServices.getMemoryManager(), taskManagerServices.getIOManager(), taskManagerServices.getNetworkEnvironment(), + taskManagerServices.getKvStateService, taskManagerServices.getTaskManagerStateStore(), highAvailabilityServices, taskManagerMetricGroup) @@ -2049,6 +2052,7 @@ object TaskManager { memoryManager: MemoryManager, ioManager: IOManager, networkEnvironment: NetworkEnvironment, + kvStateService: KvStateService, taskStateManager: TaskExecutorLocalStateStoresManager, highAvailabilityServices: HighAvailabilityServices, taskManagerMetricGroup: TaskManagerMetricGroup @@ -2061,6 +2065,7 @@ object TaskManager { memoryManager, ioManager, networkEnvironment, + kvStateService, taskStateManager, taskManagerConfig.getNumberSlots(), highAvailabilityServices, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 5cdd782..ef4a2da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -71,7 +71,7 @@ import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -267,11 +267,15 @@ public class TaskExecutorTest extends TestLogger { true); networkEnvironment.start(); + final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null); + kvStateService.start(); + final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) .setMemoryManager(memoryManager) .setIoManager(ioManager) .setNetworkEnvironment(networkEnvironment) + .setKvStateService(kvStateService) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) .setTaskStateManager(localStateStoresManager) @@ -301,6 +305,7 @@ public class TaskExecutorTest extends TestLogger { assertThat(memoryManager.isShutdown(), is(true)); assertThat(networkEnvironment.isShutdown(), is(true)); assertThat(ioManager.isProperlyShutDown(), is(true)); + assertThat(kvStateService.isShutdown(), is(true)); } @Test @@ -748,7 +753,6 @@ public class TaskExecutorTest extends TestLogger { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); - when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java index c9d3b0e..4f361b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -41,6 +42,7 @@ public class TaskManagerServicesBuilder { private MemoryManager memoryManager; private IOManager ioManager; private NetworkEnvironment networkEnvironment; + private KvStateService kvStateService; private BroadcastVariableManager broadcastVariableManager; private TaskSlotTable taskSlotTable; private JobManagerTable jobManagerTable; @@ -57,6 +59,7 @@ public class TaskManagerServicesBuilder { false); ioManager = mock(IOManager.class); networkEnvironment = mock(NetworkEnvironment.class); + kvStateService = new KvStateService(new KvStateRegistry(), null, null); broadcastVariableManager = new BroadcastVariableManager(); taskSlotTable = mock(TaskSlotTable.class); jobManagerTable = new JobManagerTable(); @@ -84,6 +87,11 @@ public class TaskManagerServicesBuilder { return this; } + public TaskManagerServicesBuilder setKvStateService(KvStateService kvStateService) { + this.kvStateService = kvStateService; + return this; + } + public TaskManagerServicesBuilder setBroadcastVariableManager(BroadcastVariableManager broadcastVariableManager) { this.broadcastVariableManager = broadcastVariableManager; return this; @@ -115,6 +123,7 @@ public class TaskManagerServicesBuilder { memoryManager, ioManager, networkEnvironment, + kvStateService, broadcastVariableManager, taskSlotTable, jobManagerTable, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 353556e..ce3e1b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -53,8 +53,9 @@ import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.util.SerializedValue; @@ -224,8 +225,6 @@ public class TaskAsyncCallTest extends TestLogger { TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class); NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager); - when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class); @@ -260,6 +259,7 @@ public class TaskAsyncCallTest extends TestLogger { mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, + new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TestTaskStateManager(), mock(TaskManagerActions.class), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 3447851..d526992 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -60,8 +60,9 @@ import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; @@ -575,8 +576,6 @@ public class TaskTest extends TestLogger { final ResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier(); final NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(mock(ResultPartitionManager.class)); - when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); // Test all branches of trigger partition state check @@ -929,6 +928,7 @@ public class TaskTest extends TestLogger { private ResultPartitionConsumableNotifier consumableNotifier; private PartitionProducerStateChecker partitionProducerStateChecker; private NetworkEnvironment networkEnvironment; + private KvStateService kvStateService; private Executor executor; private Configuration taskManagerConfig; private ExecutionConfig executionConfig; @@ -948,10 +948,10 @@ public class TaskTest extends TestLogger { final TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class); networkEnvironment = mock(NetworkEnvironment.class); when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager); - when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); + kvStateService = new KvStateService(new KvStateRegistry(), null, null); + executor = TestingUtils.defaultExecutor(); taskManagerConfig = new Configuration(); @@ -990,6 +990,11 @@ public class TaskTest extends TestLogger { return this; } + TaskBuilder setKvStateService(KvStateService kvStateService) { + this.kvStateService = kvStateService; + return this; + } + TaskBuilder setExecutor(Executor executor) { this.executor = executor; return this; @@ -1053,6 +1058,7 @@ public class TaskTest extends TestLogger { mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, + kvStateService, mock(BroadcastVariableManager.class), new TestTaskStateManager(), taskManagerActions, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java index 082005c..99f93e9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java @@ -54,12 +54,13 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.TaskLocalStateStore; import org.apache.flink.runtime.state.TaskLocalStateStoreImpl; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TaskStateManagerImpl; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.taskmanager.CheckpointResponder; @@ -167,7 +168,6 @@ public class JvmExitOnFatalErrorTest { final IOManager ioManager = new IOManagerAsync(); final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); - when(networkEnvironment.createKvStateTaskRegistry(jid, jobVertexId)).thenReturn(mock(TaskKvStateRegistry.class)); TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); @@ -208,6 +208,7 @@ public class JvmExitOnFatalErrorTest { memoryManager, ioManager, networkEnvironment, + new KvStateService(new KvStateRegistry(), null, null), new BroadcastVariableManager(), slotStateManager, new NoOpTaskManagerActions(), diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala index 5e60be9..b1e551b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingTaskManager.scala @@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.memory.MemoryManager import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration +import org.apache.flink.runtime.taskexecutor.{KvStateService, TaskManagerConfiguration} import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation} import scala.language.postfixOps @@ -39,6 +39,7 @@ class TestingTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + kvStateService: KvStateService, taskManagerStateStore: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, @@ -50,6 +51,7 @@ class TestingTaskManager( memoryManager, ioManager, network, + kvStateService, taskManagerStateStore, numberOfSlots, highAvailabilityServices, @@ -62,6 +64,7 @@ class TestingTaskManager( memoryManager: MemoryManager, ioManager: IOManager, network: NetworkEnvironment, + kvStateService: KvStateService, taskManagerLocalStateStoresManager: TaskExecutorLocalStateStoresManager, numberOfSlots: Int, highAvailabilityServices: HighAvailabilityServices, @@ -73,6 +76,7 @@ class TestingTaskManager( memoryManager, ioManager, network, + kvStateService, taskManagerLocalStateStoresManager, numberOfSlots, highAvailabilityServices, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java index e7b54ec..8dc0ac4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java @@ -49,7 +49,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -218,9 +217,6 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> { nettyConnectionManager, new ResultPartitionManager(), new TaskEventDispatcher(), - new KvStateRegistry(), - null, - null, TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL.defaultValue(), TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX.defaultValue(), TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL.defaultValue(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 0c31184..d80db71 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -65,6 +65,7 @@ import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -92,7 +93,6 @@ import java.util.concurrent.Executor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -183,8 +183,6 @@ public class InterruptSensitiveRestoreTest { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); - when(networkEnvironment.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); Collection<KeyedStateHandle> keyedStateFromBackend = Collections.emptyList(); @@ -275,6 +273,7 @@ public class InterruptSensitiveRestoreTest { mock(MemoryManager.class), mock(IOManager.class), networkEnvironment, + new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), taskStateManager, mock(TaskManagerActions.class), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 54f4855..8a69fe9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; @@ -66,6 +67,7 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -149,7 +151,6 @@ public class StreamTaskTerminationTest extends TestLogger { TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); final NetworkEnvironment networkEnv = mock(NetworkEnvironment.class); - when(networkEnv.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mock(TaskKvStateRegistry.class)); when(networkEnv.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); BlobCacheService blobService = @@ -168,6 +169,7 @@ public class StreamTaskTerminationTest extends TestLogger { new MemoryManager(32L * 1024L, 1), new IOManagerAsync(), networkEnv, + new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TestTaskStateManager(), mock(TaskManagerActions.class), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 372a9d8..30c460a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -64,7 +64,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStorage; @@ -85,6 +85,7 @@ import org.apache.flink.runtime.state.TaskStateManagerImpl; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; @@ -901,8 +902,6 @@ public class StreamTaskTest extends TestLogger { NetworkEnvironment network = mock(NetworkEnvironment.class); when(network.getResultPartitionManager()).thenReturn(partitionManager); - when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))) - .thenReturn(mock(TaskKvStateRegistry.class)); when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); JobInformation jobInformation = new JobInformation( @@ -934,6 +933,7 @@ public class StreamTaskTest extends TestLogger { mock(MemoryManager.class), mock(IOManager.class), network, + new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), taskStateManager, taskManagerActions, diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 5c43564..ebfaac5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -54,7 +54,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.state.AbstractSnapshotStrategy; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -71,6 +71,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.state.testutils.BackendForTestStream; +import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; @@ -99,7 +100,6 @@ import java.util.concurrent.RunnableFuture; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -224,10 +224,8 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { TestStreamTask.class.getName(), taskConfig); - TaskKvStateRegistry mockKvRegistry = mock(TaskKvStateRegistry.class); TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher(); NetworkEnvironment network = mock(NetworkEnvironment.class); - when(network.createKvStateTaskRegistry(any(JobID.class), any(JobVertexID.class))).thenReturn(mockKvRegistry); when(network.getTaskEventDispatcher()).thenReturn(taskEventDispatcher); BlobCacheService blobService = @@ -246,6 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { mock(MemoryManager.class), mock(IOManager.class), network, + new KvStateService(new KvStateRegistry(), null, null), mock(BroadcastVariableManager.class), new TestTaskStateManager(), mock(TaskManagerActions.class),