azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce partition/gate setup to decouple task registration with NetworkEnvironment URL: https://github.com/apache/flink/pull/8416#discussion_r285342718
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ########## @@ -88,35 +86,62 @@ private final TaskEventPublisher taskEventPublisher; - private final IOManager ioManager; + private final ResultPartitionFactory resultPartitionFactory; + + private final SingleInputGateFactory singleInputGateFactory; private boolean isShutdown; - public NetworkEnvironment( - NetworkEnvironmentConfiguration config, - TaskEventPublisher taskEventPublisher, - MetricGroup metricGroup, - IOManager ioManager) { - this.config = checkNotNull(config); + private NetworkEnvironment( + NetworkEnvironmentConfiguration config, + NetworkBufferPool networkBufferPool, + ConnectionManager connectionManager, + ResultPartitionManager resultPartitionManager, + TaskEventPublisher taskEventPublisher, + ResultPartitionFactory resultPartitionFactory, + SingleInputGateFactory singleInputGateFactory) { + + this.config = config; + this.networkBufferPool = networkBufferPool; + this.connectionManager = connectionManager; + this.resultPartitionManager = resultPartitionManager; + this.taskEventPublisher = taskEventPublisher; + this.resultPartitionFactory = resultPartitionFactory; + this.singleInputGateFactory = singleInputGateFactory; + this.isShutdown = false; + } - this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + public static NetworkEnvironment create( + NetworkEnvironmentConfiguration config, + TaskEventPublisher taskEventPublisher, + MetricGroup metricGroup, + IOManager ioManager) { - NettyConfig nettyConfig = config.nettyConfig(); - if (nettyConfig != null) { - this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased()); - } else { - this.connectionManager = new LocalConnectionManager(); - } + NettyConfig nettyConfig = checkNotNull(config).nettyConfig(); + ConnectionManager connectionManager = nettyConfig != null ? 
+ new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager(); - this.resultPartitionManager = new ResultPartitionManager(); - - this.taskEventPublisher = checkNotNull(taskEventPublisher); + NetworkBufferPool networkBufferPool = new NetworkBufferPool( + config.numNetworkBuffers(), config.networkBufferSize(), config.networkBuffersPerChannel()); registerNetworkMetrics(metricGroup, networkBufferPool); - this.ioManager = checkNotNull(ioManager); - - isShutdown = false; + ResultPartitionManager resultPartitionManager = new ResultPartitionManager(); + ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory( + resultPartitionManager, checkNotNull(ioManager), networkBufferPool, + config.networkBuffersPerChannel(), config.floatingNetworkBuffersPerGate()); Review comment: In general, I would prefer that classes/functions have as few dependencies as possible. If the full `config` is not needed, passing only the required values makes the real dependencies easier to see and the class easier to use in other places, e.g. it avoids mocking the full config in tests. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services