[FLINK-5904] Use proper ConfigOption syntax to retrieve config values

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb1ef081
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb1ef081
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb1ef081

Branch: refs/heads/master
Commit: fb1ef0812d002ae9cd6a7eb63e849b5e55f44a84
Parents: 0da62e8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Apr 18 17:24:29 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Apr 19 14:37:05 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java       |  4 +++-
 .../flink/yarn/AbstractYarnClusterDescriptor.java    | 15 ++++++---------
 2 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb1ef081/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 2534ddf..d129405 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -57,7 +57,9 @@ public class JobManagerOptions {
                key("jobmanager.rpc.port")
                .defaultValue(6123);
 
-       /** JVM heap size (in megabytes) for the JobManager */
+       /**
+        * JVM heap size (in megabytes) for the JobManager
+        */
        public static final ConfigOption<Integer> JOB_MANAGER_HEAP_MEMORY =
                key("jobmanager.heap.mb")
                .defaultValue(1024);

http://git-wip-us.apache.org/repos/asf/flink/blob/fb1ef081/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 9cb80aa..ec7af5a 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
@@ -31,7 +32,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.jobmanager.JobManagerOptions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -168,15 +168,12 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                        flinkConfigurationPath = new 
Path(confFile.getAbsolutePath());
 
                        slots = 
flinkConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-                       
-                       if 
(flinkConfiguration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key()))
 {
-                               jobManagerMemoryMb = 
flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
-                       }
-                       if 
(flinkConfiguration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key()))
 {
-                               taskManagerMemoryMb = 
flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
-                       }
+
+                       jobManagerMemoryMb = 
flinkConfiguration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
+                       taskManagerMemoryMb = 
flinkConfiguration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
+
                } catch (Exception e) {
-                       LOG.debug("Config couldn't be loaded from environment 
variable.");
+                       LOG.debug("Config couldn't be loaded from environment 
variable.", e);
                }
        }
 

Reply via email to