Repository: flink
Updated Branches:
  refs/heads/master 9fb074c9c -> fb1ef0812


[FLINK-5904] Make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode

This closes #3414.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0da62e8d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0da62e8d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0da62e8d

Branch: refs/heads/master
Commit: 0da62e8d77dceca6f39e70fb6a313a8169364de0
Parents: 9fb074c
Author: WangTaoTheTonic <wangtao...@huawei.com>
Authored: Sat Feb 25 12:19:43 2017 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Apr 19 14:36:45 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/configuration/ConfigConstants.java    |  2 +-
 .../apache/flink/configuration/JobManagerOptions.java  |  5 +++++
 .../apache/flink/configuration/TaskManagerOptions.java |  5 +++++
 .../flink/yarn/AbstractYarnClusterDescriptor.java      | 13 +++++++++++--
 4 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index de06b59..a035beb 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -113,7 +113,7 @@ public final class ConfigConstants {
        public static final String EXECUTION_RETRY_DELAY_KEY = 
"execution-retries.delay";
        
        // -------------------------------- Runtime 
-------------------------------
-       
+
        /**
         * The config parameter defining the network address to connect to
         * for communication with the job manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index d9c9d18..2534ddf 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -57,6 +57,11 @@ public class JobManagerOptions {
                key("jobmanager.rpc.port")
                .defaultValue(6123);
 
+       /** JVM heap size (in megabytes) for the JobManager */
+       public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
+               key("jobmanager.heap.mb")
+               .defaultValue(1024);
+
        /**
         * The maximum number of prior execution attempts kept in history.
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
index adfc8e9..9a00a0f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
@@ -39,6 +39,11 @@ public class TaskManagerOptions {
                        key("taskmanager.jvm-exit-on-oom")
                        .defaultValue(false);
 
+       /** JVM heap size (in megabytes) for the TaskManagers */
+       public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
+                       key("taskmanager.heap.mb")
+                       .defaultValue(1024);
+                  
        /** Size of memory buffers used by the network stack and the memory 
manager (in bytes). */
        public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
                        key("taskmanager.memory.segment-size")

http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b383b59..9cb80aa 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -26,10 +26,12 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -113,9 +115,9 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
         */
        private int slots = -1;
 
-       private int jobManagerMemoryMb = 1024;
+       private int jobManagerMemoryMb = 
JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue();
 
-       private int taskManagerMemoryMb = 1024;
+       private int taskManagerMemoryMb = 
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue();
 
        private int taskManagerCount = 1;
 
@@ -166,6 +168,13 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        flinkConfigurationPath = new 
Path(confFile.getAbsolutePath());
 
                        slots = 
flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       
+                       if 
(flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()))
 {
+                               jobManagerMemoryMb = 
flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+                       }
+                       if 
(flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key()))
 {
+                               taskManagerMemoryMb = 
flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+                       }
                } catch (Exception e) {
                        LOG.debug("Config couldn't be loaded from environment 
variable.");
                }

Reply via email to