[FLINK-5904] Use proper ConfigOption syntax to retrieve config values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb1ef081
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb1ef081
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb1ef081

Branch: refs/heads/master
Commit: fb1ef0812d002ae9cd6a7eb63e849b5e55f44a84
Parents: 0da62e8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Apr 18 17:24:29 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Apr 19 14:37:05 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java    |  4 +++-
 .../flink/yarn/AbstractYarnClusterDescriptor.java | 15 ++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fb1ef081/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 2534ddf..d129405 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -57,7 +57,9 @@ public class JobManagerOptions {
 		key("jobmanager.rpc.port")
 		.defaultValue(6123);
 
-	/** JVM heap size (in megabytes) for the JobManager */
+	/**
+	 * JVM heap size (in megabytes) for the JobManager
+	 */
 	public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
 		key("jobmanager.heap.mb")
 		.defaultValue(1024);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb1ef081/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 9cb80aa..ec7af5a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -31,7 +32,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -168,15 +168,12 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			flinkConfigurationPath = new Path(confFile.getAbsolutePath());
 
 			slots = flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-
-			if (flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key())) {
-				jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-			}
-			if (flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key())) {
-				taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-			}
+
+			jobManagerMemoryMb = flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+			taskManagerMemoryMb = flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
 		} catch (Exception e) {
-			LOG.debug("Config couldn't be loaded from environment variable.");
+			LOG.debug("Config couldn't be loaded from environment variable.", e);
 		}
 	}
 