Repository: flink Updated Branches: refs/heads/master 9fb074c9c -> fb1ef0812
[FLINK-5904] Make jobmanager.heap.mb and taskmanager.heap.mb work in YARN mode This closes #3414. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0da62e8d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0da62e8d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0da62e8d Branch: refs/heads/master Commit: 0da62e8d77dceca6f39e70fb6a313a8169364de0 Parents: 9fb074c Author: WangTaoTheTonic <wangtao...@huawei.com> Authored: Sat Feb 25 12:19:43 2017 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Apr 19 14:36:45 2017 +0200 ---------------------------------------------------------------------- .../apache/flink/configuration/ConfigConstants.java | 2 +- .../apache/flink/configuration/JobManagerOptions.java | 5 +++++ .../apache/flink/configuration/TaskManagerOptions.java | 5 +++++ .../flink/yarn/AbstractYarnClusterDescriptor.java | 13 +++++++++++-- 4 files changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index de06b59..a035beb 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -113,7 +113,7 @@ public final class ConfigConstants { public static final String EXECUTION_RETRY_DELAY_KEY = "execution-retries.delay"; // -------------------------------- Runtime ------------------------------- - + /** * The config parameter defining the network address to connect to * for communication with the job manager. http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index d9c9d18..2534ddf 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -57,6 +57,11 @@ public class JobManagerOptions { key("jobmanager.rpc.port") .defaultValue(6123); + /** JVM heap size (in megabytes) for the JobManager */ + public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY = + key("jobmanager.heap.mb") + .defaultValue(1024); + /** * The maximum number of prior execution attempts kept in history. */ http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java index adfc8e9..9a00a0f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java @@ -39,6 +39,11 @@ public class TaskManagerOptions { key("taskmanager.jvm-exit-on-oom") .defaultValue(false); + /** JVM heap size (in megabytes) for the TaskManagers */ + public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY = + key("taskmanager.heap.mb") + .defaultValue(1024); + /** Size of memory buffers used by the network stack and the memory manager (in bytes). */ public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE = key("taskmanager.memory.segment-size") http://git-wip-us.apache.org/repos/asf/flink/blob/0da62e8d/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index b383b59..9cb80aa 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -26,10 +26,12 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -113,9 +115,9 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor */ private int slots = -1; - private int jobManagerMemoryMb = 1024; + private int jobManagerMemoryMb = JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.defaultValue(); - private int taskManagerMemoryMb = 1024; + private int taskManagerMemoryMb = TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.defaultValue(); private int taskManagerCount = 1; @@ -166,6 +168,13 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor flinkConfigurationPath = new Path(confFile.getAbsolutePath()); slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + + if (flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) { + jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY); + } + if (flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) { + taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY); + } } catch (Exception e) { LOG.debug("Config couldn't be loaded from environment variable."); }