[ https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455740#comment-15455740 ]
ASF GitHub Bot commented on FLINK-4455: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2449#discussion_r77194470 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java --- @@ -18,130 +18,88 @@ package org.apache.flink.runtime.io.network; -import akka.dispatch.OnFailure; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.netty.NettyConfig; -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState; -import org.apache.flink.runtime.messages.TaskMessages.FailTask; -import org.apache.flink.runtime.query.KvStateID; -import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.KvStateRegistry; -import 
org.apache.flink.runtime.query.KvStateRegistryListener; -import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats; import org.apache.flink.runtime.query.netty.KvStateServer; -import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManager; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers; import static org.apache.flink.util.Preconditions.checkNotNull; /** * Network I/O components of each {@link TaskManager} instance. The network environment contains * the data structures that keep track of all intermediate results and all data exchanges. - * - * When initialized, the NetworkEnvironment will allocate the network buffer pool. - * All other components (netty, intermediate result managers, ...) are only created once the - * environment is "associated" with a TaskManager and JobManager. This happens as soon as the - * TaskManager actor gets created and registers itself at the JobManager. 
*/ public class NetworkEnvironment { private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); private final Object lock = new Object(); - private final NetworkEnvironmentConfiguration configuration; - - private final FiniteDuration jobManagerTimeout; - private final NetworkBufferPool networkBufferPool; - private ConnectionManager connectionManager; + private final ConnectionManager connectionManager; - private ResultPartitionManager partitionManager; + private final ResultPartitionManager resultPartitionManager; - private TaskEventDispatcher taskEventDispatcher; - - private ResultPartitionConsumableNotifier partitionConsumableNotifier; - - private PartitionStateChecker partitionStateChecker; + private final TaskEventDispatcher taskEventDispatcher; /** Server for {@link org.apache.flink.runtime.state.KvState} requests. */ - private KvStateServer kvStateServer; + private final KvStateServer kvStateServer; /** Registry for {@link org.apache.flink.runtime.state.KvState} instances. */ - private KvStateRegistry kvStateRegistry; + private final KvStateRegistry kvStateRegistry; - private boolean isShutdown; + private final IOMode defaultIOMode; - /** - * ExecutionEnvironment which is used to execute remote calls with the - * {@link JobManagerResultPartitionConsumableNotifier} - */ - private final ExecutionContext executionContext; + private final Tuple2<Integer, Integer> partitionRequestInitialAndMaxBackoff; --- End diff -- True will refactor it. > Replace ActorGateways in NetworkEnvironment by interfaces > --------------------------------------------------------- > > Key: FLINK-4455 > URL: https://issues.apache.org/jira/browse/FLINK-4455 > Project: Flink > Issue Type: Improvement > Components: Network, TaskManager > Reporter: Till Rohrmann > Assignee: Till Rohrmann > > The {{NetworkEnvironment}} communicates with the outside world > ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the > dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > expose the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement > these interfaces as part of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)