[ 
https://issues.apache.org/jira/browse/FLINK-4455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15455740#comment-15455740
 ] 

ASF GitHub Bot commented on FLINK-4455:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2449#discussion_r77194470
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
    @@ -18,130 +18,88 @@
     
     package org.apache.flink.runtime.io.network;
     
    -import akka.dispatch.OnFailure;
     import org.apache.flink.api.common.JobID;
     import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.InstanceConnectionInfo;
     import org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode;
     import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
     import org.apache.flink.runtime.io.network.buffer.BufferPool;
     import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
    -import org.apache.flink.runtime.io.network.netty.NettyConfig;
    -import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
    -import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
     import org.apache.flink.runtime.io.network.partition.ResultPartition;
     import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
    -import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
     import 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
     import 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    -import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
     import org.apache.flink.runtime.jobgraph.JobVertexID;
    -import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
    -import org.apache.flink.runtime.messages.TaskMessages.FailTask;
    -import org.apache.flink.runtime.query.KvStateID;
    -import org.apache.flink.runtime.query.KvStateMessage;
     import org.apache.flink.runtime.query.KvStateRegistry;
    -import org.apache.flink.runtime.query.KvStateRegistryListener;
    -import org.apache.flink.runtime.query.KvStateServerAddress;
     import org.apache.flink.runtime.query.TaskKvStateRegistry;
    -import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
     import org.apache.flink.runtime.query.netty.KvStateServer;
    -import 
org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
     import org.apache.flink.runtime.taskmanager.Task;
     import org.apache.flink.runtime.taskmanager.TaskManager;
    -import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
    -import scala.Option;
     import scala.Tuple2;
    -import scala.concurrent.ExecutionContext;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
     
     import java.io.IOException;
     
    -import static 
org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
     import static org.apache.flink.util.Preconditions.checkNotNull;
     
     /**
      * Network I/O components of each {@link TaskManager} instance. The 
network environment contains
      * the data structures that keep track of all intermediate results and all 
data exchanges.
    - *
    - * When initialized, the NetworkEnvironment will allocate the network 
buffer pool.
    - * All other components (netty, intermediate result managers, ...) are 
only created once the
    - * environment is "associated" with a TaskManager and JobManager. This 
happens as soon as the
    - * TaskManager actor gets created and registers itself at the JobManager.
      */
     public class NetworkEnvironment {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(NetworkEnvironment.class);
     
        private final Object lock = new Object();
     
    -   private final NetworkEnvironmentConfiguration configuration;
    -
    -   private final FiniteDuration jobManagerTimeout;
    -
        private final NetworkBufferPool networkBufferPool;
     
    -   private ConnectionManager connectionManager;
    +   private final ConnectionManager connectionManager;
     
    -   private ResultPartitionManager partitionManager;
    +   private final ResultPartitionManager resultPartitionManager;
     
    -   private TaskEventDispatcher taskEventDispatcher;
    -
    -   private ResultPartitionConsumableNotifier partitionConsumableNotifier;
    -
    -   private PartitionStateChecker partitionStateChecker;
    +   private final TaskEventDispatcher taskEventDispatcher;
     
        /** Server for {@link org.apache.flink.runtime.state.KvState} requests. 
*/
    -   private KvStateServer kvStateServer;
    +   private final KvStateServer kvStateServer;
     
        /** Registry for {@link org.apache.flink.runtime.state.KvState} 
instances. */
    -   private KvStateRegistry kvStateRegistry;
    +   private final KvStateRegistry kvStateRegistry;
     
    -   private boolean isShutdown;
    +   private final IOMode defaultIOMode;
     
    -   /**
    -    * ExecutionEnvironment which is used to execute remote calls with the
    -    * {@link JobManagerResultPartitionConsumableNotifier}
    -    */
    -   private final ExecutionContext executionContext;
    +   private final Tuple2<Integer, Integer> 
partitionRequestInitialAndMaxBackoff;
    --- End diff --
    
    True will refactor it.


> Replace ActorGateways in NetworkEnvironment by interfaces
> ---------------------------------------------------------
>
>                 Key: FLINK-4455
>                 URL: https://issues.apache.org/jira/browse/FLINK-4455
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network, TaskManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The {{NetworkEnvironment}} communicates with the outside world 
> ({{TaskManager}} and {{JobManager}}) via {{ActorGateways}}. This bakes in the 
> dependency on actors. 
> In terms of modularization and an improved abstraction (especially wrt 
> Flip-6) I propose to replace the {{ActorGateways}} by interfaces which 
> exposes the required methods. The current implementation would then simply 
> wrap the method calls in messages and send them via the {{ActorGateway}} to 
> the recipient.
> In Flip-6 the {{JobMaster}} and the {{TaskExecutor}} could simply implement 
> these interfaces as part of their RPC contract.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to