azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce 
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285342718
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##########
 @@ -88,35 +86,62 @@
 
        private final TaskEventPublisher taskEventPublisher;
 
-       private final IOManager ioManager;
+       private final ResultPartitionFactory resultPartitionFactory;
+
+       private final SingleInputGateFactory singleInputGateFactory;
 
        private boolean isShutdown;
 
-       public NetworkEnvironment(
-                       NetworkEnvironmentConfiguration config,
-                       TaskEventPublisher taskEventPublisher,
-                       MetricGroup metricGroup,
-                       IOManager ioManager) {
-               this.config = checkNotNull(config);
+       private NetworkEnvironment(
+               NetworkEnvironmentConfiguration config,
+               NetworkBufferPool networkBufferPool,
+               ConnectionManager connectionManager,
+               ResultPartitionManager resultPartitionManager,
+               TaskEventPublisher taskEventPublisher,
+               ResultPartitionFactory resultPartitionFactory,
+               SingleInputGateFactory singleInputGateFactory) {
+
+               this.config = config;
+               this.networkBufferPool = networkBufferPool;
+               this.connectionManager = connectionManager;
+               this.resultPartitionManager = resultPartitionManager;
+               this.taskEventPublisher = taskEventPublisher;
+               this.resultPartitionFactory = resultPartitionFactory;
+               this.singleInputGateFactory = singleInputGateFactory;
+               this.isShutdown = false;
+       }
 
-               this.networkBufferPool = new 
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+       public static NetworkEnvironment create(
+               NetworkEnvironmentConfiguration config,
+               TaskEventPublisher taskEventPublisher,
+               MetricGroup metricGroup,
+               IOManager ioManager) {
 
-               NettyConfig nettyConfig = config.nettyConfig();
-               if (nettyConfig != null) {
-                       this.connectionManager = new 
NettyConnectionManager(nettyConfig, config.isCreditBased());
-               } else {
-                       this.connectionManager = new LocalConnectionManager();
-               }
+               NettyConfig nettyConfig = checkNotNull(config).nettyConfig();
+               ConnectionManager connectionManager = nettyConfig != null ?
+                       new NettyConnectionManager(nettyConfig, 
config.isCreditBased()) : new LocalConnectionManager();
 
-               this.resultPartitionManager = new ResultPartitionManager();
-
-               this.taskEventPublisher = checkNotNull(taskEventPublisher);
+               NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+                       config.numNetworkBuffers(), config.networkBufferSize(), 
config.networkBuffersPerChannel());
 
                registerNetworkMetrics(metricGroup, networkBufferPool);
 
-               this.ioManager = checkNotNull(ioManager);
-
-               isShutdown = false;
+               ResultPartitionManager resultPartitionManager = new 
ResultPartitionManager();
+               ResultPartitionFactory resultPartitionFactory = new 
ResultPartitionFactory(
+                       resultPartitionManager, checkNotNull(ioManager), 
networkBufferPool,
+                       config.networkBuffersPerChannel(), 
config.floatingNetworkBuffersPerGate());
 
 Review comment:
   In general, I would prefer to keep classes/functions to have as least 
dependencies as possible if full `config` is not needed, it is easier to see 
the real dependencies and use in other places, like avoid mocking full config 
in tests.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to