[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-10 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r274236223
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* C

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273318302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   After thinking through this issue again, if we keep maintaining 
`isCreditBased` in `NetworkEnvironmentConfiguration`, we have to confirm this 
property consistent between `NetworkEnvironmentConfiguration` and Flink 
`Configuration` in `NettyConfig`. So the codes seem ugly in 
`NetworkEnvironmentConfigurationBuilder`. Then atm I would prefer to add this 
property in the constructor of `NettyConnectionMana

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273307816
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I even considered the way of adding one more parameter `isCreditBased` in 
`NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is 
`Deprecated`, and it is supposed to be removed from FLINK-1.6. AFAIK, this 
temporary option would be abandoned finally after one undergoing network issue 
confirmed. So it might be not good to reflect it in the constructor of 
`NettyConnectionManager`  for

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273307816
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I even considered the way of adding one more parameter `isCreditBased` in 
`NettyConnectionManager`. But `TaskManagerOptions.NETWORK_CREDIT_MODEL` is 
`Deprecated`, and it is supposed to be removed from FLINK-1.6. I think this 
temporary option would be abandoned finally after one undergoing network issue 
confirmed. So it might be not good to reflect it in the constructor of 
`NettyConnectionManager`  fo

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273115486
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   Yes, it actually has no problem to set default `NettyConfig` with enabling 
credit-based by default in `NetworkEnvironmentConfigurationBuilder`.
   
   My previous thought is that even though we do not use netty technology in 
network stack, the credit-based mode might also suitable for other network 
frameworks. In other words, the credit-based is more related to 
`ResultPartition` and `SingleInputGate`,

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272944389
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* C

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272919268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I also do not like the way of two sources of `isCreditBased`. 
   
   I think the key problem is whether `isCreditBased` must exist together with 
`NettyConfig`. My previous thought is trying to make decouple `isCreditBased` 
with `NettyConfig`. 
   
   We know `NettyConfig` is optional currently in 
`NetworkEnvironmentConfiguration`, and most of the existing tests should verify 
the default behavior (`is

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-07 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272894672
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numberOfNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* Calculates the amount of memory used for network buffers inside the 
current JVM instance
+* based on the available heap or the max heap size and the according 
configuration parameters.
+*
+* For containers or when started via scripts, if started with a 
memory limit and set to use
+* off-heap memory, the maximum heap size for the JVM is adjusted 
accordingly and we are able
+* to extract the intended values from this.
+*
+* The following configuration parameters are involved:
+* 
+*  {@link TaskManagerOptions#MANAGED_MEMORY_SIZE},
+*  {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and
+*  {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)
+* .
+*
+* @param config configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+*
+* @return memory to use for network buffers (in bytes)
+*/
+   @VisibleForTesting
+   public static long calculateNewNetworkBufferMemory(Configuration 
config, long maxJvmHeapMemory) {
+   // The maximum heap memory has been adjusted as in 
TaskManagerServices#calculateHeapSizeMB
+   // and we need to invert these calculations.
+   final long jvmHeapNoNet;
+   final MemoryType memoryType = 
ConfigurationParserUtils.getMemoryType(config);
+   if (memoryType == MemoryType.HEAP) {
+   jvmHeapNoNet = maxJvmHeapMemory;
+   } else if (memoryType == MemoryType.OFF_HEAP) {
+   long configuredMemory = 
ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to 
bytes
+   if (configuredMemory > 0) {
+   // The maximum heap memory has been adjusted 
according to configuredMemor

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-07 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272871749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   That is a very good idea. I would create a separate jira for this task later 
or address in this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-07 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272871749
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   That is a very good idea. I would create a separate jira for this task later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272013899
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* C

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272012531
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   Yes, it might be. Then we further focus on it future?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272008282
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MathUtils;
+
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
+
+/**
+ * Utility class to extract related parameters from {@link Configuration} and 
to
+ * sanity check them.
+ */
+public class ConfigurationParserUtils {
+
+   /**
+* Parses the configuration to get the managed memory size and 
validates the value.
+*
+* @param configuration configuration object
+* @return managed memory size (in megabytes)
+*/
+   public static long getManagedMemorySize(Configuration configuration) {
+   long managedMemorySize;
+   String managedMemorySizeDefaultVal = 
TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   if 
(!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal))
 {
+   try {
+   managedMemorySize = MemorySize.parse(
+   
configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), 
MEGA_BYTES).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+   throw new IllegalConfigurationException("Could 
not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+   }
+   } else {
+   managedMemorySize = 
Long.valueOf(managedMemorySizeDefaultVal);
+   }
+
+   checkConfigParameter(configuration.getString(
+   
TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue())
 || managedMemorySize > 0,
+   managedMemorySize, 
TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+   "MemoryManager needs at least one MB of memory. " +
+   "If you leave this config parameter empty, the 
system automatically pick a fraction of the available memory.");
+
+   return managedMemorySize;
+   }
+
+   /**
+* Parses the configuration to get the fraction of managed memory and 
validates the value.
+*
+* @param configuration configuration object
+* @return fraction of managed memory
+*/
+   public static float getManagedMemoryFraction(Configuration 
configuration) {
+   float managedMemoryFraction = 
configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+
+   checkConfigParameter(managedMemoryFraction > 0.0f && 
managedMemoryFraction < 1.0f, managedMemoryFraction,
+   TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+   "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
+
+   return managedMemoryFraction;
+   }
+
+   /**
+* Parses the configuration to get the type of memory.
+*
+* @param configuration configuration object
+* @return type of memory
+*/
+   public static MemoryType getMemoryType(Configuration configuration) {
+   // check whether we use heap or off-heap memory
+   final MemoryType memType;
+   if 
(configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+   memType = MemoryType.OFF_HEAP;
+   } else {
+   

[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272006302
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   It is actually for the consideration of tests. 
   We have the `NetworkConfiguraitonBuilder` which is easy for setting 
`isCreditBased` variable in tests, otherwise it needs to construct the 
`NettyConfig` with internal `Configuration` to set this variable. I ever 
considered removing the corresponding `getter` from `NettyConfig` to avoid 
duplication.


[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272005759
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   Because the `@Deprecated` `TaskManagerOptions.NETWORK_NUM_BUFFERS` is used 
in this method.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services