zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273307816
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
##########
@@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
 		return nettyConfig;
 	}
 
+	public boolean isCreditBased() {
+		return isCreditBased;
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Utility method to extract network related parameters from the configuration and to
+	 * sanity check them.
+	 *
+	 * @param configuration configuration object
+	 * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+	 * @param localTaskManagerCommunication true, to skip initializing the network stack
+	 * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+	 * @return NetworkEnvironmentConfiguration
+	 */
+	@Deprecated
+	public static NetworkEnvironmentConfiguration fromConfiguration(
+		Configuration configuration,
+		long maxJvmHeapMemory,
+		boolean localTaskManagerCommunication,
+		InetAddress taskManagerAddress) {
+
+		// ----> hosts / ports for communication and data exchange
+
+		final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
+		ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+			"Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+		final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
+
+		final int numNetworkBuffers;
+		if (!hasNewNetworkConfig(configuration)) {
+			// fallback: number of network buffers
+			numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+			checkOldNetworkConfig(numNetworkBuffers);
+		} else {
+			if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+				LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+					TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+			}
+
+			final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+			// tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+			long numNetworkBuffersLong = networkMemorySize / pageSize;
+			if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+				throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+					+ ") corresponds to more than MAX_INT pages.");
+			}
+			numNetworkBuffers = (int) numNetworkBuffersLong;
+		}
+
+		final NettyConfig nettyConfig;
+		if (!localTaskManagerCommunication) {
+			final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+			nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
+				pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
+		} else {
+			nettyConfig = null;
+		}
+
+		int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+		int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+		int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+		int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+		boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);

Review comment:
I also considered adding one more parameter, `isCreditBased`, to `NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is `Deprecated` and was supposed to be removed in Flink 1.6. I think this temporary option will finally be dropped once one ongoing network issue is confirmed. So it may be better not to reflect it in the constructor of `NettyConnectionManager`, to avoid changing that constructor repeatedly.
`NettyConfig` does not maintain the `isCreditBased` property explicitly; it just reads this option from its internal `Configuration` via the `isCreditBasedEnabled` method. So we could remove this method, and `NettyConnectionManager` could still get the value via `client.getConfig().getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)`. Seen this way, the property does not really belong to `NettyConfig`.

In addition, the current netty stack supports both credit-based and non-credit-based modes through two separate sets of code. So `isCreditBased` being false does not mean that `NettyConfig` is null. Whether `NettyConfig` is null depends on whether downstream and upstream are in the same task manager.
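
To make the distinction above concrete, here is a minimal sketch in plain Java. It does not use the Flink classes: `NettyConfigSketch`, `createNettyConfig`, and `isCreditBased` are hypothetical stand-ins, the option key string and its default of `true` are assumptions, and a `Map` stands in for `Configuration`. It only illustrates that the credit option is read from the wrapped configuration, while the config object is null or not depending solely on local communication.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; these stand-ins are NOT the Flink classes. */
public class CreditModeSketch {

    /** Assumed key, standing in for TaskManagerOptions.NETWORK_CREDIT_MODEL. */
    public static final String CREDIT_MODEL_KEY = "taskmanager.network.credit-model";

    /** Stand-in for NettyConfig: it wraps the configuration and has no isCreditBased field. */
    public static final class NettyConfigSketch {
        private final Map<String, String> config;

        public NettyConfigSketch(Map<String, String> config) {
            this.config = config;
        }

        public Map<String, String> getConfig() {
            return config;
        }
    }

    /** Reads the option directly from the wrapped configuration (default "true" is assumed). */
    public static boolean isCreditBased(NettyConfigSketch nettyConfig) {
        return Boolean.parseBoolean(
            nettyConfig.getConfig().getOrDefault(CREDIT_MODEL_KEY, "true"));
    }

    /** The config object is null only for local communication, independent of the credit option. */
    public static NettyConfigSketch createNettyConfig(
            boolean localTaskManagerCommunication, Map<String, String> config) {
        return localTaskManagerCommunication ? null : new NettyConfigSketch(config);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(CREDIT_MODEL_KEY, "false");

        // Remote communication with credit mode disabled: the config object still exists.
        NettyConfigSketch remote = createNettyConfig(false, config);
        System.out.println("remote config present: " + (remote != null));
        System.out.println("credit based: " + isCreditBased(remote));

        // Local communication: no config object, whatever the credit option says.
        System.out.println("local config is null: " + (createNettyConfig(true, config) == null));
    }
}
```

In this sketch the two conditions are deliberately kept in separate methods, which mirrors the argument that the credit flag does not need to appear in the constructor of `NettyConnectionManager`.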