[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6297


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r203000441
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
         assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     }
 
+    /**
+     * Tests the specifying heap memory for job manager and task manager.
+     */
+    @Test
+    public void testHeapMemoryProperty() throws Exception {
+        //without unit
+        String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
+
+        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+            new Configuration(),
+            tmp.getRoot().getAbsolutePath(),
+            "y",
+            "yarn");
+
+        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit "m"
--- End diff --

This should be a separate test


---


2018-07-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r203000456
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
         assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     }
 
+    /**
+     * Tests the specifying heap memory for job manager and task manager.
+     */
+    @Test
+    public void testHeapMemoryProperty() throws Exception {
+        //without unit
+        String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
+
+        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+            new Configuration(),
+            tmp.getRoot().getAbsolutePath(),
+            "y",
+            "yarn");
+
+        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit "m"
+        args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit non "m"
--- End diff --

This should be a separate test


---


2018-07-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r203000672
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
         assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     }
 
+    /**
+     * Tests the specifying heap memory for job manager and task manager.
+     */
+    @Test
+    public void testHeapMemoryProperty() throws Exception {
+        //without unit
+        String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
+
+        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+            new Configuration(),
+            tmp.getRoot().getAbsolutePath(),
+            "y",
+            "yarn");
+
+        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit "m"
+        args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit non "m"
+        args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //specify old config key
--- End diff --

This should be a separate test


---


2018-07-17 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r203000806
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---
@@ -352,6 +352,76 @@ public void testConfigurationClusterSpecification() throws Exception {
         assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
     }
 
+    /**
+     * Tests the specifying heap memory for job manager and task manager.
+     */
+    @Test
+    public void testHeapMemoryProperty() throws Exception {
+        //without unit
+        String[] args = new String[] { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };
+
+        FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
+            new Configuration(),
+            tmp.getRoot().getAbsolutePath(),
+            "y",
+            "yarn");
+
+        CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+
+        ClusterSpecification clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit "m"
+        args = new String[] { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //with unit non "m"
+        args = new String[] { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(1024));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(2048));
+
+        //specify old config key
+        Configuration configuration = new Configuration();
+        configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
+        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);
+
+        flinkYarnSessionCli = new FlinkYarnSessionCli(
+            configuration,
+            tmp.getRoot().getAbsolutePath(),
+            "y",
+            "yarn");
+
+        commandLine = flinkYarnSessionCli.parseCommandLineOptions(new String[0], false);
+
+        clusterSpecification = flinkYarnSessionCli.getClusterSpecification(commandLine);
+
+        assertThat(clusterSpecification.getMasterMemoryMB(), is(2048));
+        assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(4096));
+
+        //set nothing use default value
--- End diff --

This should be a separate test


---


2018-07-16 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202623960
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +500,19 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
         }
 
         if (commandLine.hasOption(jmMemory.getOpt())) {
-            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+            String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
+            if (!jmMemoryVal.toLowerCase().contains("m")) {
--- End diff --

This won't cover units other than "m". How about checking it with `MemoryUnit.hasUnit`?
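To illustrate the point, here is a standalone sketch (not Flink code) that contrasts the `contains("m")` check from the diff with a check on whether the value ends in a unit suffix. `hasUnit` and `withDefaultMegabytes` are hypothetical helpers that only approximate the idea behind `MemoryUnit.hasUnit`; they are not its actual implementation.

```java
import java.util.Locale;

public class MemoryUnitSuffixSketch {

    // Mirrors the check in the diff: any value containing "m" is assumed to carry a unit.
    static String naiveAppend(String value) {
        return value.toLowerCase(Locale.ROOT).contains("m") ? value : value + "m";
    }

    // Hypothetical helper: a value has a unit if, after trimming, it does not end in a digit.
    static boolean hasUnit(String value) {
        String trimmed = value.trim();
        return !trimmed.isEmpty() && !Character.isDigit(trimmed.charAt(trimmed.length() - 1));
    }

    // Appends the legacy default unit (MB) only when the user passed a bare number.
    static String withDefaultMegabytes(String value) {
        String trimmed = value.trim();
        return hasUnit(trimmed) ? trimmed : trimmed + "m";
    }

    public static void main(String[] args) {
        for (String input : new String[] {"1024", "1024m", "1g", "2048 kb"}) {
            System.out.println(input + " -> naive: " + naiveAppend(input)
                + ", suffix check: " + withDefaultMegabytes(input));
        }
        // "1g" -> naive: 1gm, suffix check: 1g
    }
}
```

The naive variant turns `-yjm 1g` into the unparseable `1gm`, while the suffix check leaves any value that already ends in a unit untouched and only adds `m` to bare numbers, which preserves the old MB semantics of the YARN CLI.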


---


[GitHub] flink pull request #6297: [FLINK-9777] YARN: JM and TM Memory must be specif...

2018-07-16 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202624004
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +500,19 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
         }
 
         if (commandLine.hasOption(jmMemory.getOpt())) {
-            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+            String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
+            if (!jmMemoryVal.toLowerCase().contains("m")) {
+                jmMemoryVal += "m";
+            }
+            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryVal);
         }
 
         if (commandLine.hasOption(tmMemory.getOpt())) {
-            effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
+            String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
+            if (!tmMemoryVal.toLowerCase().contains("m")) {
--- End diff --

Same as above


---


2018-07-13 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202381490
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
         }
 
         if (commandLine.hasOption(jmMemory.getOpt())) {
-            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
--- End diff --

You are right; I have updated the PR. Please review, thanks.


---


2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202338957
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
         }
 
         if (commandLine.hasOption(jmMemory.getOpt())) {
-            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
--- End diff --

I think we have to be a bit smarter here. The unit might already be provided on the command line, right?


---


2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202338312
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---
@@ -29,6 +29,46 @@
 
     private static final String[] EMPTY = new String[0];
 
+    /**
+     * Get job manager's heap memory.
+     *
+     * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
+     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+     *
+     * @param configuration the configuration object
+     * @return the memory size of job manager's heap memory.
+     */
+    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
+        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
+            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
+            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
+        } else {
+            throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
+                + " or " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB);
+        }
+    }
+
+    /**
+     * Get task manager's heap memory.
+     *
+     * This method will check the new key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY} and
+     * the old key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+     *
+     * @param configuration the configuration object
+     * @return the memory size of task manager's heap memory.
+     */
+    public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
+        if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) {
+            return MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
+        } else if (configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB.key())) {
+            return MemorySize.parse(configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m");
+        } else {
+            throw new RuntimeException("Can not find config key : " + TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key()
--- End diff --

Use `FlinkRuntimeException`.


---


2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202339002
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -500,11 +501,11 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
         }
 
         if (commandLine.hasOption(jmMemory.getOpt())) {
-            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
+            effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()) + "m");
         }
 
         if (commandLine.hasOption(tmMemory.getOpt())) {
-            effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
+            effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()) + "m");
--- End diff --

Same as above.


---


2018-07-13 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202338296
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java ---
@@ -29,6 +29,46 @@
 
     private static final String[] EMPTY = new String[0];
 
+    /**
+     * Get job manager's heap memory.
+     *
+     * This method will check the new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} and
+     * the old key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} for backwards compatibility.
+     *
+     * @param configuration the configuration object
+     * @return the memory size of job manager's heap memory.
+     */
+    public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
+        if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
+            return MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY));
+        } else if (configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key())) {
+            return MemorySize.parse(configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m");
+        } else {
+            throw new RuntimeException("Can not find config key : " + JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()
--- End diff --

Use `FlinkRuntimeException`.


---


2018-07-12 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202085772
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

I think we cannot do this, as users might not use `config.sh`. At the same time, we think we really should keep backwards compatibility, as otherwise there might be complaints. We've discussed this with @GJL offline, and we think the way to go right now would be to also check for the old option everywhere we use the new one, in case the new one was not set.

That is:
* `org.apache.flink.client.deployment.ClusterSpecification#fromConfiguration`
* `org.apache.flink.yarn.YarnResourceManager#YarnResourceManager`
* `org.apache.flink.yarn.cli.FlinkYarnSessionCli#createClusterSpecification`

We should probably wrap this in a utility method.

Also, for the YARN command line we should keep the old behaviour. This means we should add the "m" suffix in `org.apache.flink.yarn.cli.FlinkYarnSessionCli#applyCommandLineOptionsToConfiguration`.
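A minimal sketch of the kind of utility method suggested in this comment, assuming the configuration can be modelled as a plain `Map<String, String>`; Flink's real `Configuration`, `ConfigOption` and `MemorySize` types are deliberately not used. The key names come from this thread, while the method names, the tiny single-letter-suffix parser and the `1024m` default (taken from the `defaultValue` quoted later in the thread) are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class HeapMemoryFallbackSketch {

    static final String NEW_KEY = "jobmanager.heap.size"; // unit-aware; bare numbers mean bytes
    static final String OLD_KEY = "jobmanager.heap.mb";   // legacy; always megabytes
    static final String DEFAULT_VALUE = "1024m";          // assumed default for this sketch

    // Hypothetical helper: prefer the new key, fall back to the old key, then to the default.
    static long resolveHeapMegabytes(Map<String, String> config) {
        if (config.containsKey(NEW_KEY)) {
            return toMegabytes(config.get(NEW_KEY));
        } else if (config.containsKey(OLD_KEY)) {
            // The legacy value is a bare number of megabytes, so make the unit explicit.
            return toMegabytes(config.get(OLD_KEY).trim() + "m");
        }
        return toMegabytes(DEFAULT_VALUE);
    }

    // Illustrative parser: optional single-letter suffix b/k/m/g; bare numbers are bytes.
    static long toMegabytes(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        long multiplier = 1L;
        char last = t.charAt(t.length() - 1);
        if (!Character.isDigit(last)) {
            t = t.substring(0, t.length() - 1).trim();
            switch (last) {
                case 'b': multiplier = 1L; break;
                case 'k': multiplier = 1L << 10; break;
                case 'm': multiplier = 1L << 20; break;
                case 'g': multiplier = 1L << 30; break;
                default: throw new IllegalArgumentException("Unknown unit in: " + text);
            }
        }
        return Long.parseLong(t) * multiplier / (1L << 20);
    }

    public static void main(String[] args) {
        Map<String, String> legacy = new HashMap<>();
        legacy.put(OLD_KEY, "2048");
        System.out.println(resolveHeapMegabytes(legacy)); // 2048
    }
}
```

Centralising the lookup in one helper means `ClusterSpecification#fromConfiguration`, the `YarnResourceManager` constructor and `FlinkYarnSessionCli#createClusterSpecification` could all share the same precedence rule instead of each re-implementing it.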


---


2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202075216
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

The new config key was introduced so that users can specify a unit for the value of `jobmanager.heap.size`, while `jobmanager.heap.mb` stays backwards compatible in the Flink config file (`config.sh` can still evaluate it correctly).

So if we remove everything related to `JOB_MANAGER_HEAP_MEMORY_MB` in the Java and Scala code, is there any problem?


---


2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202071052
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

To @GJL: we cannot use `.withDeprecatedKeys("jobmanager.heap.mb")` because `jobmanager.heap.size` and `jobmanager.heap.mb` have different meanings. The former accepts different units, such as **1g**, whereas the latter is always measured in **MB**.

To @dawidwys and @GJL: `jobmanager.heap.mb` is now only used in the config file, where it is still evaluated correctly for backwards compatibility. In the project code it is no longer needed: every usage can be replaced with `jobmanager.heap.size`, so the old key does not need to be exposed to the user in the code, right?


---


2018-07-12 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202059575
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Imagine a case where previously we had in the configuration:

jobmanager.heap.mb: 1024

which meant 1024 MB. In the new code we will pass 1024 as the value for `jobmanager.heap.size`, which was introduced precisely so that numbers without units are treated as bytes. Therefore it will be parsed as 1024 bytes rather than 1024 MB.

If we switch the default unit to MB, there is no point in having a new `ConfigOption`.
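The ambiguity can be made concrete with a standalone sketch of a parser that takes an explicit default unit for bare numbers, in the spirit of the "parse with a default unit" idea discussed in this thread. This is not Flink's `MemorySize.parse`; the `Unit` enum and `parseBytes` method are hypothetical and only accept single-letter suffixes.

```java
import java.util.Locale;

public class DefaultUnitSketch {

    // Hypothetical unit enum; multipliers are powers of two (KiB, MiB, GiB).
    enum Unit {
        BYTES(1L), KILO_BYTES(1L << 10), MEGA_BYTES(1L << 20), GIGA_BYTES(1L << 30);

        final long multiplier;

        Unit(long multiplier) {
            this.multiplier = multiplier;
        }
    }

    // Parses "<number>[b|k|m|g]"; a bare number is interpreted in defaultUnit.
    static long parseBytes(String text, Unit defaultUnit) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        char last = t.charAt(t.length() - 1);
        if (Character.isDigit(last)) {
            return Long.parseLong(t) * defaultUnit.multiplier;
        }
        long number = Long.parseLong(t.substring(0, t.length() - 1).trim());
        switch (last) {
            case 'b': return number * Unit.BYTES.multiplier;
            case 'k': return number * Unit.KILO_BYTES.multiplier;
            case 'm': return number * Unit.MEGA_BYTES.multiplier;
            case 'g': return number * Unit.GIGA_BYTES.multiplier;
            default: throw new IllegalArgumentException("Unknown unit in: " + text);
        }
    }

    public static void main(String[] args) {
        // The legacy value "1024" read under the two possible defaults.
        System.out.println(parseBytes("1024", Unit.BYTES));      // 1024
        System.out.println(parseBytes("1024", Unit.MEGA_BYTES)); // 1073741824
    }
}
```

The same string differs by a factor of 1,048,576 depending on the default unit, which is why an old `jobmanager.heap.mb` value cannot simply be fed to an option whose bare numbers mean bytes.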


---


2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202057663
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

@dawidwys Why is it not enough?

> When obtaining a value from the configuration via Configuration.getValue(ConfigOption), the deprecated keys will be checked in the order provided to this method. The first key for which a value is found will be used - that value will be returned.


---


2018-07-12 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202023148
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Agreed, I missed it previously. Unfortunately, `withDeprecatedKeys` won't solve it, though. Either we revert the option (and use the MB versions), or anywhere we use the new option, we also check for the old one.


---


2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202020619
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

@dawidwys As I understand it, backwards compatibility should be prioritized. See https://github.com/apache/flink/pull/5448#issuecomment-394296226

> We need to add an additional MemoryUnit.parse() method that takes the "default" unit, so that we parse the old heap sizes such that they are in MB if nothing else is specified.


---


2018-07-12 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202018873
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

I think the problem here would be that we would need to go back to MB as the default unit for this option. Or am I wrong here? The whole idea of introducing the new option was to switch the default unit to bytes when none is provided.


---


2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r202017985
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Would it hurt to add `.withDeprecatedKeys("jobmanager.heap.mb")`, in case someone does not use the scripts to start the cluster?


---


2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201997596
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -101,12 +101,12 @@ Usage:
Optional
  -D Dynamic properties
  -d,--detached   Start detached
- -jm,--jobManagerMemory Memory for JobManager Container [in MB]
+ -jm,--jobManagerMemory Memory for JobManager Container [with unit, if not, use MB]
--- End diff --

Will change this soon.


---


2018-07-12 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201997464
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

In FLINK-6469, in order to configure the JM's memory with a unit, I introduced a new key and deprecated `jobmanager.heap.mb`.

* In the Flink codebase (except the shell scripts) I have removed every usage of `JOB_MANAGER_HEAP_MEMORY_MB` and `jobmanager.heap.mb`, so the old key is no longer used there.
* In the shell script (`config.sh`), the old key `jobmanager.heap.mb` is still honoured if the new key `jobmanager.heap.size` cannot be read, so it remains supported.

 


---


2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201980407
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -386,10 +386,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
         }
 
         // JobManager Memory
-        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
+        final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY), MemorySize.MemoryUnit.MEGA_BYTES).getMebiBytes();
--- End diff --

Was there a reason against making `JOB_MANAGER_HEAP_MEMORY_MB` a deprecated key for `JOB_MANAGER_HEAP_MEMORY` when you worked on FLINK-6469? That is:
```
public static final ConfigOption<String> JOB_MANAGER_HEAP_MEMORY =
    key("jobmanager.heap.size")
        .defaultValue("1024m")
        .withDeprecatedKeys("jobmanager.heap.mb")
        ...
```
I am not sure what happens if someone only configures `jobmanager.heap.mb` in `flink-conf.yaml`. Will the value be respected in all deployment modes?

The same holds for the TaskManager config key.


---


2018-07-12 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/6297#discussion_r201981340
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -101,12 +101,12 @@ Usage:
Optional
  -D Dynamic properties
  -d,--detached   Start detached
- -jm,--jobManagerMemory Memory for JobManager Container [in MB]
+ -jm,--jobManagerMemory Memory for JobManager Container [with unit, if not, use MB]
--- End diff --

nit: I would reword this to: 
> *Memory for JobManager container with optional unit (default: MB).*


---


2018-07-10 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6297

[FLINK-9777] YARN: JM and TM Memory must be specified with Units

## What is the purpose of the change

*This pull request allows units to be specified for the JM and TM memory in YARN mode*


## Brief change log

  - *parse the JM and TM memory values with MB as the default unit*
  - *update the related documentation*

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9777

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6297


commit 0ba28e996e5dc01040b9dd4cc9d3d86f6cb9dacd
Author: yanghua 
Date:   2018-07-10T15:16:12Z

[FLINK-9777] YARN: JM and TM Memory must be specified with Units




---