[FLINK-4769] [metrics] Port metric config parameters to ConfigOptions

This closes #3687


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b77cc3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b77cc3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b77cc3f

Branch: refs/heads/master
Commit: 1b77cc3f2682afd839ce6b16cd1658bbd01a6b04
Parents: c0ea74a
Author: zentol <[email protected]>
Authored: Thu Apr 6 13:47:33 2017 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 12:31:30 2017 +0200

----------------------------------------------------------------------
 .../connectors/kafka/KafkaTestBase.java         |  3 +-
 .../flink/configuration/ConfigConstants.java    | 44 +++++-----
 .../flink/configuration/MetricOptions.java      | 88 ++++++++++++++++++++
 .../ScheduledDropwizardReporterTest.java        |  7 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  3 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  9 +-
 .../jobmanager/JMXJobManagerMetricTest.java     |  5 +-
 .../metrics/statsd/StatsDReporterTest.java      | 11 +--
 .../metrics/MetricRegistryConfiguration.java    | 24 ++----
 .../runtime/metrics/scope/ScopeFormat.java      | 49 -----------
 .../runtime/metrics/scope/ScopeFormats.java     | 32 +++----
 .../runtime/metrics/MetricRegistryTest.java     | 29 +++----
 .../metrics/groups/AbstractMetricGroupTest.java |  7 +-
 .../metrics/groups/JobManagerGroupTest.java     |  4 +-
 .../metrics/groups/JobManagerJobGroupTest.java  | 10 +--
 .../groups/MetricGroupRegistrationTest.java     |  3 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  4 +-
 .../metrics/groups/TaskManagerJobGroupTest.java | 10 +--
 .../metrics/groups/TaskMetricGroupTest.java     | 10 +--
 .../api/operators/AbstractStreamOperator.java   |  8 +-
 20 files changed, 196 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index 0c6bfa9..a21a239 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -116,7 +117,7 @@ public abstract class KafkaTestBase extends TestLogger {
                
flinkConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
                flinkConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
16L);
                
