azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272102359
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##########
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
                return nettyConfig;
        }
 
+       public boolean isCreditBased() {
+               return isCreditBased;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Utility method to extract network related parameters from the 
configuration and to
+        * sanity check them.
+        *
+        * @param configuration configuration object
+        * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+        * @param localTaskManagerCommunication true, to skip initializing the 
network stack
+        * @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+        * @return NetworkEnvironmentConfiguration
+        */
+       @Deprecated
+       public static NetworkEnvironmentConfiguration fromConfiguration(
+               Configuration configuration,
+               long maxJvmHeapMemory,
+               boolean localTaskManagerCommunication,
+               InetAddress taskManagerAddress) {
+
+               final int dataport = getDataport(configuration);
+
+               final int pageSize = getPageSize(configuration);
+
+               final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+               final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+               int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+               int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+               int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+               int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+               boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+               return new NetworkEnvironmentConfiguration(
+                       numberOfNetworkBuffers,
+                       pageSize,
+                       initialRequestBackoff,
+                       maxRequestBackoff,
+                       buffersPerChannel,
+                       extraBuffersPerGate,
+                       isCreditBased,
+                       nettyConfig);
+       }
+
+       /**
+        * Calculates the amount of memory used for network buffers inside the 
current JVM instance
+        * based on the available heap or the max heap size and the according 
configuration parameters.
+        *
+        * <p>For containers or when started via scripts, if started with a 
memory limit and set to use
+        * off-heap memory, the maximum heap size for the JVM is adjusted 
accordingly and we are able
+        * to extract the intended values from this.
+        *
+        * <p>The following configuration parameters are involved:
+        * <ul>
+        *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li>
+        *  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
+        *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and</li>
+        *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)</li>
+        * </ul>.
+        *
+        * @param config configuration object
+        * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+        *
+        * @return memory to use for network buffers (in bytes)
+        */
+       @VisibleForTesting
+       public static long calculateNewNetworkBufferMemory(Configuration 
config, long maxJvmHeapMemory) {
+               // The maximum heap memory has been adjusted as in 
TaskManagerServices#calculateHeapSizeMB
+               // and we need to invert these calculations.
+               final long jvmHeapNoNet;
+               final MemoryType memoryType = 
ConfigurationParserUtils.getMemoryType(config);
+               if (memoryType == MemoryType.HEAP) {
+                       jvmHeapNoNet = maxJvmHeapMemory;
+               } else if (memoryType == MemoryType.OFF_HEAP) {
+                       long configuredMemory = 
ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to 
bytes
+                       if (configuredMemory > 0) {
+                               // The maximum heap memory has been adjusted 
according to configuredMemory, i.e.
+                               // maxJvmHeap = jvmHeapNoNet - configuredMemory
+                               jvmHeapNoNet = maxJvmHeapMemory + 
configuredMemory;
+                       } else {
+                               // The maximum heap memory has been adjusted 
according to the fraction, i.e.
+                               // maxJvmHeap = jvmHeapNoNet - jvmHeapNoNet * 
managedFraction = jvmHeapNoNet * (1 - managedFraction)
+                               jvmHeapNoNet = (long) (maxJvmHeapMemory / (1.0 
- ConfigurationParserUtils.getManagedMemoryFraction(config)));
+                       }
+               } else {
+                       throw new RuntimeException("No supported memory type 
detected.");
+               }
+
+               // finally extract the network buffer memory size again from:
+               // jvmHeapNoNet = jvmHeap - networkBufBytes
+               //              = jvmHeap - Math.min(networkBufMax, 
Math.max(networkBufMin, jvmHeap * netFraction)
+               // jvmHeap = jvmHeapNoNet / (1.0 - networkBufFraction)
+               float networkBufFraction = 
config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+               long networkBufSize = (long) (jvmHeapNoNet / (1.0 - 
networkBufFraction) * networkBufFraction);
+               return calculateNewNetworkBufferMemory(config, networkBufSize, 
maxJvmHeapMemory);
+       }
+
+       /**
+        * Calculates the amount of memory used for network buffers based on 
the total memory to use and
+        * the according configuration parameters.
+        *
+        * <p>The following configuration parameters are involved:
+        * <ul>
+        *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and</li>
+        *  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)</li>
+        * </ul>.
+        *
+        * @param totalJavaMemorySize overall available memory to use (in bytes)
+        * @param config configuration object
+        *
+        * @return memory to use for network buffers (in bytes)
+        */
+       @SuppressWarnings("deprecation")
+       public static long calculateNetworkBufferMemory(long 
totalJavaMemorySize, Configuration config) {
+               final int segmentSize = getPageSize(config);
+
+               final long networkBufBytes;
+               if (hasNewNetworkConfig(config)) {
+                       float networkBufFraction = 
config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+                       long networkBufSize = (long) (totalJavaMemorySize * 
networkBufFraction);
+                       networkBufBytes = 
calculateNewNetworkBufferMemory(config, networkBufSize, totalJavaMemorySize);
+               } else {
+                       // use old (deprecated) network buffers parameter
+                       int numNetworkBuffers = 
config.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+                       networkBufBytes = (long) numNetworkBuffers * (long) 
segmentSize;
+
+                       checkOldNetworkConfig(numNetworkBuffers);
+
+                       
ConfigurationParserUtils.checkConfigParameter(networkBufBytes < 
totalJavaMemorySize,
+                               networkBufBytes, 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+                               "Network buffer memory size too large: " + 
networkBufBytes + " >= " +
+                                       totalJavaMemorySize + " (total JVM 
memory size)");
+                       
ConfigurationParserUtils.checkConfigParameter(networkBufBytes >= segmentSize,
+                               networkBufBytes, 
TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+                               "Network buffer memory size too small: " + 
networkBufBytes + " < " +
+                                       segmentSize + " (" + 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
+               }
+
+               return networkBufBytes;
+       }
+
+       /**
+        * Calculates the amount of memory used for network buffers based on 
the total memory to use and
+        * the according configuration parameters.
+        *
+        * <p>The following configuration parameters are involved:
+        * <ul>
+        *  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+        *      <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}</li>
+        * </ul>.
+        *
+        * @param config configuration object
+        * @param networkBufSize memory of network buffers based on JVM memory 
size and network fraction
+        * @param maxJvmHeapMemory maximum memory used for checking the results 
of network memory
+        *
+        * @return memory to use for network buffers (in bytes)
+        */
+       private static long calculateNewNetworkBufferMemory(Configuration 
config, long networkBufSize, long maxJvmHeapMemory) {
+               float networkBufFraction = 
config.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
+               long networkBufMin = 
MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
+               long networkBufMax = 
MemorySize.parse(config.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
+
+               int pageSize = getPageSize(config);
+
+               checkNewNetworkConfig(pageSize, networkBufFraction, 
networkBufMin, networkBufMax);
+
+               long networkBufBytes = Math.min(networkBufMax, 
Math.max(networkBufMin, networkBufSize));
+
+               ConfigurationParserUtils.checkConfigParameter(networkBufBytes < 
maxJvmHeapMemory,
+                       "(" + networkBufFraction + ", " + networkBufMin + ", " 
+ networkBufMax + ")",
+                       "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+                       "Network buffer memory size too large: " + 
networkBufBytes + " >= " +
+                               maxJvmHeapMemory + " (maximum JVM memory 
size)");
+               ConfigurationParserUtils.checkConfigParameter(networkBufBytes 
>= pageSize,
+                       "(" + networkBufFraction + ", " + networkBufMin + ", " 
+ networkBufMax + ")",
+                       "(" + 
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", " +
+                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", " +
+                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")",
+                       "Network buffer memory size too small: " + 
networkBufBytes + " < " +
+                               pageSize + " (" + 
TaskManagerOptions.MEMORY_SEGMENT_SIZE.key() + ")");
+
+               return networkBufBytes;
+       }
+
+       /**
+        * Validates the (old) network buffer configuration.
+        *
+        * @param numNetworkBuffers     number of buffers used in the network 
stack
+        *
+        * @throws IllegalConfigurationException if the condition does not hold
+        */
+       @SuppressWarnings("deprecation")
+       private static void checkOldNetworkConfig(final int numNetworkBuffers) {
+               ConfigurationParserUtils.checkConfigParameter(numNetworkBuffers 
> 0, numNetworkBuffers,
+                       TaskManagerOptions.NETWORK_NUM_BUFFERS.key(),
+                       "Must have at least one network buffer");
+       }
+
+       /**
+        * Validates the (new) network buffer configuration.
+        *
+        * @param pageSize                              size of memory buffers
+        * @param networkBufFraction    fraction of JVM memory to use for 
network buffers
+        * @param networkBufMin                 minimum memory size for network 
buffers (in bytes)
+        * @param networkBufMax                 maximum memory size for network 
buffers (in bytes)
+        *
+        * @throws IllegalConfigurationException if the condition does not hold
+        */
+       private static void checkNewNetworkConfig(
+               final int pageSize,
+               final float networkBufFraction,
+               final long networkBufMin,
+               final long networkBufMax) throws IllegalConfigurationException {
+
+               
ConfigurationParserUtils.checkConfigParameter(networkBufFraction > 0.0f && 
networkBufFraction < 1.0f, networkBufFraction,
+                       
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key(),
+                       "Network buffer memory fraction of the free memory must 
be between 0.0 and 1.0");
+
+               ConfigurationParserUtils.checkConfigParameter(networkBufMin >= 
pageSize, networkBufMin,
+                       TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key(),
+                       "Minimum memory for network buffers must allow at least 
one network " +
+                               "buffer with respect to the memory segment 
size");
+
+               ConfigurationParserUtils.checkConfigParameter(networkBufMax >= 
pageSize, networkBufMax,
+                       TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+                       "Maximum memory for network buffers must allow at least 
one network " +
+                               "buffer with respect to the memory segment 
size");
+
+               ConfigurationParserUtils.checkConfigParameter(networkBufMax >= 
networkBufMin, networkBufMax,
+                       TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key(),
+                       "Maximum memory for network buffers must not be smaller 
than minimum memory (" +
+                               
TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ": " + networkBufMin + 
")");
+       }
+
+       /**
+        * Returns whether the new network buffer memory configuration is 
present in the configuration
+        * object, i.e. at least one new parameter is given or the old one is 
not present.
+        *
+        * @param config configuration object
+        * @return <tt>true</tt> if the new configuration method is used, 
<tt>false</tt> otherwise
+        */
+       @SuppressWarnings("deprecation")
+       @VisibleForTesting
+       public static boolean hasNewNetworkConfig(final Configuration config) {
+               return 
config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION) ||
+                       
config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN) ||
+                       
config.contains(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX) ||
+                       
!config.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+       }
+
+       /**
+        * Parses the hosts / ports for communication and data exchange from 
configuration.
+        *
+        * @param configuration configuration object
+        * @return the data port
+        */
+       private static int getDataport(Configuration configuration) {
+               final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+               ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+                       "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+               return dataport;
+       }
+
+       /**
+        * Calculates the number of network buffers based on configuration and 
jvm heap size.
+        *
+        * @param configuration configuration object
+        * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+        * @return the number of network buffers
+        */
+       private static int calculateNumberOfNetworkBuffers(Configuration 
configuration, long maxJvmHeapMemory) {
+               final int numberOfNetworkBuffers;
+               if (!hasNewNetworkConfig(configuration)) {
+                       // fallback: number of network buffers
+                       numberOfNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+                       checkOldNetworkConfig(numberOfNetworkBuffers);
+               } else {
+                       if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+                               LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+                                       
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+                       }
+
+                       final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+                       // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+                       long numberOfNetworkBuffersLong = networkMemorySize / 
getPageSize(configuration);
+                       if (numberOfNetworkBuffersLong > Integer.MAX_VALUE) {
+                               throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+                                       + ") corresponds to more than MAX_INT 
pages.");
+                       }
+                       numberOfNetworkBuffers = (int) 
numberOfNetworkBuffersLong;
+               }
+
+               return numberOfNetworkBuffers;
+       }
+
+       /**
+        * Parses the configuration to wrapper related components into netty 
configuration which might be null if
+        * communication is in the same task manager.
 
 Review comment:
   This Javadoc is a bit unclear, maybe just:
   `Parses the Netty configuration from the Flink {@link Configuration}; the 
result is null if communication is in the same task manager.`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to