[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273172948
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
+        Configuration configuration,
+        long maxJvmHeapMemory,
+        boolean localTaskManagerCommunication,
+        InetAddress taskManagerAddress) {
+
+        // > hosts / ports for communication and data exchange
+
+        final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
+        ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+            "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+        final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
+
+        final int numNetworkBuffers;
+        if (!hasNewNetworkConfig(configuration)) {
+            // fallback: number of network buffers
+            numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+            checkOldNetworkConfig(numNetworkBuffers);
+        } else {
+            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+            }
+
+            final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+            // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+            long numNetworkBuffersLong = networkMemorySize / pageSize;
+            if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+                    + ") corresponds to more than MAX_INT pages.");
+            }
+            numNetworkBuffers = (int) numNetworkBuffersLong;
+        }
+
+        final NettyConfig nettyConfig;
+        if (!localTaskManagerCommunication) {
+            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
+                pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
+        } else {
+            nettyConfig = null;
+        }
+
+        int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+        int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+        int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+        int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+        boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   OK, sorry, I see your point now. Actually, the credit-based mode seems to 
belong to the network layer rather than to Netty. Maybe `NettyConnectionManager` 
should take one more constructor argument, `isCreditBased`, so that we can keep 
the flag only in `NetworkEnvironmentConfiguration`. We should just make sure that 
`isCreditBased` is `false` when `NetworkEnvironmentConfiguration.nettyConfig` is 
`null`.
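
A minimal sketch of the suggestion above, using simplified stand-in classes rather than the real Flink ones. The constructor shapes, the `createConnectionManager` helper, and the choice to enforce the invariant by throwing (instead of, say, silently forcing the flag to `false`) are all assumptions made for illustration:

```java
import java.util.Objects;

/** Sketch only: simplified stand-ins, not the real Flink classes. */
public class CreditBasedSketch {

    /** Hypothetical stand-in for NettyConfig; the real class carries more settings. */
    static final class NettyConfig {
        final int serverPort;

        NettyConfig(int serverPort) {
            this.serverPort = serverPort;
        }
    }

    /** Stand-in for NetworkEnvironmentConfiguration, the only owner of the flag. */
    static final class NetworkEnvironmentConfiguration {
        private final boolean isCreditBased;
        private final NettyConfig nettyConfig; // null: local communication only

        NetworkEnvironmentConfiguration(boolean isCreditBased, NettyConfig nettyConfig) {
            // Assumed enforcement of the invariant: without Netty, credit-based must be off.
            if (nettyConfig == null && isCreditBased) {
                throw new IllegalArgumentException(
                    "isCreditBased must be false when nettyConfig is null");
            }
            this.isCreditBased = isCreditBased;
            this.nettyConfig = nettyConfig;
        }

        boolean isCreditBased() {
            return isCreditBased;
        }

        NettyConfig nettyConfig() {
            return nettyConfig;
        }
    }

    /** Stand-in for NettyConnectionManager, which is now told the mode explicitly. */
    static final class NettyConnectionManager {
        private final NettyConfig nettyConfig;
        private final boolean isCreditBased;

        NettyConnectionManager(NettyConfig nettyConfig, boolean isCreditBased) {
            this.nettyConfig = Objects.requireNonNull(nettyConfig);
            this.isCreditBased = isCreditBased;
        }

        boolean isCreditBased() {
            return isCreditBased;
        }
    }

    /** Hypothetical helper: returns null when no Netty stack is configured. */
    static NettyConnectionManager createConnectionManager(NetworkEnvironmentConfiguration config) {
        if (config.nettyConfig() == null) {
            return null;
        }
        return new NettyConnectionManager(config.nettyConfig(), config.isCreditBased());
    }

    public static void main(String[] args) {
        NetworkEnvironmentConfiguration remote =
            new NetworkEnvironmentConfiguration(true, new NettyConfig(0));
        System.out.println(createConnectionManager(remote).isCreditBased());

        NetworkEnvironmentConfiguration local = new NetworkEnvironmentConfiguration(false, null);
        System.out.println(createConnectionManager(local) == null);
    }
}
```

With this shape the flag lives in one place, and the inconsistent combination (`isCreditBased = true` with a `null` `NettyConfig`) is rejected at construction time.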


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273109022
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273102854
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
+        Configuration configuration,
+        long maxJvmHeapMemory,
+        boolean localTaskManagerCommunication,
+        InetAddress taskManagerAddress) {
+
+        // > hosts / ports for communication and data exchange
+
+        final int dataport = configuration.getInteger(TaskManagerOptions.DATA_PORT);
+        ConfigurationParserUtils.checkConfigParameter(dataport >= 0, dataport, TaskManagerOptions.DATA_PORT.key(),
+            "Leave config parameter empty or use 0 to let the system choose a port automatically.");
+
+        final int pageSize = ConfigurationParserUtils.getPageSize(configuration);
+
+        final int numNetworkBuffers;
+        if (!hasNewNetworkConfig(configuration)) {
+            // fallback: number of network buffers
+            numNetworkBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+            checkOldNetworkConfig(numNetworkBuffers);
+        } else {
+            if (configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+                LOG.info("Ignoring old (but still present) network buffer configuration via {}.",
+                    TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+            }
+
+            final long networkMemorySize = calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+            // tolerate offcuts between intended and allocated memory due to segmentation (will be available to the user-space memory)
+            long numNetworkBuffersLong = networkMemorySize / pageSize;
+            if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+                throw new IllegalArgumentException("The given number of memory bytes (" + networkMemorySize
+                    + ") corresponds to more than MAX_INT pages.");
+            }
+            numNetworkBuffers = (int) numNetworkBuffersLong;
+        }
+
+        final NettyConfig nettyConfig;
+        if (!localTaskManagerCommunication) {
+            final InetSocketAddress taskManagerInetSocketAddress = new InetSocketAddress(taskManagerAddress, dataport);
+
+            nettyConfig = new NettyConfig(taskManagerInetSocketAddress.getAddress(), taskManagerInetSocketAddress.getPort(),
+                pageSize, ConfigurationParserUtils.getSlot(configuration), configuration);
+        } else {
+            nettyConfig = null;
+        }
+
+        int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+        int maxRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+        int buffersPerChannel = configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+        int extraBuffersPerGate = configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+        boolean isCreditBased = configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   In general, I do not see a problem with setting a default `NettyConfig` and 
enabling credit-based mode by default in `NetworkEnvironmentConfigurationBuilder`. 
It is just some constant `NettyConfig`, or what is the problem here? A 
`NetworkEnvironmentConfiguration` with `isCreditBased = true` and 
`NettyConfig = null` also looks inconsistent (so the two are somewhat coupled). I would say 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273092433
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numberOfNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* Calculates the amount of memory used for network buffers inside the 
current JVM instance
+* based on the available heap or the max heap size and the according 
configuration parameters.
+*
+* For containers or when started via scripts, if started with a 
memory limit and set to use
+* off-heap memory, the maximum heap size for the JVM is adjusted 
accordingly and we are able
+* to extract the intended values from this.
+*
+* The following configuration parameters are involved:
+* 
+*  {@link TaskManagerOptions#MANAGED_MEMORY_SIZE},
+*  {@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},
+*  {@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, 
and
+*  {@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the 
ones above do not exist)
+* .
+*
+* @param config configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+*
+* @return memory to use for network buffers (in bytes)
+*/
+   @VisibleForTesting
+   public static long calculateNewNetworkBufferMemory(Configuration 
config, long maxJvmHeapMemory) {
+   // The maximum heap memory has been adjusted as in 
TaskManagerServices#calculateHeapSizeMB
+   // and we need to invert these calculations.
+   final long jvmHeapNoNet;
+   final MemoryType memoryType = 
ConfigurationParserUtils.getMemoryType(config);
+   if (memoryType == MemoryType.HEAP) {
+   jvmHeapNoNet = maxJvmHeapMemory;
+   } else if (memoryType == MemoryType.OFF_HEAP) {
+   long configuredMemory = 
ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to 
bytes
+   if (configuredMemory > 0) {
+   // The maximum heap memory has been adjusted 
according to 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r273090126
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
+        Configuration configuration,
+        long maxJvmHeapMemory,
+        boolean localTaskManagerCommunication,
+        InetAddress taskManagerAddress) {
+
+        final int dataport = getDataport(configuration);
+
+        final int pageSize = getPageSize(configuration);
+
+        final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+        final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
+
+        int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   Agreed, let's create an issue


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272103567
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
         return nettyConfig;
     }
 
+    public boolean isCreditBased() {
+        return isCreditBased;
+    }
+
+    //
+
+    /**
+     * Utility method to extract network related parameters from the configuration and to
+     * sanity check them.
+     *
+     * @param configuration configuration object
+     * @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+     * @param localTaskManagerCommunication true, to skip initializing the network stack
+     * @param taskManagerAddress identifying the IP address under which the TaskManager will be accessible
+     * @return NetworkEnvironmentConfiguration
+     */
+    @Deprecated
+    public static NetworkEnvironmentConfiguration fromConfiguration(
+        Configuration configuration,
+        long maxJvmHeapMemory,
+        boolean localTaskManagerCommunication,
+        InetAddress taskManagerAddress) {
+
+        final int dataport = getDataport(configuration);
+
+        final int pageSize = getPageSize(configuration);
+
+        final int numberOfNetworkBuffers = calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+        final NettyConfig nettyConfig = createNettyConfig(configuration, localTaskManagerCommunication, taskManagerAddress, dataport);
+
+        int initialRequestBackoff = configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
 
 Review comment:
   We should also try to move all options that are used only for the network 
environment from `TaskManagerOptions` to a new class, `NetworkEnvironmentOptions`, 
to separate them for different shuffle services. What do you think? In the end, 
we can also move all network-related classes into their own dedicated package.
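
To illustrate the proposed split, here is a rough sketch of what a dedicated options holder could look like. `IntOption` is a hypothetical, minimal stand-in for Flink's `ConfigOption`, the `getInteger` helper and the `Map`-based configuration are invented for this example, and the keys and default values are illustrative assumptions rather than values taken from this pull request:

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: shows the shape of a dedicated options class, not Flink's real API. */
public class NetworkEnvironmentOptionsSketch {

    /** Hypothetical, minimal stand-in for a config option: a key plus a default value. */
    static final class IntOption {
        final String key;
        final int defaultValue;

        IntOption(String key, int defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Hypothetical home for options that only the network environment reads. */
    static final class NetworkEnvironmentOptions {
        // Keys and defaults below are illustrative assumptions.
        static final IntOption NETWORK_REQUEST_BACKOFF_INITIAL =
            new IntOption("taskmanager.network.request-backoff.initial", 100);
        static final IntOption NETWORK_REQUEST_BACKOFF_MAX =
            new IntOption("taskmanager.network.request-backoff.max", 10000);

        private NetworkEnvironmentOptions() {
        }
    }

    /** Reads an option from a plain key/value map, falling back to its default. */
    static int getInteger(Map<String, String> configuration, IntOption option) {
        String raw = configuration.get(option.key);
        return raw == null ? option.defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put(NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL.key, "250");

        System.out.println(getInteger(configuration, NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL));
        System.out.println(getInteger(configuration, NetworkEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX));
    }
}
```

Grouping the network-only options this way would let another shuffle service define its own options class without touching `TaskManagerOptions`.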




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272102359
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,16 +112,381 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   final int dataport = getDataport(configuration);
+
+   final int pageSize = getPageSize(configuration);
+
+   final int numberOfNetworkBuffers = 
calculateNumberOfNetworkBuffers(configuration, maxJvmHeapMemory);
+
+   final NettyConfig nettyConfig = 
createNettyConfig(configuration, localTaskManagerCommunication, 
taskManagerAddress, dataport);
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numberOfNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* Calculates the amount of memory used for network buffers inside the current JVM instance
+* based on the available heap or the max heap size and the according configuration parameters.
+*
+* <p>For containers or when started via scripts, if started with a memory limit and set to use
+* off-heap memory, the maximum heap size for the JVM is adjusted accordingly and we are able
+* to extract the intended values from this.
+*
+* <p>The following configuration parameters are involved:
+* <ul>
+*  <li>{@link TaskManagerOptions#MANAGED_MEMORY_SIZE},</li>
+*  <li>{@link TaskManagerOptions#MANAGED_MEMORY_FRACTION},</li>
+*  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_FRACTION},</li>
+*  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MIN},</li>
+*  <li>{@link TaskManagerOptions#NETWORK_BUFFERS_MEMORY_MAX}, and</li>
+*  <li>{@link TaskManagerOptions#NETWORK_NUM_BUFFERS} (fallback if the ones above do not exist)</li>
+* </ul>.
+*
+* @param config configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+*
+* @return memory to use for network buffers (in bytes)
+*/
+   @VisibleForTesting
+   public static long calculateNewNetworkBufferMemory(Configuration config, long maxJvmHeapMemory) {
+   // The maximum heap memory has been adjusted as in TaskManagerServices#calculateHeapSizeMB
+   // and we need to invert these calculations.
+   final long jvmHeapNoNet;
+   final MemoryType memoryType = ConfigurationParserUtils.getMemoryType(config);
+   if (memoryType == MemoryType.HEAP) {
+   jvmHeapNoNet = maxJvmHeapMemory;
+   } else if (memoryType == MemoryType.OFF_HEAP) {
+   long configuredMemory = ConfigurationParserUtils.getManagedMemorySize(config) << 20; // megabytes to bytes
+   if (configuredMemory > 0) {
+   // The maximum heap memory has been adjusted 
according to 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272080191
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   I think so. I created [FLINK-12110](https://issues.apache.org/jira/browse/FLINK-12110) to track it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272077394
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   As I understand it, `NetworkEnvironmentConfigurationBuilder.nettyConfig` is 
`null` by default.
   Then maybe `nettyConfig` could be a default non-null object, and 
`NetworkEnvironmentConfigurationBuilder` could have 
`setDefaultNettyConfigAndIsCreditBased(boolean isCreditBased)` instead of 
`setIsCreditBased`? Internally it would construct the default non-null 
`nettyConfig` with requested 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-04 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272066212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   True. In that case we should probably use `@SuppressWarnings("deprecation")` 
(it should now go on `calculateNumberOfNetworkBuffers`), because 
the method `fromConfiguration` itself does not look deprecated.
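
To illustrate the distinction, here is a minimal sketch, not the PR's code: `@Deprecated` marks an API that callers should stop using, while `@SuppressWarnings("deprecation")` belongs on code that still has to call such an API. `LegacyOptions` and `DeprecationSketch` are hypothetical names; only `calculateNumberOfNetworkBuffers` echoes the name used in this thread.

```java
/** Hypothetical holder of an old-style option that is kept for compatibility. */
final class LegacyOptions {
    /** Deprecated: callers should prefer the memory-based configuration. */
    @Deprecated
    static int numNetworkBuffers() {
        return 2048;
    }
}

/** Hypothetical caller; it is not deprecated itself, it only reads a deprecated option. */
final class DeprecationSketch {

    /** Suppresses the compiler warning caused by the fallback to the deprecated option. */
    @SuppressWarnings("deprecation")
    static int calculateNumberOfNetworkBuffers(boolean hasNewConfig, long memoryBytes, int pageSize) {
        if (!hasNewConfig) {
            // fallback: old option, still honoured when the new settings are absent
            return LegacyOptions.numNetworkBuffers();
        }
        return (int) (memoryBytes / pageSize);
    }
}
```

With this split, callers of `calculateNumberOfNetworkBuffers` see no deprecation warning, while direct users of the legacy option still do.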




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271823199
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ##
 @@ -616,10 +394,9 @@ public static long calculateHeapSizeMB(long 
totalJavaMemorySizeMB, Configuration
Preconditions.checkArgument(totalJavaMemorySizeMB > 0);
 
// subtract the Java memory used for network buffers (always 
off-heap)
-   final long networkBufMB =
-   calculateNetworkBufferMemory(
-   totalJavaMemorySizeMB << 20, // megabytes to 
bytes
-   config) >> 20; // bytes to megabytes
+   final long networkBufMB = 
NetworkEnvironmentConfiguration.calculateNetworkBufferMemory(
 
 Review comment:
   It looks like a future API point for ShuffleService to estimate the memory 
it needs in a container.
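
As a rough illustration of what such an estimate involves, here is a minimal sketch. It assumes the network memory is a fraction of the total memory clamped between a configured minimum and maximum, in the spirit of the `NETWORK_BUFFERS_MEMORY_FRACTION`, `NETWORK_BUFFERS_MEMORY_MIN` and `NETWORK_BUFFERS_MEMORY_MAX` options named in the Javadoc of this PR. The class and method names are hypothetical, not Flink API.

```java
/** Hypothetical helper, not Flink code: estimates the network buffer memory of a container. */
final class NetworkMemoryEstimate {

    /** Clamps fraction * totalBytes into the range [minBytes, maxBytes]. */
    static long networkBufferBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        if (totalBytes <= 0 || fraction <= 0.0 || fraction >= 1.0 || minBytes > maxBytes) {
            throw new IllegalArgumentException("Invalid network memory settings");
        }
        long byFraction = (long) (fraction * totalBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Converts a byte budget into whole buffers; the remainder stays unused by the network stack. */
    static int numberOfBuffers(long networkBytes, int pageSize) {
        long buffers = networkBytes / pageSize;
        if (buffers > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Too many buffers: " + buffers);
        }
        return (int) buffers;
    }
}
```

For example, a quarter of 1 GiB with a 64 MiB floor and a 1 GiB cap gives 256 MiB, which at a 32 KiB page size corresponds to 8192 buffers.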




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271834864
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 ##
 @@ -1045,7 +1045,7 @@ public void testTaskManagerServicesConfiguration() 
throws Exception {

config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
 
TaskManagerServicesConfiguration tmConfig =
-   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLoopbackAddress(), true);
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 128L * 1024 * 1024, 
InetAddress.getLoopbackAddress(), true);
 
 Review comment:
   `private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;`?
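
A minimal sketch of the suggestion, assuming the constant lives next to the tests that use it. `TaskManagerTestConstants` and `memSizeParam()` are hypothetical names that only make the snippet self-contained.

```java
/** Hypothetical holder for the test constant; in the PR it would sit in the test class itself. */
final class TaskManagerTestConstants {

    /** 128 MiB, written once instead of repeating the arithmetic at every call site. */
    private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;

    /** Accessor used here only so the value can be checked from outside the class. */
    static long memSizeParam() {
        return MEM_SIZE_PARAM;
    }
}
```

The call in the diff would then read `fromConfiguration(config, MEM_SIZE_PARAM, InetAddress.getLoopbackAddress(), true)`, which states the intent of the argument at the call site.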




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271829700
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
 
 Review comment:
   Could we break down this method a bit more?
   I mean adding functions:
   - `getDataport()`
   - `calculateNumberOfNetworkBuffers()`
   - `createNettyConfig()`
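
The shape of that decomposition could look roughly like the sketch below. It is a simplified stand-in, not the PR's code: a plain `Map` replaces Flink's `Configuration`, an `InetSocketAddress` replaces `NettyConfig`, and the keys `data.port` and `network.memory.bytes` as well as the fixed page size are assumptions made for the example.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;

/** Hypothetical, simplified version of the refactoring suggested above. */
final class FromConfigurationSketch {

    /** Reads and validates the data port; 0 lets the system choose one. */
    static int getDataport(Map<String, Long> config) {
        long port = config.getOrDefault("data.port", 0L);
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid data port: " + port);
        }
        return (int) port;
    }

    /** Derives the buffer count from a byte budget and rejects values beyond int range. */
    static int calculateNumberOfNetworkBuffers(Map<String, Long> config, int pageSize) {
        long memory = config.getOrDefault("network.memory.bytes", 64L << 20);
        long buffers = memory / pageSize;
        if (buffers > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Too many network buffers: " + buffers);
        }
        return (int) buffers;
    }

    /** Stand-in for the NettyConfig creation: null when only local communication is needed. */
    static InetSocketAddress createNettyConfig(boolean local, InetAddress address, int dataport) {
        return local ? null : new InetSocketAddress(address, dataport);
    }

    /** The top-level method now reads as a short sequence of named steps. */
    static String fromConfiguration(Map<String, Long> config, boolean local, InetAddress address) {
        int dataport = getDataport(config);
        int pageSize = 32 * 1024; // assumed fixed page size for this sketch
        int numBuffers = calculateNumberOfNetworkBuffers(config, pageSize);
        InetSocketAddress netty = createNettyConfig(local, address, dataport);
        return "buffers=" + numBuffers + ", netty=" + (netty == null ? "none" : "port " + netty.getPort());
    }
}
```

Each helper can then be tested on its own, and the validation of one parameter no longer sits between unrelated steps.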




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271453248
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -56,64 +61,25 @@
 
private final TaskEventDispatcher taskEventDispatcher;
 
-   private final int partitionRequestInitialBackoff;
-
-   private final int partitionRequestMaxBackoff;
-
-   /** Number of network buffers to use for each outgoing/incoming channel 
(subpartition/input channel). */
 
 Review comment:
   Let's keep these two doc comments for `networkBuffersPerChannel` and 
`extraNetworkBuffersPerGate` in `NetworkEnvironmentConfiguration`.




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271833815
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NetworkBufferCalculationTest.java
 ##
 @@ -39,78 +33,66 @@
 public class NetworkBufferCalculationTest extends TestLogger {
 
/**
-* Test for {@link 
TaskManagerServices#calculateNetworkBufferMemory(TaskManagerServicesConfiguration,
 long)}
-* using the same (manual) test cases as in {@link 
TaskManagerServicesTest#calculateHeapSizeMB()}.
+* Test for {@link 
NetworkEnvironmentConfiguration#calculateNewNetworkBufferMemory(Configuration, 
long)}
+* using the same (manual) test cases as in {@link 
NetworkEnvironmentConfigurationTest#calculateHeapSizeMB()}.
 */
@Test
-   public void calculateNetworkBufFromHeapSize() throws Exception {
-   TaskManagerServicesConfiguration tmConfig;
+   public void calculateNetworkBufFromHeapSize() {
+   Configuration config;
 
-   tmConfig = 
getTmConfig(Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),
+   config = getConfig(
+   
Long.valueOf(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()),

TaskManagerOptions.MANAGED_MEMORY_FRACTION.defaultValue(),
-   0.1f, 60L << 20, 1L << 30, MemoryType.HEAP);
+   0.1f, 60L << 20, 1L << 30, false);
 
 Review comment:
   `MemoryType.HEAP`/`OFF_HEAP` looked a bit more readable.
   Could we use `memType == MemoryType.OFF_HEAP` in `getConfig(.., memType)`?
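
A minimal sketch of that idea, assuming a test helper that accepts the enum and derives the boolean internally. The nested `MemoryType` is a local stand-in for Flink's enum, and the shape of `getConfig` is hypothetical (the real helper takes more parameters).

```java
/** Hypothetical test helper showing an enum parameter instead of a bare boolean. */
final class MemoryTypeParamSketch {

    /** Local stand-in for Flink's MemoryType enum. */
    enum MemoryType { HEAP, OFF_HEAP }

    /** Call sites pass the enum; the boolean flag is derived in one place. */
    static String getConfig(long managedMemoryMb, MemoryType memType) {
        boolean offHeap = memType == MemoryType.OFF_HEAP;
        return "managedMB=" + managedMemoryMb + ", offHeap=" + offHeap;
    }
}
```

A call such as `getConfig(10, MemoryType.HEAP)` documents itself, whereas `getConfig(10, false)` forces the reader to look up what the flag means.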




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271457542
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,300 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
 
 Review comment:
   Why is it `@Deprecated`?




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271820791
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* 

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271831002
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -115,6 +412,7 @@ public int hashCode() {
result = 31 * result + partitionRequestMaxBackoff;
result = 31 * result + networkBuffersPerChannel;
result = 31 * result + floatingNetworkBuffersPerGate;
+   result = 31 * result + (isCreditBased ? 1 : 0);
 
 Review comment:
   Should `numNetworkBuffers` also be included here?
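
For context, a minimal sketch of a value class whose `hashCode` covers the same fields as `equals`, using the 31-multiplier scheme from the diff. `ConfigValueSketch` is a hypothetical class with only three fields, and it assumes that `numNetworkBuffers` takes part in `equals`.

```java
/** Hypothetical value class, not the real NetworkEnvironmentConfiguration. */
final class ConfigValueSketch {
    private final int numNetworkBuffers;
    private final int networkBufferSize;
    private final boolean isCreditBased;

    ConfigValueSketch(int numNetworkBuffers, int networkBufferSize, boolean isCreditBased) {
        this.numNetworkBuffers = numNetworkBuffers;
        this.networkBufferSize = networkBufferSize;
        this.isCreditBased = isCreditBased;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ConfigValueSketch)) {
            return false;
        }
        ConfigValueSketch that = (ConfigValueSketch) obj;
        return numNetworkBuffers == that.numNetworkBuffers
            && networkBufferSize == that.networkBufferSize
            && isCreditBased == that.isCreditBased;
    }

    @Override
    public int hashCode() {
        // Every field compared in equals() also contributes to the hash.
        int result = numNetworkBuffers;
        result = 31 * result + networkBufferSize;
        result = 31 * result + (isCreditBased ? 1 : 0);
        return result;
    }
}
```

Leaving a field out of `hashCode` does not break the contract, but it weakens the distribution, so objects differing only in that field always collide.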




[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271800591
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   Do we need `isCreditBased` twice, if it is also available via 
`nettyConfig.isCreditBasedEnabled`?



[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271795236
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java
 ##
 @@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.util.MathUtils;
+
+import static org.apache.flink.configuration.MemorySize.MemoryUnit.MEGA_BYTES;
+import static org.apache.flink.util.MathUtils.checkedDownCast;
+
+/**
+ * Utility class to extract related parameters from {@link Configuration} and 
to
+ * sanity check them.
+ */
+public class ConfigurationParserUtils {
+
+   /**
+* Parses the configuration to get the managed memory size and 
validates the value.
+*
+* @param configuration configuration object
+* @return managed memory size (in megabytes)
+*/
+   public static long getManagedMemorySize(Configuration configuration) {
+   long managedMemorySize;
+   String managedMemorySizeDefaultVal = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
+   if (!configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(managedMemorySizeDefaultVal)) {
+   try {
+   managedMemorySize = MemorySize.parse(
+   configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MEGA_BYTES).getMebiBytes();
+   } catch (IllegalArgumentException e) {
+   throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
+   }
+   } else {
+   managedMemorySize = Long.valueOf(managedMemorySizeDefaultVal);
+   }
+
+   checkConfigParameter(configuration.getString(
+   TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue()) || managedMemorySize > 0,
+   managedMemorySize, TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
+   "MemoryManager needs at least one MB of memory. " +
+   "If you leave this config parameter empty, the system automatically picks a fraction of the available memory.");
+
+   return managedMemorySize;
+   }
+
+   /**
+* Parses the configuration to get the fraction of managed memory and validates the value.
+*
+* @param configuration configuration object
+* @return fraction of managed memory
+*/
+   public static float getManagedMemoryFraction(Configuration configuration) {
+   float managedMemoryFraction = configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION);
+
+   checkConfigParameter(managedMemoryFraction > 0.0f && managedMemoryFraction < 1.0f, managedMemoryFraction,
+   TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
+   "MemoryManager fraction of the free memory must be between 0.0 and 1.0");
+
+   return managedMemoryFraction;
+   }
+
+   /**
+* Parses the configuration to get the type of memory.
+*
+* @param configuration configuration object
+* @return type of memory
+*/
+   public static MemoryType getMemoryType(Configuration configuration) {
+   // check whether we use heap or off-heap memory
+   final MemoryType memType;
+   if (configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
+   memType = MemoryType.OFF_HEAP;
+   } else {
+   

[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-03 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r271827710
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 ##
 @@ -132,27 +118,27 @@ public TaskManagerServicesConfiguration(
//  Getter/Setter
// 

 
-   public InetAddress getTaskManagerAddress() {
+   InetAddress getTaskManagerAddress() {
return taskManagerAddress;
}
 
public String[] getTmpDirPaths() {
return tmpDirPaths;
}
 
-   public String[] getLocalRecoveryStateRootDirectories() {
+   String[] getLocalRecoveryStateRootDirectories() {
return localRecoveryStateRootDirectories;
}
 
-   public boolean isLocalRecoveryEnabled() {
+   boolean isLocalRecoveryEnabled() {
return localRecoveryEnabled;
}
 
public NetworkEnvironmentConfiguration getNetworkConfig() {
return networkConfig;
}
 
-   public QueryableStateConfiguration getQueryableStateConfig() {
+   QueryableStateConfiguration getQueryableStateConfig() {
 
 Review comment:
   Let's also annotate this getter with `@Nullable`, like the private field `queryableStateConfig`.
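
   A minimal sketch of what this suggestion could look like, not the actual Flink code. All class names here (`QueryableStateConfigSketch`, `ServicesConfigSketch`) and the helper `describeQueryableState` are hypothetical stand-ins, and the locally declared `Nullable` annotation only substitutes for `javax.annotation.Nullable` so that the snippet compiles without extra dependencies:

```java
// Stand-in for javax.annotation.Nullable (an assumption made only to keep the sketch self-contained).
@interface Nullable {}

// Hypothetical placeholder for the real QueryableStateConfiguration class.
final class QueryableStateConfigSketch {}

// Hypothetical, stripped-down version of a services configuration holder.
final class ServicesConfigSketch {

    // The field may be null, for example when queryable state is disabled.
    @Nullable
    private final QueryableStateConfigSketch queryableStateConfig;

    ServicesConfigSketch(@Nullable QueryableStateConfigSketch queryableStateConfig) {
        this.queryableStateConfig = queryableStateConfig;
    }

    // Annotating the getter tells callers (and static analysis) that a null check is required.
    @Nullable
    QueryableStateConfigSketch getQueryableStateConfig() {
        return queryableStateConfig;
    }

    // Example caller that honours the annotation by checking for null first.
    String describeQueryableState() {
        QueryableStateConfigSketch config = getQueryableStateConfig();
        return config == null ? "queryable state disabled" : "queryable state enabled";
    }
}
```

   Keeping the annotation on both the field and the getter means the nullability contract stays visible at the call site, not only inside the class.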


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] azagrebin commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-01 Thread GitBox
azagrebin commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r270795077
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 ##
 @@ -678,7 +678,7 @@ public static SingleInputGate create(
 
final SingleInputGate inputGate = new SingleInputGate(
owningTaskName, jobId, consumedResultId, consumedPartitionType, consumedSubpartitionIndex,
-   icdd.length, taskActions, metrics, networkEnvironment.isCreditBased());
+   icdd.length, taskActions, metrics, networkEnvironment.getConfiguration().isCreditBased());
 
 Review comment:
   Could we assign `networkEnvironment.getConfiguration()` to a local variable and reuse it everywhere?
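
   A rough sketch of the pattern being asked for, under stated assumptions: the types `NetworkConfigSketch`, `NetworkEnvironmentSketch` and `GateFactorySketch`, as well as the `buffersPerChannel` setting, are hypothetical placeholders and not the real Flink classes or options. Only `getConfiguration()` and `isCreditBased()` are names taken from the diff above.

```java
// Hypothetical stand-in for the network configuration object.
final class NetworkConfigSketch {
    private final boolean creditBased;
    private final int buffersPerChannel; // illustrative second setting, not taken from the diff

    NetworkConfigSketch(boolean creditBased, int buffersPerChannel) {
        this.creditBased = creditBased;
        this.buffersPerChannel = buffersPerChannel;
    }

    boolean isCreditBased() {
        return creditBased;
    }

    int buffersPerChannel() {
        return buffersPerChannel;
    }
}

// Hypothetical stand-in for the network environment that exposes its configuration.
final class NetworkEnvironmentSketch {
    private final NetworkConfigSketch configuration;

    NetworkEnvironmentSketch(NetworkConfigSketch configuration) {
        this.configuration = configuration;
    }

    NetworkConfigSketch getConfiguration() {
        return configuration;
    }
}

final class GateFactorySketch {
    // Fetch the configuration once and reuse the local variable for every lookup.
    static String describe(NetworkEnvironmentSketch networkEnvironment) {
        final NetworkConfigSketch networkConfig = networkEnvironment.getConfiguration();
        return "creditBased=" + networkConfig.isCreditBased()
            + ", buffersPerChannel=" + networkConfig.buffersPerChannel();
    }
}
```

   The local variable shortens each call site and makes it obvious that all settings come from the same configuration object.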

