[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r274236223 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** +* C
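For readers following the diff above: the buffer-count step in the `else` branch can be looked at in isolation. The following is only a minimal sketch of that step, not the code from the PR. The class name `BufferCountSketch`, the method name `numberOfBuffers`, and the extra page-size check are assumptions made for illustration.

```java
// Illustrative sketch only. BufferCountSketch and numberOfBuffers are
// hypothetical names and are not part of Flink.
final class BufferCountSketch {

    /**
     * Converts a network memory budget (in bytes) into a whole number of buffers.
     * Integer division drops any remainder smaller than one page, which mirrors
     * the "tolerate offcuts" comment in the diff.
     */
    static int numberOfBuffers(long networkMemoryBytes, int pageSizeBytes) {
        // Assumption: reject a non-positive page size up front (not shown in the diff).
        if (pageSizeBytes <= 0) {
            throw new IllegalArgumentException("Page size must be positive: " + pageSizeBytes);
        }
        long buffers = networkMemoryBytes / pageSizeBytes;
        // Same guard as in the diff: the result has to fit into an int.
        if (buffers > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes ("
                + networkMemoryBytes + ") corresponds to more than MAX_INT pages.");
        }
        return (int) buffers;
    }
}
```

With a 64 MiB budget and 32 KiB pages this yields 2048 buffers. A budget that is not a multiple of the page size is rounded down.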
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r273318302 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
Review comment: After thinking through this issue again: if we keep maintaining `isCreditBased` in `NetworkEnvironmentConfiguration`, we have to make sure this property stays consistent between `NetworkEnvironmentConfiguration` and the Flink `Configuration` in `NettyConfig`, which makes the code in `NetworkEnvironmentConfigurationBuilder` ugly. So for now I would prefer to add this property to the constructor of `NettyConnectionMana
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r273307816 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
Review comment: I even considered adding one more parameter, `isCreditBased`, to `NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is `Deprecated`, and it is supposed to be removed as of Flink 1.6. AFAIK, this temporary option will eventually be dropped once one ongoing network issue is confirmed. So it might not be good to reflect it in the constructor of `NettyConnectionManager` for
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r273307816 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
Review comment: I even considered adding one more parameter, `isCreditBased`, to `NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is `Deprecated`, and it is supposed to be removed as of Flink 1.6. I think this temporary option will eventually be dropped once one ongoing network issue is confirmed. So it might not be good to reflect it in the constructor of `NettyConnectionManager` fo
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r273115486 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
Review comment: Yes, there is actually no problem with setting a default `NettyConfig`, with credit-based mode enabled by default, in `NetworkEnvironmentConfigurationBuilder`. My previous thought was that even if we do not use Netty in the network stack, the credit-based mode might also be suitable for other network frameworks. In other words, the credit-based mode is more related to `ResultPartition` and `SingleInputGate`,
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272944389 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** +* C
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272919268 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
Review comment: I also do not like having two sources of `isCreditBased`. I think the key question is whether `isCreditBased` must exist together with `NettyConfig`. My previous thought was to decouple `isCreditBased` from `NettyConfig`. We know `NettyConfig` is currently optional in `NetworkEnvironmentConfiguration`, and most of the existing tests should verify the default behavior (`is
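To make the "two sources" concern concrete, here is a minimal sketch of one option touched on in this thread: keep the flag in a single place and fall back to a default when the optional Netty config is absent. This is not what the PR implements. `SketchNettyConfig` and `SketchNetworkConfig` are hypothetical stand-ins, and the default of `true` is an assumption based on the remark elsewhere in the thread that credit-based mode is enabled by default.

```java
// Illustrative sketch only. These classes are hypothetical stand-ins and do not
// reproduce Flink's NettyConfig or NetworkEnvironmentConfiguration.
final class SketchNettyConfig {
    private final boolean creditBased;

    SketchNettyConfig(boolean creditBased) {
        this.creditBased = creditBased;
    }

    boolean isCreditBased() {
        return creditBased;
    }
}

final class SketchNetworkConfig {
    /** Assumed default when no Netty config is present. */
    static final boolean DEFAULT_CREDIT_BASED = true;

    /** May be null, mirroring the optional NettyConfig in the diff. */
    private final SketchNettyConfig nettyConfig;

    SketchNetworkConfig(SketchNettyConfig nettyConfig) {
        this.nettyConfig = nettyConfig;
    }

    /** Single source of truth: no second copy of the flag is stored here. */
    boolean isCreditBased() {
        return nettyConfig == null ? DEFAULT_CREDIT_BASED : nettyConfig.isCreditBased();
    }
}
```

The trade-off is the one raised in the comment: this couples the flag to the Netty config, so it would not fit if credit-based mode were to be shared with other network frameworks.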
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272894672 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + final int dataport = getDataport(configuration); + + final int pageSize = getPageSize(configuration); + + final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory); + + final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport); + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + 
return new NetworkEnvironmentConfiguration( + numberOfNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** +* Calculates the amount of memory used for network buffers inside the current JVM instance +* based on the available heap or the max heap size and the according configuration parameters. +* +* For containers or when started via scripts, if started with a memory limit and set to use +* off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able +* to extract the intended values from this. +* +* The following configuration parameters are involved: +* +* {@link TaskManagerOptions#MANAGED_MEMORY_SIZE}, +* {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN}, +* {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and +* {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist) +* . +* +* @param config configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* +* @return memory to use for network buffers (in bytes) +*/ + @VisibleForTesting + public static long calculateNewNetworkBufferMemory(Configuration config, long maxJvmHeapMemory) { + // The maximum heap memory has been adjusted as in TaskManagerServices#calculateHeapSizeMB + // and we need to invert these calculations. + final long jvmHeapNoNet; + final MemoryType memoryType = ConfigurationParserUtils.getMemoryType(config); + if (memoryType == MemoryType.HEAP) { + jvmHeapNoNet = maxJvmHeapMemory; + } else if (memoryType == MemoryType.OFF_HEAP) { + long configuredMemory = ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to bytes + if (configuredMemory > 0) { + // The maximum heap memory has been adjusted according to configuredMemor
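The diff above is cut off before the actual computation, so the following is only a guess at how the fraction, min, and max options listed in the Javadoc could combine: take a fraction of the total memory and clamp it to the configured bounds. `NetworkMemorySketch`, `networkMemoryBytes`, and all parameter names are hypothetical, and the heap-size inversion mentioned in the code comment is deliberately left out.

```java
// Illustrative sketch only. NetworkMemorySketch is a hypothetical name, and the
// clamping rule is an assumption, since the diff is truncated before this step.
final class NetworkMemorySketch {

    /**
     * Returns fraction * totalMemoryBytes, clamped to [minBytes, maxBytes].
     */
    static long networkMemoryBytes(long totalMemoryBytes, double fraction, long minBytes, long maxBytes) {
        if (fraction <= 0.0 || fraction >= 1.0) {
            throw new IllegalArgumentException("Fraction must be in (0, 1): " + fraction);
        }
        if (minBytes > maxBytes) {
            throw new IllegalArgumentException("Min (" + minBytes + ") exceeds max (" + maxBytes + ").");
        }
        long byFraction = (long) (fraction * totalMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }
}
```

The result would then feed the division by the page size shown in the earlier revision of this method.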
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272871749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + final int dataport = getDataport(configuration); + + final int pageSize = getPageSize(configuration); + + final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory); + + final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport); + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); Review comment: That is a very good idea. Should I create a separate JIRA issue for this task later, or address it in this PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272871749 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + final int dataport = getDataport(configuration); + + final int pageSize = getPageSize(configuration); + + final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory); + + final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport); + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); Review comment: That is a very good idea. I will create a separate JIRA issue for this task later.
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272013899 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); + + return new NetworkEnvironmentConfiguration( + numNetworkBuffers, + pageSize, + initialRequestBackoff, + maxRequestBackoff, + buffersPerChannel, + extraBuffersPerGate, + isCreditBased, + nettyConfig); + } + + /** +* C
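The buffer-count step in the diff above (divide the network memory by the page size, tolerate the offcut, and reject results beyond `Integer.MAX_VALUE`) can be sketched in isolation. This is a minimal illustration, not the Flink implementation: the class name `NetworkBufferCountSketch`, the method `numberOfNetworkBuffers`, and the added page-size check are assumptions made for the example.

```java
public final class NetworkBufferCountSketch {

    /**
     * Hypothetical helper mirroring the division and overflow check shown in the diff.
     *
     * @param networkMemorySize memory reserved for network buffers (in bytes)
     * @param pageSize size of one buffer/page (in bytes)
     */
    public static int numberOfNetworkBuffers(long networkMemorySize, int pageSize) {
        if (pageSize <= 0) {
            // Not part of the diff; added so the sketch fails clearly on bad input.
            throw new IllegalArgumentException("Page size must be positive.");
        }
        // Integer division drops the remainder ("offcut") that does not fill a whole page.
        long numNetworkBuffersLong = networkMemorySize / pageSize;
        if (numNetworkBuffersLong > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
                + ") corresponds to more than MAX_INT pages.");
        }
        return (int) numNetworkBuffersLong;
    }

    public static void main(String[] args) {
        // 64 MiB of network memory split into 32 KiB pages.
        System.out.println(numberOfNetworkBuffers(64L << 20, 32 * 1024));
    }
}
```

The check happens on the `long` value before the cast, so an oversized configuration fails loudly instead of silently wrapping around.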
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272012531 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java ## @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration Preconditions.checkArgument(totalJavaMemorySizeMB > 0); // subtract the Java memory used for network buffers (always off-heap) - final long networkBufMB = - calculateNetworkBufferMemory( - totalJavaMemorySizeMB << 20, // megabytes to bytes - config) >> 20; // bytes to megabytes + final long networkBufMB = NetworkEnvironmentConfiguration.calculateNetworkBufferMemory( Review comment: Yes, it might be. Shall we focus on it further in the future?
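For readers unfamiliar with the `<< 20` and `>> 20` shifts in the removed lines, here is a minimal sketch of the megabyte/byte conversion they perform. The class and method names are hypothetical and chosen only for this example.

```java
public final class MemoryUnitShiftSketch {

    /** Megabytes to bytes: multiplying by 2^20 via a left shift. */
    public static long megabytesToBytes(long megabytes) {
        return megabytes << 20;
    }

    /** Bytes to megabytes: dividing by 2^20 via a right shift; partial megabytes are dropped. */
    public static long bytesToMegabytes(long bytes) {
        return bytes >> 20;
    }

    public static void main(String[] args) {
        long bytes = megabytesToBytes(1024);
        System.out.println(bytes);
        System.out.println(bytesToMegabytes(bytes + 1));
    }
}
```

Because the right shift truncates, converting to bytes and back is lossless, while converting an arbitrary byte count to megabytes rounds down.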
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272008282 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java ## @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.util.MathUtils; + +import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES; +import static org.apache.flink.util.MathUtils.checkedDownCast; + +/** + * Utility class to extract related parameters from {@link Configuration} and to + * sanity check them. + */ +public class ConfigurationParserUtils { + + /** +* Parses the configuration to get the managed memory size and validates the value. 
+* +* @param configuration configuration object +* @return managed memory size (in megabytes) +*/ + public static long getManagedMemorySize(Configuration configuration) { + long managedMemorySize; + String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue(); + if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) { + try { + managedMemorySize = MemorySize.parse( + configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes(); + } catch (IllegalArgumentException e) { + throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e); + } + } else { + managedMemorySize = Long.valueOf(managedMemorySizeDefaultVal); + } + + checkConfigParameter(configuration.getString( + TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) || managedMemorySize > 0, + managedMemorySize, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), + "MemoryManager needs at least one MB of memory. " + + "If you leave this config parameter empty, the system automatically pick a fraction of the available memory."); + + return managedMemorySize; + } + + /** +* Parses the configuration to get the fraction of managed memory and validates the value. +* +* @param configuration configuration object +* @return fraction of managed memory +*/ + public static float getManagedMemoryFraction(Configuration configuration) { + float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION); + + checkConfigParameter(managedMemoryFraction > 0.0f && managedMemoryFraction < 1.0f, managedMemoryFraction, + TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(), + "MemoryManager fraction of the free memory must be between 0.0 and 1.0"); + + return managedMemoryFraction; + } + + /** +* Parses the configuration to get the type of memory. 
+* +* @param configuration configuration object +* @return type of memory +*/ + public static MemoryType getMemoryType(Configuration configuration) { + // check whether we use heap or off-heap memory + final MemoryType memType; + if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) { + memType = MemoryType.OFF_HEAP; + } else { +
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272006302 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( + Configuration configuration, + long maxJvmHeapMemory, + boolean localTaskManagerCommunication, + InetAddress taskManagerAddress) { + + // > hosts / ports for communication and data exchange + + final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT); + ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(), + "Leave config parameter empty or use 0 to let the system choose a port automatically."); + + final int pageSize = ConfigurationParserUtils.getPageSize(configuration); + + final int numNetworkBuffers; + if (!hasNewNetworkConfig(configuration)) { + // fallback: number of network buffers + numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS); + + checkOldNetworkConfig(numNetworkBuffers); + } else { + if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) { + LOG.info("Ignoring old (but still present) network buffer configuration via {}.", + 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key()); + } + + final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory); + + // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory) + long numNetworkBuffersLong = networkMemorySize / pageSize; + if (numNetworkBuffersLong > Integer.MAX_VALUE) { + throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize + + ") corresponds to more than MAX_INT pages."); + } + numNetworkBuffers = (int) numNetworkBuffersLong; + } + + final NettyConfig nettyConfig; + if (!localTaskManagerCommunication) { + final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport); + + nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(), + pageSize, ConfigurationParserUtils.getSlot(configuration), configuration); + } else { + nettyConfig = null; + } + + int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL); + int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX); + + int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL); + int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE); + + boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL); Review comment: This is mainly for the sake of tests. We have the `NetworkConfigurationBuilder`, which makes it easy to set the `isCreditBased` variable in tests; otherwise a test would need to construct the `NettyConfig` with an internal `Configuration` just to set this variable. I once considered removing the corresponding getter from `NettyConfig` to avoid duplication.
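To illustrate the testing argument, here is a minimal sketch of a builder that lets a test flip `isCreditBased` directly instead of assembling a full configuration object. The classes below (`NetworkConfig`, `NetworkConfigBuilder`) and their default values are hypothetical stand-ins for this example, not the Flink classes discussed in the PR.

```java
public final class CreditBasedBuilderSketch {

    /** Hypothetical, simplified stand-in for the network configuration object. */
    public static final class NetworkConfig {
        private final int numNetworkBuffers;
        private final boolean isCreditBased;

        NetworkConfig(int numNetworkBuffers, boolean isCreditBased) {
            this.numNetworkBuffers = numNetworkBuffers;
            this.isCreditBased = isCreditBased;
        }

        public int numNetworkBuffers() {
            return numNetworkBuffers;
        }

        public boolean isCreditBased() {
            return isCreditBased;
        }
    }

    /** Hypothetical test builder: defaults for everything, setters only for what a test cares about. */
    public static final class NetworkConfigBuilder {
        private int numNetworkBuffers = 1024; // assumed default, for illustration only
        private boolean isCreditBased = true; // assumed default, for illustration only

        public NetworkConfigBuilder setNumNetworkBuffers(int numNetworkBuffers) {
            this.numNetworkBuffers = numNetworkBuffers;
            return this;
        }

        public NetworkConfigBuilder setIsCreditBased(boolean isCreditBased) {
            this.isCreditBased = isCreditBased;
            return this;
        }

        public NetworkConfig build() {
            return new NetworkConfig(numNetworkBuffers, isCreditBased);
        }
    }

    public static void main(String[] args) {
        // A test that only cares about the credit-based flag sets just that one field.
        NetworkConfig config = new NetworkConfigBuilder().setIsCreditBased(false).build();
        System.out.println(config.isCreditBased());
    }
}
```

Holding the flag as a plain field on the configuration object is what keeps such a builder trivial; the trade-off raised in the comment is that the same value is then also reachable through `NettyConfig`.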
[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment URL: https://github.com/apache/flink/pull/8090#discussion_r272005759 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java ## @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() { return nettyConfig; } + public boolean isCreditBased() { + return isCreditBased; + } + + // + + /** +* Utility method to extract network related parameters from the configuration and to +* sanity check them. +* +* @param configuration configuration object +* @param maxJvmHeapMemory the maximum JVM heap size (in bytes) +* @param localTaskManagerCommunication true, to skip initializing the network stack +* @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible +* @return NetworkEnvironmentConfiguration +*/ + @Deprecated + public static NetworkEnvironmentConfiguration fromConfiguration( Review comment: Because the `@Deprecated` `TaskManagerOptions.NETWORK_NUM_BUFFERS` is used in this method.