flinkConfig.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "0 
s");
-               flinkConfig.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"my_reporter");
+               flinkConfig.setString(MetricOptions.REPORTERS_LIST, 
"my_reporter");
                flinkConfig.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                return flinkConfig;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a035beb..61c1b27 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -990,21 +990,8 @@ public final class ConfigConstants {
 
        // ---------------------------- Metrics 
-----------------------------------
 
-       /**
-        * The list of named reporters. Names are defined here and per-reporter 
configs
-        * are given with the reporter config prefix and the reporter name.
-        *
-        * Example:
-        * <pre>{@code
-        * metrics.reporters = foo, bar
-        *
-        * metrics.reporter.foo.class = 
org.apache.flink.metrics.reporter.JMXReporter
-        * metrics.reporter.foo.interval = 10
-        *
-        * metrics.reporter.bar.class = 
org.apache.flink.metrics.graphite.GraphiteReporter
-        * metrics.reporter.bar.port = 1337
-        * }</pre>
-        */
+       /** @deprecated Use {@link MetricOptions#REPORTERS_LIST} instead. */
+       @Deprecated
        public static final String METRICS_REPORTERS_LIST = "metrics.reporters";
 
        /**
@@ -1022,28 +1009,36 @@ public final class ConfigConstants {
        /**     The delimiter used to assemble the metric identifier. This is 
used as a suffix in an actual reporter config. */
        public static final String METRICS_REPORTER_SCOPE_DELIMITER = 
"scope.delimiter";
 
-       /** The delimiter used to assemble the metric identifier. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_DELIMITER} instead. */
+       @Deprecated
        public static final String METRICS_SCOPE_DELIMITER = 
"metrics.scope.delimiter";
 
-       /** The scope format string that is applied to all metrics scoped to a 
JobManager. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM} instead. */
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_JM = "metrics.scope.jm";
 
-       /** The scope format string that is applied to all metrics scoped to a 
TaskManager. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM} instead. */
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_TM = "metrics.scope.tm";
 
-       /** The scope format string that is applied to all metrics scoped to a 
job on a JobManager. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_JM_JOB} instead. 
*/
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_JM_JOB = 
"metrics.scope.jm.job";
 
-       /** The scope format string that is applied to all metrics scoped to a 
job on a TaskManager. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TM_JOB} instead. 
*/
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_TM_JOB = 
"metrics.scope.tm.job";
 
-       /** The scope format string that is applied to all metrics scoped to a 
task. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_TASK} instead. */
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_TASK = 
"metrics.scope.task";
 
-       /** The scope format string that is applied to all metrics scoped to an 
operator. */
+       /** @deprecated Use {@link MetricOptions#SCOPE_NAMING_OPERATOR} 
instead. */
+       @Deprecated
        public static final String METRICS_SCOPE_NAMING_OPERATOR = 
"metrics.scope.operator";
 
-       /** The number of measured latencies to maintain at each operator */
+       /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. 
*/
+       @Deprecated
        public static final String METRICS_LATENCY_HISTORY_SIZE = 
"metrics.latency.history-size";
 
 
@@ -1498,7 +1493,8 @@ public final class ConfigConstants {
 
        // ----------------------------- Metrics ----------------------------
 
-       /** The default number of measured latencies to maintain at each 
operator */
+       /** @deprecated Use {@link MetricOptions#LATENCY_HISTORY_SIZE} instead. 
*/
+       @Deprecated
        public static final int DEFAULT_METRICS_LATENCY_HISTORY_SIZE = 128;
 
        // ----------------------------- Environment Variables 
----------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
new file mode 100644
index 0000000..8a328ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+@PublicEvolving
+public class MetricOptions {
+
+       /**
+        * The list of named reporters. Names are defined here and per-reporter configs
+        * are given with the reporter config prefix and the reporter name.
+        *
+        * Example:
+        * <pre>{@code
+        * metrics.reporters = foo, bar
+        *
+        * metrics.reporter.foo.class = org.apache.flink.metrics.reporter.JMXReporter
+        * metrics.reporter.foo.interval = 10
+        *
+        * metrics.reporter.bar.class = org.apache.flink.metrics.graphite.GraphiteReporter
+        * metrics.reporter.bar.port = 1337
+        * }</pre>
+        */
+       public static final ConfigOption<String> REPORTERS_LIST =
+               key("metrics.reporters")
+                       .noDefaultValue();
+
+       /** The delimiter used to assemble the metric identifier. */
+       public static final ConfigOption<String> SCOPE_DELIMITER =
+               key("metrics.scope.delimiter")
+                       .defaultValue(".");
+
+       /** The scope format string that is applied to all metrics scoped to a JobManager. */
+       public static final ConfigOption<String> SCOPE_NAMING_JM =
+               key("metrics.scope.jm")
+                       .defaultValue("<host>.jobmanager");
+
+       /** The scope format string that is applied to all metrics scoped to a TaskManager. */
+       public static final ConfigOption<String> SCOPE_NAMING_TM =
+               key("metrics.scope.tm")
+                       .defaultValue("<host>.taskmanager.<tm_id>");
+
+       /** The scope format string that is applied to all metrics scoped to a job on a JobManager. */
+       public static final ConfigOption<String> SCOPE_NAMING_JM_JOB =
+               key("metrics.scope.jm.job")
+                       .defaultValue("<host>.jobmanager.<job_name>");
+
+       /** The scope format string that is applied to all metrics scoped to a job on a TaskManager. */
+       public static final ConfigOption<String> SCOPE_NAMING_TM_JOB =
+               key("metrics.scope.tm.job")
+                       .defaultValue("<host>.taskmanager.<tm_id>.<job_name>");
+
+       /** The scope format string that is applied to all metrics scoped to a task. */
+       public static final ConfigOption<String> SCOPE_NAMING_TASK =
+               key("metrics.scope.task")
+                       .defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>");
+
+       /** The scope format string that is applied to all metrics scoped to an operator. */
+       public static final ConfigOption<String> SCOPE_NAMING_OPERATOR =
+               key("metrics.scope.operator")
+                       .defaultValue("<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>");
+
+       /** The number of measured latencies to maintain at each operator */
+       public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
+               key("metrics.latency.history-size")
+                       .defaultValue(128);
+
+       private MetricOptions() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 87c4ccb..73e7f0b 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -22,6 +22,7 @@ import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
@@ -76,13 +77,13 @@ public class ScheduledDropwizardReporterTest {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               configuration.setString(MetricOptions.REPORTERS_LIST, "test");
                configuration.setString(
                                ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
                                
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");
 
-               
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
-               
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+               configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
+               configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
                MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 2f2a536..8f7796c 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -28,6 +28,7 @@ import com.codahale.metrics.Snapshot;
 import com.codahale.metrics.Timer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -96,7 +97,7 @@ public class DropwizardFlinkHistogramWrapperTest extends TestLogger {
                int size = 10;
                String histogramMetricName = "histogram";
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"my_reporter");
+               config.setString(MetricOptions.REPORTERS_LIST, "my_reporter");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
reportingInterval + " MILLISECONDS");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 1a96287..85ab897 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.metrics.jmx;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -94,7 +95,7 @@ public class JMXReporterTest extends TestLogger {
        @Test
        public void testPortConflictHandling() throws Exception {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9020-9035");
@@ -154,7 +155,7 @@ public class JMXReporterTest extends TestLogger {
                Configuration cfg = new Configuration();
                cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
 
-               cfg.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               cfg.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
 
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1.port", "9040-9055");
@@ -230,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
 
                try {
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+                       config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
                        registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -280,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
 
                try {
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+                       config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
                        registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 934a621..6d54db3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -55,11 +56,11 @@ public class JMXJobManagerMetricTest {
                Deadline deadline = new FiniteDuration(2, 
TimeUnit.MINUTES).fromNow();
                Configuration flinkConfiguration = new Configuration();
 
-               
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+               flinkConfiguration.setString(MetricOptions.REPORTERS_LIST, 
"test");
                
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." 
+ ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
                
flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.port", "9060-9075");
 
-               
flinkConfiguration.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"jobmanager.<job_name>");
+               flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"jobmanager.<job_name>");
 
                TestingCluster flink = new TestingCluster(flinkConfiguration);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 17fd65a..9215c3f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.statsd;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
@@ -75,13 +76,13 @@ public class StatsDReporterTest extends TestLogger {
                String taskManagerId = "tas:kMana::ger";
                String counterName = "testCounter";
 
-               configuration.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               configuration.setString(MetricOptions.REPORTERS_LIST, "test");
                configuration.setString(
                                ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
                                
"org.apache.flink.metrics.statsd.StatsDReporterTest$TestingStatsDReporter");
 
-               
configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
-               
configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
+               configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
+               configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
                MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
 
@@ -145,7 +146,7 @@ public class StatsDReporterTest extends TestLogger {
                        int port = receiver.getPort();
 
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+                       config.setString(MetricOptions.REPORTERS_LIST, "test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
@@ -219,7 +220,7 @@ public class StatsDReporterTest extends TestLogger {
                        int port = receiver.getPort();
 
                        Configuration config = new Configuration();
-                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+                       config.setString(MetricOptions.REPORTERS_LIST, "test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index 596fc82..3475f04 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
-import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
@@ -101,13 +101,13 @@ public class MetricRegistryConfiguration {
 
                char delim;
                try {
-                       delim = 
configuration.getString(ConfigConstants.METRICS_SCOPE_DELIMITER, ".").charAt(0);
+                       delim = 
configuration.getString(MetricOptions.SCOPE_DELIMITER).charAt(0);
                } catch (Exception e) {
                        LOG.warn("Failed to parse delimiter, using default 
delimiter.", e);
                        delim = '.';
                }
 
-               final String definedReporters = 
configuration.getString(ConfigConstants.METRICS_REPORTERS_LIST, null);
+               final String definedReporters = 
configuration.getString(MetricOptions.REPORTERS_LIST);
                List<Tuple2<String, Configuration>> reporterConfigurations;
 
                if (definedReporters == null) {
@@ -136,18 +136,12 @@ public class MetricRegistryConfiguration {
         * @return Scope formats extracted from the given configuration
         */
        static ScopeFormats createScopeConfig(Configuration configuration) {
-               String jmFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_JM, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-               String jmJobFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-               String tmFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               String tmJobFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-               String taskFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-               String operatorFormat = configuration.getString(
-                       ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+               String jmFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_JM);
+               String jmJobFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+               String tmFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_TM);
+               String tmJobFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+               String taskFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_TASK);
+               String operatorFormat = 
configuration.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
                return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, 
tmJobFormat, taskFormat, operatorFormat);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
index adf0a85..f9efb88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormat.java
@@ -70,46 +70,15 @@ public abstract class ScopeFormat {
 
        public static final String SCOPE_HOST = asVariable("host");
 
-       // ----- Job Manager ----
-
-       /** The default scope format of the JobManager component: {@code 
"<host>.jobmanager"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_COMPONENT =
-               concat(SCOPE_HOST, "jobmanager");
-
-       /** The default scope format of JobManager metrics: {@code 
"<host>.jobmanager"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_GROUP = 
DEFAULT_SCOPE_JOBMANAGER_COMPONENT;
-
        // ----- Task Manager ----
 
        public static final String SCOPE_TASKMANAGER_ID = asVariable("tm_id");
 
-       /** The default scope format of the TaskManager component: {@code 
"<host>.taskmanager.<tm_id>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_COMPONENT =
-                       concat(SCOPE_HOST, "taskmanager", SCOPE_TASKMANAGER_ID);
-
-       /** The default scope format of TaskManager metrics: {@code 
"<host>.taskmanager.<tm_id>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_GROUP = 
DEFAULT_SCOPE_TASKMANAGER_COMPONENT;
-
        // ----- Job -----
 
        public static final String SCOPE_JOB_ID = asVariable("job_id");
        public static final String SCOPE_JOB_NAME = asVariable("job_name");
 
-       /** The default scope format for the job component: {@code 
"<job_name>"} */
-       public static final String DEFAULT_SCOPE_JOB_COMPONENT = SCOPE_JOB_NAME;
-
-       // ----- Job on Job Manager ----
-
-       /** The default scope format for all job metrics on a jobmanager: 
{@code "<host>.jobmanager.<job_name>"} */
-       public static final String DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP =
-               concat(DEFAULT_SCOPE_JOBMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
-
-       // ----- Job on Task Manager ----
-
-       /** The default scope format for all job metrics on a taskmanager: 
{@code "<host>.taskmanager.<tm_id>.<job_name>"} */
-       public static final String DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_COMPONENT, 
DEFAULT_SCOPE_JOB_COMPONENT);
-
        // ----- Task ----
 
        public static final String SCOPE_TASK_VERTEX_ID = asVariable("task_id");
@@ -118,27 +87,9 @@ public abstract class ScopeFormat {
        public static final String SCOPE_TASK_ATTEMPT_NUM = 
asVariable("task_attempt_num");
        public static final String SCOPE_TASK_SUBTASK_INDEX = 
asVariable("subtask_index");
 
-       /** Default scope of the task component: {@code 
"<task_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_TASK_COMPONENT =
-                       concat(SCOPE_TASK_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-       /** The default scope format for all task metrics:
-        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_TASK_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_TASK_COMPONENT);
-
        // ----- Operator ----
 
        public static final String SCOPE_OPERATOR_NAME = 
asVariable("operator_name");
-
-       /** The default scope added by the operator component: 
"<operator_name>.<subtask_index>" */
-       public static final String DEFAULT_SCOPE_OPERATOR_COMPONENT =
-                       concat(SCOPE_OPERATOR_NAME, SCOPE_TASK_SUBTASK_INDEX);
-
-       /** The default scope format for all operator metrics:
-        * {@code 
"<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>"} */
-       public static final String DEFAULT_SCOPE_OPERATOR_GROUP =
-                       concat(DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, 
DEFAULT_SCOPE_OPERATOR_COMPONENT);
        
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
index bbbe6ba..bde93be 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/scope/ScopeFormats.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.metrics.scope;
 
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -41,21 +41,21 @@ public class ScopeFormats {
         * Creates all default scope formats.
         */
        public ScopeFormats() {
-               this.jobManagerFormat = new 
JobManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_COMPONENT);
+               this.jobManagerFormat = new 
JobManagerScopeFormat(MetricOptions.SCOPE_NAMING_JM.defaultValue());
 
                this.jobManagerJobFormat = new JobManagerJobScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP, 
this.jobManagerFormat);
+                       MetricOptions.SCOPE_NAMING_JM_JOB.defaultValue(), 
this.jobManagerFormat);
 
-               this.taskManagerFormat = new 
TaskManagerScopeFormat(ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_COMPONENT);
+               this.taskManagerFormat = new 
TaskManagerScopeFormat(MetricOptions.SCOPE_NAMING_TM.defaultValue());
 
                this.taskManagerJobFormat = new TaskManagerJobScopeFormat(
-                               
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP, this.taskManagerFormat);
+                       MetricOptions.SCOPE_NAMING_TM_JOB.defaultValue(), 
this.taskManagerFormat);
 
                this.taskFormat = new TaskScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_TASK_GROUP, 
this.taskManagerJobFormat);
+                               MetricOptions.SCOPE_NAMING_TASK.defaultValue(), 
this.taskManagerJobFormat);
 
                this.operatorFormat = new OperatorScopeFormat(
-                               ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP, 
this.taskFormat);
+                       MetricOptions.SCOPE_NAMING_OPERATOR.defaultValue(), 
this.taskFormat);
        }
 
        /**
@@ -135,18 +135,12 @@ public class ScopeFormats {
         * @return The ScopeFormats parsed from the configuration
         */
        public static ScopeFormats fromConfig(Configuration config) {
-               String jmFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_JM, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_GROUP);
-               String jmJobFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
ScopeFormat.DEFAULT_SCOPE_JOBMANAGER_JOB_GROUP);
-               String tmFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TM, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_GROUP);
-               String tmJobFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
ScopeFormat.DEFAULT_SCOPE_TASKMANAGER_JOB_GROUP);
-               String taskFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
ScopeFormat.DEFAULT_SCOPE_TASK_GROUP);
-               String operatorFormat = config.getString(
-                               ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
ScopeFormat.DEFAULT_SCOPE_OPERATOR_GROUP);
+               String jmFormat = 
config.getString(MetricOptions.SCOPE_NAMING_JM);
+               String jmJobFormat = 
config.getString(MetricOptions.SCOPE_NAMING_JM_JOB);
+               String tmFormat = 
config.getString(MetricOptions.SCOPE_NAMING_TM);
+               String tmJobFormat = 
config.getString(MetricOptions.SCOPE_NAMING_TM_JOB);
+               String taskFormat = 
config.getString(MetricOptions.SCOPE_NAMING_TASK);
+               String operatorFormat = 
config.getString(MetricOptions.SCOPE_NAMING_OPERATOR);
 
                return new ScopeFormats(jmFormat, jmJobFormat, tmFormat, 
tmJobFormat, taskFormat, operatorFormat);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index fe29ccb..b9502b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -23,6 +23,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
@@ -69,7 +70,7 @@ public class MetricRegistryTest extends TestLogger {
        public void testReporterInstantiation() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
                MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
@@ -97,7 +98,7 @@ public class MetricRegistryTest extends TestLogger {
        public void testMultipleReporterInstantiation() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1, test2,test3");
+               config.setString(MetricOptions.REPORTERS_LIST, "test1, 
test2,test3");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter11.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter12.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter13.class.getName());
@@ -147,7 +148,7 @@ public class MetricRegistryTest extends TestLogger {
        public void testReporterArgumentForwarding() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg2", "world");
@@ -172,7 +173,7 @@ public class MetricRegistryTest extends TestLogger {
        public void testReporterScheduling() throws InterruptedException {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter3.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test.arg1", "hello");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
@@ -215,7 +216,7 @@ public class MetricRegistryTest extends TestLogger {
        @Test
        public void testReporterNotifications() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter6.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter7.class.getName());
 
@@ -278,10 +279,10 @@ public class MetricRegistryTest extends TestLogger {
        public void testScopeConfig() {
                Configuration config = new Configuration();
 
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"B");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"C");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_OPERATOR, 
"D");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+               config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+               config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+               config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
 
                ScopeFormats scopeConfig = 
MetricRegistryConfiguration.createScopeConfig(config);
 
@@ -294,8 +295,8 @@ public class MetricRegistryTest extends TestLogger {
        @Test
        public void testConfigurableDelimiter() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D.E");
+               config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
 
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
@@ -308,7 +309,7 @@ public class MetricRegistryTest extends TestLogger {
        @Test
        public void testConfigurableDelimiterForReporters() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -331,7 +332,7 @@ public class MetricRegistryTest extends TestLogger {
        @Test
        public void testConfigurableDelimiterForReportersInGroup() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3,test4");
+               config.setString(MetricOptions.REPORTERS_LIST, 
"test1,test2,test3,test4");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
@@ -339,7 +340,7 @@ public class MetricRegistryTest extends TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
 
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
                List<MetricReporter> reporters = registry.getReporters();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index da14bfd..04e40ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
@@ -77,8 +78,8 @@ public class AbstractMetricGroupTest {
        @Test
        public void testScopeCachingForMultipleReporters() throws Exception {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
+               config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter2.class.getName());
@@ -166,7 +167,7 @@ public class AbstractMetricGroupTest {
        @Test
        public void testScopeGenerationWithoutReporters() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B.C.D");
+               config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
                MetricRegistry testRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 317d19e..7834755 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -18,8 +18,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -110,7 +110,7 @@ public class JobManagerGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_JM, 
"constant.<host>.foo.<host>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobManagerMetricGroup group = new 
JobManagerMetricGroup(registry, "host");

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index ed4056f..d8bd57a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -54,8 +54,8 @@ public class JobManagerJobGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "abc");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
+               cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"some-constant.<job_name>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
@@ -77,8 +77,8 @@ public class JobManagerJobGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustomWildcard() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM, "peter");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
+               cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, 
"*.some-constant.<job_id>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index f121b59..ace0236 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
@@ -41,7 +42,7 @@ public class MetricGroupRegistrationTest {
        @Test
        public void testMetricInstantiation() {
                Configuration config = new Configuration();
-               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test");
+               config.setString(MetricOptions.REPORTERS_LIST, "test");
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
 
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 965e8fb..11f3c0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
@@ -161,7 +161,7 @@ public class TaskManagerGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"constant.<host>.foo.<host>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM, 
"constant.<host>.foo.<host>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
                TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index f9bef6c..f9891cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -54,8 +54,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"some-constant.<job_name>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"some-constant.<job_name>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
@@ -76,8 +76,8 @@ public class TaskManagerJobGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustomWildcard() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"peter.<tm_id>");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"*.some-constant.<job_id>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.<tm_id>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, 
"*.some-constant.<job_id>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index eb906d0..183237a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -63,9 +63,9 @@ public class TaskMetricGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeCustom() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "abc");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM_JOB, 
"def");
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<tm_id>.<job_id>.<task_id>.<task_attempt_id>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                JobID jid = new JobID();
@@ -90,7 +90,7 @@ public class TaskMetricGroupTest extends TestLogger {
        @Test
        public void testGenerateScopeWilcard() {
                Configuration cfg = new Configuration();
-               cfg.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"*.<task_attempt_id>.<subtask_index>");
+               cfg.setString(MetricOptions.SCOPE_NAMING_TASK, 
"*.<task_attempt_id>.<subtask_index>");
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                AbstractID executionId = new AbstractID();

http://git-wip-us.apache.org/repos/asf/flink/blob/1b77cc3f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7569170..99c3f4e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -28,8 +28,8 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -177,10 +177,10 @@ public abstract class AbstractStreamOperator<OUT>
                        ((OperatorMetricGroup) 
this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
                }
                Configuration taskManagerConfig = 
container.getEnvironment().getTaskManagerInfo().getConfiguration();
-               int historySize = 
taskManagerConfig.getInteger(ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE);
+               int historySize = 
taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
                if (historySize <= 0) {
-                       LOG.warn("{} has been set to a value equal or below 0: 
{}. Using default.", ConfigConstants.METRICS_LATENCY_HISTORY_SIZE, historySize);
-                       historySize = 
ConfigConstants.DEFAULT_METRICS_LATENCY_HISTORY_SIZE;
+                       LOG.warn("{} has been set to a value equal or below 0: 
{}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
+                       historySize = 
MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
                }
 
                latencyGauge = this.metrics.gauge("latency", new 
LatencyGauge(historySize));

Reply via email to