[FLINK-7876] Register TaskManagerMetricGroup under ResourceID

This commit changes the registration of TaskManagerMetricGroups so that they
are registered under the TaskManager's ResourceID instead of the InstanceID.
This allows the TaskManagerMetricGroup to be created at startup of the
TaskManager.

Moreover, it pulls the MetricRegistry out of JobManager and TaskManager. This
allows the same MetricRegistry to be reused across multiple instances (e.g. in
the FlinkMiniCluster case). It also ensures proper cleanup of a potentially
started MetricQueryServiceActor.

Change TaskManagersHandler to work with ResourceID instead of InstanceID

Adapt MetricFetcher to use ResourceID instead of InstanceID

This closes #4872.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d45b9412
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d45b9412
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d45b9412

Branch: refs/heads/master
Commit: d45b941280155a67aed3f3518b2a05eedc1dab2e
Parents: a7e0a27
Author: Till <[email protected]>
Authored: Thu Oct 19 17:22:53 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 1 15:52:00 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/MesosJobClusterEntrypoint.java   |   6 +-
 .../MesosSessionClusterEntrypoint.java          |   6 +-
 .../MesosApplicationMasterRunner.java           |  13 +
 .../clusterframework/MesosResourceManager.java  |   4 +-
 .../clusterframework/MesosJobManager.scala      |   7 +-
 .../clusterframework/MesosTaskManager.scala     |   6 +-
 .../MesosResourceManagerTest.java               |   8 +-
 .../ScheduledDropwizardReporterTest.java        |   4 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |   6 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  14 +-
 .../PrometheusReporterTaskScopeTest.java        |   6 +-
 .../prometheus/PrometheusReporterTest.java      |  16 +-
 .../flink/metrics/slf4j/Slf4jReporterTest.java  |   6 +-
 .../metrics/statsd/StatsDReporterTest.java      |  12 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   2 +
 .../runtime/akka/AkkaJobManagerGateway.java     |  10 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  11 +-
 .../dispatcher/StandaloneDispatcher.java        |   6 +-
 .../runtime/entrypoint/ClusterEntrypoint.java   |  10 +-
 .../entrypoint/JobClusterEntrypoint.java        |   8 +-
 .../entrypoint/SessionClusterEntrypoint.java    |   8 +-
 .../StandaloneSessionClusterEntrypoint.java     |   4 +-
 .../runtime/jobmaster/JobManagerGateway.java    |   6 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   4 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 378 +-------------
 .../metrics/MetricRegistryConfiguration.java    |   2 +-
 .../runtime/metrics/MetricRegistryImpl.java     | 412 +++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     |   2 +-
 .../flink/runtime/minicluster/MiniCluster.java  |  12 +-
 .../minicluster/MiniClusterJobDispatcher.java   |   8 +-
 .../minicluster/StandaloneMiniCluster.java      |  29 +-
 .../resourcemanager/ResourceManager.java        |  12 +-
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../resourcemanager/ResourceManagerRunner.java  |   4 +-
 .../StandaloneResourceManager.java              |   4 +-
 .../handler/legacy/TaskManagerLogHandler.java   |   8 +-
 .../handler/legacy/TaskManagersHandler.java     |  21 +-
 .../handler/legacy/metrics/MetricFetcher.java   |  10 +-
 .../runtime/taskexecutor/TaskExecutor.java      |   6 +-
 .../runtime/taskexecutor/TaskManagerRunner.java |  24 +-
 .../taskexecutor/TaskManagerServices.java       |  18 +-
 .../TaskManagerServicesConfiguration.java       |  13 -
 .../runtime/webmonitor/RestfulGateway.java      |   4 +-
 .../ContaineredJobManager.scala                 |   7 +-
 .../flink/runtime/jobmanager/JobManager.scala   | 113 ++---
 .../runtime/messages/JobManagerMessages.scala   |   7 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   8 +
 .../minicluster/LocalFlinkMiniCluster.scala     |  52 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  48 +-
 .../clusterframework/ResourceManagerTest.java   |   6 +-
 .../runtime/dispatcher/DispatcherTest.java      |  10 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  13 +-
 .../runtime/jobmanager/JobManagerTest.java      |  11 +
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +
 .../jobmaster/JobManagerRunnerMockTest.java     |   4 +-
 .../JobManagerLeaderElectionTest.java           |   5 +-
 .../runtime/metrics/MetricRegistryImplTest.java | 496 +++++++++++++++++++
 .../runtime/metrics/MetricRegistryTest.java     | 496 -------------------
 .../runtime/metrics/NoOpMetricRegistry.java     |  60 +++
 .../runtime/metrics/TaskManagerMetricsTest.java |  16 +-
 .../metrics/dump/MetricQueryServiceTest.java    |   4 +-
 .../metrics/groups/AbstractMetricGroupTest.java |   8 +-
 .../metrics/groups/JobManagerGroupTest.java     |  12 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |  10 +-
 .../groups/MetricGroupRegistrationTest.java     |   9 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  11 +-
 .../metrics/groups/OperatorGroupTest.java       |  12 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  12 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  10 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  14 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 .../resourcemanager/ResourceManagerHATest.java  |   4 +-
 .../ResourceManagerJobMasterTest.java           |   4 +-
 .../ResourceManagerTaskExecutorTest.java        |   4 +-
 .../legacy/TaskManagerLogHandlerTest.java       |   2 +-
 .../legacy/metrics/MetricFetcherTest.java       |  10 +-
 .../taskexecutor/TaskExecutorITCase.java        |   4 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  24 +-
 .../taskexecutor/TaskManagerServicesTest.java   |   3 +-
 ...askManagerComponentsStartupShutdownTest.java |   7 +-
 .../TaskManagerProcessReapingTestBase.java      |  20 +-
 .../taskmanager/TaskManagerStartupTest.java     |   2 +
 .../jobmanager/JobManagerRegistrationTest.scala |  17 +-
 .../runtime/testingUtils/TestingCluster.scala   |   7 +-
 .../testingUtils/TestingJobManager.scala        |   6 +-
 .../testingUtils/TestingTaskManager.scala       |  10 +-
 .../runtime/testingUtils/TestingUtils.scala     |  21 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +
 ...agerHAProcessFailureBatchRecoveryITCase.java |   2 +
 .../recovery/ProcessFailureCancelingITCase.java |   2 +
 .../AbstractOperatorRestoreTestBase.java        |   3 +
 .../flink/yarn/TestingYarnJobManager.scala      |   6 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |   6 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |  13 +
 .../apache/flink/yarn/YarnResourceManager.java  |   4 +-
 .../entrypoint/YarnJobClusterEntrypoint.java    |   4 +-
 .../YarnSessionClusterEntrypoint.java           |   4 +-
 .../org/apache/flink/yarn/YarnJobManager.scala  |   7 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   6 +-
 101 files changed, 1503 insertions(+), 1317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index 9a3639d..b98adff 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -112,7 +112,7 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry 
metricRegistry) throws Exception {
+       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl 
metricRegistry) throws Exception {
                super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
        }
 
@@ -123,7 +123,7 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index aa511b0..0cf0fce 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -102,7 +102,7 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry 
metricRegistry) throws Exception {
+       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl 
metricRegistry) throws Exception {
                super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
        }
 
@@ -113,7 +113,7 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index a6ea133..9887d97 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -39,6 +39,8 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -200,6 +202,7 @@ public class MesosApplicationMasterRunner {
                ExecutorService ioExecutor = null;
                MesosServices mesosServices = null;
                HighAvailabilityServices highAvailabilityServices = null;
+               MetricRegistryImpl metricRegistry = null;
 
                try {
                        // ------- (1) load and parse / validate all 
configurations -------
@@ -304,6 +307,11 @@ public class MesosApplicationMasterRunner {
                        // 2: the JobManager
                        LOG.debug("Starting JobManager actor");
 
+                       metricRegistry = new MetricRegistryImpl(
+                               
MetricRegistryConfiguration.fromConfiguration(config));
+
+                       metricRegistry.startQueryService(actorSystem, null);
+
                        // we start the JobManager with its standard name
                        ActorRef jobManager = JobManager.startJobManagerActors(
                                config,
@@ -311,6 +319,7 @@ public class MesosApplicationMasterRunner {
                                futureExecutor,
                                ioExecutor,
                                highAvailabilityServices,
+                               metricRegistry,
                                webMonitor != null ? 
Option.apply(webMonitor.getRestAddress()) : Option.empty(),
                                Option.apply(JobMaster.JOB_MANAGER_NAME),
                                Option.apply(JobMaster.ARCHIVE_NAME),
@@ -422,6 +431,10 @@ public class MesosApplicationMasterRunner {
                        }
                }
 
+               if (metricRegistry != null) {
+                       metricRegistry.shutdown();
+               }
+
                org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
                        AkkaUtils.getTimeout(config).toMillis(),
                        TimeUnit.MILLISECONDS,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 1e32b2c..7ea4908 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -49,7 +49,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -145,7 +145,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler,
                        // Mesos specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index 9ad8eb2..c6230e7 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => 
FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 
@@ -66,7 +67,7 @@ class MesosJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends ContaineredJobManager(
     flinkConfiguration,
@@ -83,7 +84,7 @@ class MesosJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry,
+    jobManagerMetricGroup,
     optRestAddress) {
 
   val jobPollingInterval: FiniteDuration = 5 seconds

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 7834639..e69472e 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices
 import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.memory.MemoryManager
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 
@@ -39,7 +39,7 @@ class MesosTaskManager(
     network: NetworkEnvironment,
     numberOfSlots: Int,
     highAvailabilityServices: HighAvailabilityServices,
-    metricRegistry : MetricRegistry)
+    taskManagerMetricGroup : TaskManagerMetricGroup)
   extends TaskManager(
     config,
     resourceID,
@@ -49,7 +49,7 @@ class MesosTaskManager(
     network,
     numberOfSlots,
     highAvailabilityServices,
-    metricRegistry) {
+    taskManagerMetricGroup) {
 
   override def handleMessage: Receive = {
     super.handleMessage

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 4bdd9a3..1cdd087 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -160,7 +160,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler,
 
@@ -306,7 +306,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        public final ScheduledExecutor scheduledExecutor;
                        public final TestingHighAvailabilityServices 
highAvailabilityServices;
                        public final HeartbeatServices heartbeatServices;
-                       public final MetricRegistry metricRegistry;
+                       public final MetricRegistryImpl metricRegistry;
                        public final TestingLeaderElectionService 
rmLeaderElectionService;
                        public final JobLeaderIdService jobLeaderIdService;
                        public final SlotManager slotManager;
@@ -321,7 +321,7 @@ public class MesosResourceManagerTest extends TestLogger {
                                rmLeaderElectionService = new 
TestingLeaderElectionService();
                                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                                heartbeatServices = new 
TestingHeartbeatServices(5L, 5L, scheduledExecutor);
-                               metricRegistry = mock(MetricRegistry.class);
+                               metricRegistry = mock(MetricRegistryImpl.class);
                                slotManager = mock(SlotManager.class);
                                slotManagerStarted = new CompletableFuture<>();
                                jobLeaderIdService = new JobLeaderIdService(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 5ed6de2..e6d5e27 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -92,7 +92,7 @@ public class ScheduledDropwizardReporterTest {
 
                MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
 
-               MetricRegistry metricRegistry = new 
MetricRegistry(metricRegistryConfiguration);
+               MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(metricRegistryConfiguration);
 
                char delimiter = metricRegistry.getDelimiter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index 63765ae..a927a30 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
@@ -105,12 +105,12 @@ public class DropwizardFlinkHistogramWrapperTest extends 
TestLogger {
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestingReporter.class.getName());
                config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
reportingInterval + " MILLISECONDS");
 
-               MetricRegistry registry = null;
+               MetricRegistryImpl registry = null;
 
                MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(config);
 
                try {
-                       registry = new 
MetricRegistry(metricRegistryConfiguration);
+                       registry = new 
MetricRegistryImpl(metricRegistryConfiguration);
                        DropwizardHistogramWrapper histogramWrapper = new 
DropwizardHistogramWrapper(
                                new com.codahale.metrics.Histogram(new 
SlidingWindowReservoir(size)));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index f10769a..4c97055 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -108,7 +108,7 @@ public class JMXReporterTest extends TestLogger {
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2.port", "9020-9035");
 
-               MetricRegistry reg = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl reg = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
 
@@ -168,7 +168,7 @@ public class JMXReporterTest extends TestLogger {
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
JMXReporter.class.getName());
                cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2.port", "9040-9055");
 
-               MetricRegistry reg = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+               MetricRegistryImpl reg = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
 
                TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, 
"host", "tm");
 
@@ -231,7 +231,7 @@ public class JMXReporterTest extends TestLogger {
         */
        @Test
        public void testHistogramReporting() throws Exception {
-               MetricRegistry registry = null;
+               MetricRegistryImpl registry = null;
                String histogramName = "histogram";
 
                try {
@@ -239,7 +239,7 @@ public class JMXReporterTest extends TestLogger {
                        config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-                       registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+                       registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                        TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -281,7 +281,7 @@ public class JMXReporterTest extends TestLogger {
         */
        @Test
        public void testMeterReporting() throws Exception {
-               MetricRegistry registry = null;
+               MetricRegistryImpl registry = null;
                String meterName = "meter";
 
                try {
@@ -289,7 +289,7 @@ public class JMXReporterTest extends TestLogger {
                        config.setString(MetricOptions.REPORTERS_LIST, 
"jmx_test");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
 
-                       registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+                       registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                        TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index 0ae8fc7..55ddc00 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -26,8 +26,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -75,12 +75,12 @@ public class PrometheusReporterTaskScopeTest {
        private TaskMetricGroup taskMetricGroup1;
        private TaskMetricGroup taskMetricGroup2;
 
-       private MetricRegistry registry;
+       private MetricRegistryImpl registry;
        private PrometheusReporter reporter;
 
        @Before
        public void setupReporter() {
-               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
                reporter = (PrometheusReporter) registry.getReporters().get(0);
 
                TaskManagerMetricGroup tmMetricGroup = new 
TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 0d7be6d..6704189 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -28,8 +28,8 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -69,13 +69,13 @@ public class PrometheusReporterTest extends TestLogger {
        @Rule
        public ExpectedException thrown = ExpectedException.none();
 
-       private MetricRegistry registry;
+       private MetricRegistryImpl registry;
        private FrontMetricGroup<TaskManagerMetricGroup> metricGroup;
        private PrometheusReporter reporter;
 
        @Before
        public void setupReporter() {
-               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
                metricGroup = new FrontMetricGroup<>(0, new 
TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
                reporter = (PrometheusReporter) registry.getReporters().get(0);
        }
@@ -158,7 +158,7 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void endpointIsUnavailableAfterReporterIsClosed() throws 
UnirestException {
-               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               MetricRegistryImpl registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
                PrometheusReporter reporter = (PrometheusReporter) 
registry.getReporters().get(0);
                reporter.close();
                thrown.expect(UnirestException.class);
@@ -244,12 +244,12 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void cannotStartTwoReportersOnSamePort() {
-               final MetricRegistry fixedPort1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
+               final MetricRegistryImpl fixedPort1 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9400-9500")));
                assertThat(fixedPort1.getReporters(), hasSize(1));
 
                PrometheusReporter firstReporter = (PrometheusReporter) 
fixedPort1.getReporters().get(0);
 
-               final MetricRegistry fixedPort2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 String.valueOf(firstReporter.getPort()))));
+               final MetricRegistryImpl fixedPort2 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 String.valueOf(firstReporter.getPort()))));
                assertThat(fixedPort2.getReporters(), hasSize(0));
 
                fixedPort1.shutdown();
@@ -258,8 +258,8 @@ public class PrometheusReporterTest extends TestLogger {
 
        @Test
        public void canStartTwoReportersWhenUsingPortRange() {
-               final MetricRegistry portRange1 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9200-9300")));
-               final MetricRegistry portRange2 = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9200-9300")));
+               final MetricRegistryImpl portRange1 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1",
 "9200-9300")));
+               final MetricRegistryImpl portRange2 = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2",
 "9200-9300")));
 
                assertThat(portRange1.getReporters(), hasSize(1));
                assertThat(portRange2.getReporters(), hasSize(1));

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index 51724bd..ba7c5a1 100644
--- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -29,8 +29,8 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingHistogram;
@@ -53,7 +53,7 @@ public class Slf4jReporterTest extends TestLogger {
        private static final String TASK_MANAGER_ID = "tm01";
        private static final String JOB_NAME = "jn01";
        private static final String TASK_NAME = "tn01";
-       private static MetricRegistry registry;
+       private static MetricRegistryImpl registry;
        private static char delimiter;
        private static TaskMetricGroup taskMetricGroup;
        private static Slf4jReporter reporter;
@@ -68,7 +68,7 @@ public class Slf4jReporterTest extends TestLogger {
                        ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
Slf4jReporter.class.getName());
                configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
 
-               registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+               registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
                delimiter = registry.getDelimiter();
 
                taskMetricGroup = new TaskManagerMetricGroup(registry, 
HOST_NAME, TASK_MANAGER_ID)

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 94de9a9..f460abd 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -30,7 +30,7 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -89,7 +89,7 @@ public class StatsDReporterTest extends TestLogger {
                configuration.setString(MetricOptions.SCOPE_NAMING_TASK, 
"<host>.<tm_id>.<job_name>");
                configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");
 
-               MetricRegistry metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+               MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
                char delimiter = metricRegistry.getDelimiter();
 
@@ -133,7 +133,7 @@ public class StatsDReporterTest extends TestLogger {
         */
        @Test
        public void testStatsDHistogramReporting() throws Exception {
-               MetricRegistry registry = null;
+               MetricRegistryImpl registry = null;
                DatagramSocketReceiver receiver = null;
                Thread receiverThread = null;
                long timeout = 5000;
@@ -157,7 +157,7 @@ public class StatsDReporterTest extends TestLogger {
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + 
port);
 
-                       registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+                       registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
 
                        TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
 
@@ -207,7 +207,7 @@ public class StatsDReporterTest extends TestLogger {
         */
        @Test
        public void testStatsDMetersReporting() throws Exception {
-               MetricRegistry registry = null;
+               MetricRegistryImpl registry = null;
                DatagramSocketReceiver receiver = null;
                Thread receiverThread = null;
                long timeout = 5000;
@@ -231,7 +231,7 @@ public class StatsDReporterTest extends TestLogger {
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
                        
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + 
port);
 
-                       registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+                       registry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
                        TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
                        TestMeter meter = new TestMeter();
                        metricGroup.meter(meterName, meter);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index f9dd98e..75a844c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.rest.handler.util.MimeTypes;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -193,6 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
                                        TestingUtils.defaultExecutor(),
                                        TestingUtils.defaultExecutor(),
                                        highAvailabilityServices,
+                                       new NoOpMetricRegistry(),
                                        
Option.apply(webMonitor[i].getRestAddress()),
                                        JobManager.class,
                                        MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 6896852..08946ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -21,11 +21,11 @@ package org.apache.flink.runtime.akka;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -177,10 +177,10 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        
//--------------------------------------------------------------------------------
 
        @Override
-       public CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(InstanceID instanceId, Time timeout) {
+       public CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
                return FutureUtils.toJava(
                        jobManagerGateway
-                               .ask(new 
JobManagerMessages.RequestTaskManagerInstance(instanceId), 
FutureUtils.toFiniteDuration(timeout))
+                               .ask(new 
JobManagerMessages.RequestTaskManagerInstance(resourceId), 
FutureUtils.toFiniteDuration(timeout))
                                
.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
                        .thenApply(
                                (JobManagerMessages.TaskManagerInstance 
taskManagerResponse) -> {
@@ -265,7 +265,7 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
        }
 
        @Override
-       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+       public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
                return requestTaskManagerInstances(timeout)
                        .thenApply(
                                (Collection<Instance> instances) ->
@@ -277,7 +277,7 @@ public class AkkaJobManagerGateway implements 
JobManagerGateway {
                                                                final String 
taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) +
                                                                        
MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + 
instance.getTaskManagerID().getResourceIdString();
 
-                                                               return 
Tuple2.of(instance.getId(), taskManagerMetricQuerServicePath);
+                                                               return 
Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
                                                        })
                                                .collect(Collectors.toList()));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index dda0275..c2f8539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
@@ -46,7 +45,7 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -89,7 +88,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        private final ResourceManagerGateway resourceManagerGateway;
        private final JobManagerServices jobManagerServices;
        private final HeartbeatServices heartbeatServices;
-       private final MetricRegistry metricRegistry;
+       private final MetricRegistryImpl metricRegistry;
 
        private final FatalErrorHandler fatalErrorHandler;
 
@@ -107,7 +106,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler,
                        Optional<String> restAddress) throws Exception {
                super(rpcService, endpointId);
@@ -384,7 +383,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
-       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+       public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
                return 
resourceManagerGateway.requestTaskManagerMetricQueryServicePaths(timeout);
        }
 
@@ -480,7 +479,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerServices jobManagerServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                OnCompletionActions onCompleteActions,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index 5a6889e..ee92663 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -49,7 +49,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler,
                        Optional<String> restAddress) throws Exception {
                super(
@@ -74,7 +74,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        JobManagerServices jobManagerServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        OnCompletionActions onCompleteActions,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                // create the standard job manager runner

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index b2ddf1d..1a0e2ae 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -30,8 +30,8 @@ import 
org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -76,7 +76,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
        private final CompletableFuture<Boolean> terminationFuture;
 
        @GuardedBy("lock")
-       private MetricRegistry metricRegistry = null;
+       private MetricRegistryImpl metricRegistry = null;
 
        @GuardedBy("lock")
        private HighAvailabilityServices haServices = null;
@@ -204,8 +204,8 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                return HeartbeatServices.fromConfiguration(configuration);
        }
 
-       protected MetricRegistry createMetricRegistry(Configuration 
configuration) {
-               return new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+       protected MetricRegistryImpl createMetricRegistry(Configuration 
configuration) {
+               return new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
        }
 
        protected void shutDown(boolean cleanupHaData) throws FlinkException {
@@ -278,7 +278,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                HighAvailabilityServices highAvailabilityServices,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry) throws Exception;
+               MetricRegistryImpl metricRegistry) throws Exception;
 
        protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 124c6c6..50d29da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -59,7 +59,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry) throws Exception {
+                       MetricRegistryImpl metricRegistry) throws Exception {
 
                resourceManager = createResourceManager(
                        configuration,
@@ -96,7 +96,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        JobManagerServices jobManagerServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
 
                JobGraph jobGraph = retrieveJobGraph(configuration);
@@ -163,7 +163,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 
        protected abstract JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index e24e01a..8a48864 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -69,7 +69,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry) throws Exception {
+                       MetricRegistryImpl metricRegistry) throws Exception {
 
                dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
 
@@ -173,7 +173,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                ResourceManagerGateway resourceManagerGateway,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                FatalErrorHandler fatalErrorHandler,
                Optional<String> restAddress) throws Exception {
 
@@ -197,6 +197,6 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index e7c9816..7d4373d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -51,7 +51,7 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 782d6d0..2527e46 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -107,11 +107,11 @@ public interface JobManagerGateway extends RestfulGateway 
{
         * Requests the TaskManager instance registered under the given 
resourceId from the JobManager.
         * If there is no Instance registered, then {@link Optional#empty()} is 
returned.
         *
-        * @param instanceId for which to retrieve the Instance
+        * @param resourceId identifying the TaskManager which shall be 
retrieved
         * @param timeout for the asynchronous operation
         * @return Future containing the TaskManager instance registered under 
resourceId, otherwise {@link Optional#empty()}
         */
-       CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(InstanceID instanceId, Time timeout);
+       CompletableFuture<Optional<Instance>> 
requestTaskManagerInstance(ResourceID resourceId, Time timeout);
 
        /**
         * Requests all currently registered TaskManager instances from the 
JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 14baa6f..0a85bbe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -111,7 +111,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final HighAvailabilityServices haServices,
                        final HeartbeatServices heartbeatServices,
                        final JobManagerServices jobManagerServices,
-                       final MetricRegistry metricRegistry,
+                       final MetricRegistryImpl metricRegistry,
                        final OnCompletionActions toNotifyOnComplete,
                        final FatalErrorHandler errorHandler) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 278292d..9aa97cb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,283 +18,34 @@
 
 package org.apache.flink.runtime.metrics;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.View;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
 
 /**
- * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It 
serves as the
- * connection between {@link MetricGroup MetricGroups} and {@link 
MetricReporter MetricReporters}.
+ * Interface for a metric registry.
  */
-public class MetricRegistry {
-       static final Logger LOG = LoggerFactory.getLogger(MetricRegistry.class);
-
-       private final Object lock = new Object();
-
-       private List<MetricReporter> reporters;
-       private ScheduledExecutorService executor;
-
-       @Nullable
-       private ActorRef queryService;
-
-       @Nullable
-       private String metricQueryServicePath;
-
-       private ViewUpdater viewUpdater;
-
-       private final ScopeFormats scopeFormats;
-       private final char globalDelimiter;
-       private final List<Character> delimiters = new ArrayList<>();
-
-       /**
-        * Creates a new MetricRegistry and starts the configured reporter.
-        */
-       public MetricRegistry(MetricRegistryConfiguration config) {
-               this.scopeFormats = config.getScopeFormats();
-               this.globalDelimiter = config.getDelimiter();
-
-               // second, instantiate any custom configured reporters
-               this.reporters = new ArrayList<>();
-
-               List<Tuple2<String, Configuration>> reporterConfigurations = 
config.getReporterConfigurations();
-
-               this.executor = Executors.newSingleThreadScheduledExecutor(new 
ExecutorThreadFactory("Flink-MetricRegistry"));
-
-               this.queryService = null;
-               this.metricQueryServicePath = null;
-
-               if (reporterConfigurations.isEmpty()) {
-                       // no reporters defined
-                       // by default, don't report anything
-                       LOG.info("No metrics reporter configured, no metrics 
will be exposed/reported.");
-               } else {
-                       // we have some reporters so
-                       for (Tuple2<String, Configuration> 
reporterConfiguration: reporterConfigurations) {
-                               String namedReporter = reporterConfiguration.f0;
-                               Configuration reporterConfig = 
reporterConfiguration.f1;
-
-                               final String className = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-                               if (className == null) {
-                                       LOG.error("No reporter class set for 
reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-                                       continue;
-                               }
-
-                               try {
-                                       String configuredPeriod = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
null);
-                                       TimeUnit timeunit = TimeUnit.SECONDS;
-                                       long period = 10;
-
-                                       if (configuredPeriod != null) {
-                                               try {
-                                                       String[] interval = 
configuredPeriod.split(" ");
-                                                       period = 
Long.parseLong(interval[0]);
-                                                       timeunit = 
TimeUnit.valueOf(interval[1]);
-                                               }
-                                               catch (Exception e) {
-                                                       LOG.error("Cannot parse 
report interval from config: " + configuredPeriod +
-                                                                       " - 
please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
-                                                                       "Using 
default reporting interval.");
-                                               }
-                                       }
-
-                                       Class<?> reporterClass = 
Class.forName(className);
-                                       MetricReporter reporterInstance = 
(MetricReporter) reporterClass.newInstance();
-
-                                       MetricConfig metricConfig = new 
MetricConfig();
-                                       
reporterConfig.addAllToProperties(metricConfig);
-                                       LOG.info("Configuring {} with {}.", 
reporterClass.getSimpleName(), metricConfig);
-                                       reporterInstance.open(metricConfig);
-
-                                       if (reporterInstance instanceof 
Scheduled) {
-                                               LOG.info("Periodically 
reporting metrics in intervals of {} {} for reporter {} of type {}.", period, 
timeunit.name(), namedReporter, className);
-
-                                               executor.scheduleWithFixedDelay(
-                                                               new 
MetricRegistry.ReporterTask((Scheduled) reporterInstance), period, period, 
timeunit);
-                                       } else {
-                                               LOG.info("Reporting metrics for 
reporter {} of type {}.", namedReporter, className);
-                                       }
-                                       reporters.add(reporterInstance);
-
-                                       String delimiterForReporter = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, 
String.valueOf(globalDelimiter));
-                                       if (delimiterForReporter.length() != 1) 
{
-                                               LOG.warn("Failed to parse 
delimiter '{}' for reporter '{}', using global delimiter '{}'.", 
delimiterForReporter, namedReporter, globalDelimiter);
-                                               delimiterForReporter = 
String.valueOf(globalDelimiter);
-                                       }
-                                       
this.delimiters.add(delimiterForReporter.charAt(0));
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Could not instantiate 
metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Initializes the MetricQueryService.
-        *
-        * @param actorSystem ActorSystem to create the MetricQueryService on
-        * @param resourceID resource ID used to disambiguate the actor name
-     */
-       public void startQueryService(ActorSystem actorSystem, ResourceID 
resourceID) {
-               synchronized (lock) {
-                       Preconditions.checkState(!isShutdown(), "The metric 
registry has already been shut down.");
-
-                       try {
-                               queryService = 
MetricQueryService.startMetricQueryService(actorSystem, resourceID);
-                               metricQueryServicePath = 
AkkaUtils.getAkkaURL(actorSystem, queryService);
-                       } catch (Exception e) {
-                               LOG.warn("Could not start MetricDumpActor. No 
metrics will be submitted to the WebInterface.", e);
-                       }
-               }
-       }
-
-       /**
-        * Returns the address under which the {@link MetricQueryService} is 
reachable.
-        *
-        * @return address of the metric query service
-        */
-       @Nullable
-       public String getMetricQueryServicePath() {
-               return metricQueryServicePath;
-       }
+public interface MetricRegistry {
 
        /**
         * Returns the global delimiter.
         *
         * @return global delimiter
         */
-       public char getDelimiter() {
-               return this.globalDelimiter;
-       }
+       char getDelimiter();
 
        /**
         * Returns the configured delimiter for the reporter with the given 
index.
         *
-        * @param reporterIndex index of the reporter whose delimiter should be 
used
+        * @param index index of the reporter whose delimiter should be used
         * @return configured reporter delimiter, or global delimiter if index 
is invalid
         */
-       public char getDelimiter(int reporterIndex) {
-               try {
-                       return delimiters.get(reporterIndex);
-               } catch (IndexOutOfBoundsException e) {
-                       LOG.warn("Delimiter for reporter index {} not found, 
returning global delimiter.", reporterIndex);
-                       return this.globalDelimiter;
-               }
-       }
-
-       public List<MetricReporter> getReporters() {
-               return reporters;
-       }
-
-       /**
-        * Returns whether this registry has been shutdown.
-        *
-        * @return true, if this registry was shutdown, otherwise false
-        */
-       public boolean isShutdown() {
-               synchronized (lock) {
-                       return reporters == null && executor.isShutdown();
-               }
-       }
+       char getDelimiter(int index);
 
        /**
-        * Shuts down this registry and the associated {@link MetricReporter}.
+        * Returns the number of registered reporters.
         */
-       public void shutdown() {
-               synchronized (lock) {
-                       Future<Boolean> stopFuture = null;
-                       FiniteDuration stopTimeout = null;
-
-                       if (queryService != null) {
-                               stopTimeout = new FiniteDuration(1L, 
TimeUnit.SECONDS);
-                               stopFuture = 
Patterns.gracefulStop(queryService, stopTimeout);
-                       }
-
-                       if (reporters != null) {
-                               for (MetricReporter reporter : reporters) {
-                                       try {
-                                               reporter.close();
-                                       } catch (Throwable t) {
-                                               LOG.warn("Metrics reporter did 
not shut down cleanly", t);
-                                       }
-                               }
-                               reporters = null;
-                       }
-                       shutdownExecutor();
-
-                       if (stopFuture != null) {
-                               boolean stopped = false;
-
-                               try {
-                                       stopped = Await.result(stopFuture, 
stopTimeout);
-                               } catch (Exception e) {
-                                       LOG.warn("Query actor did not properly 
stop.", e);
-                               }
-
-                               if (!stopped) {
-                                       // the query actor did not stop in 
time, let's kill him
-                                       queryService.tell(Kill.getInstance(), 
ActorRef.noSender());
-                               }
-                       }
-               }
-       }
-
-       private void shutdownExecutor() {
-               if (executor != null) {
-                       executor.shutdown();
-
-                       try {
-                               if (!executor.awaitTermination(1L, 
TimeUnit.SECONDS)) {
-                                       executor.shutdownNow();
-                               }
-                       } catch (InterruptedException e) {
-                               executor.shutdownNow();
-                       }
-               }
-       }
-
-       public ScopeFormats getScopeFormats() {
-               return scopeFormats;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Metrics (de)registration
-       // 
------------------------------------------------------------------------
+       int getNumberReporters();
 
        /**
         * Registers a new {@link Metric} with this registry.
@@ -303,44 +54,7 @@ public class MetricRegistry {
         * @param metricName  the name of the metric
         * @param group       the group that contains the metric
         */
-       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               synchronized (lock) {
-                       if (isShutdown()) {
-                               LOG.warn("Cannot register metric, because the 
MetricRegistry has already been shut down.");
-                       } else {
-                               if (reporters != null) {
-                                       for (int i = 0; i < reporters.size(); 
i++) {
-                                               MetricReporter reporter = 
reporters.get(i);
-                                               try {
-                                                       if (reporter != null) {
-                                                               
FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-                                                               
reporter.notifyOfAddedMetric(metric, metricName, front);
-                                                       }
-                                               } catch (Exception e) {
-                                                       LOG.warn("Error while 
registering metric.", e);
-                                               }
-                                       }
-                               }
-                               try {
-                                       if (queryService != null) {
-                                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while registering 
metric.", e);
-                               }
-                               try {
-                                       if (metric instanceof View) {
-                                               if (viewUpdater == null) {
-                                                       viewUpdater = new 
ViewUpdater(executor);
-                                               }
-                                               
viewUpdater.notifyOfAddedView((View) metric);
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while registering 
metric.", e);
-                               }
-                       }
-               }
-       }
+       void register(Metric metric, String metricName, AbstractMetricGroup 
group);
 
        /**
         * Un-registers the given {@link Metric} with this registry.
@@ -349,79 +63,7 @@ public class MetricRegistry {
         * @param metricName  the name of the metric
         * @param group       the group that contains the metric
         */
-       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
-               synchronized (lock) {
-                       if (isShutdown()) {
-                               LOG.warn("Cannot unregister metric, because the 
MetricRegistry has already been shut down.");
-                       } else {
-                               if (reporters != null) {
-                                       for (int i = 0; i < reporters.size(); 
i++) {
-                                               try {
-                                               MetricReporter reporter = 
reporters.get(i);
-                                                       if (reporter != null) {
-                                                               
FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
-                                                               
reporter.notifyOfRemovedMetric(metric, metricName, front);
-                                                       }
-                                               } catch (Exception e) {
-                                                       LOG.warn("Error while 
registering metric.", e);
-                                               }
-                                       }
-                               }
-                               try {
-                                       if (queryService != null) {
-                                               
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while registering 
metric.", e);
-                               }
-                               try {
-                                       if (metric instanceof View) {
-                                               if (viewUpdater != null) {
-                                                       
viewUpdater.notifyOfRemovedView((View) metric);
-                                               }
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Error while registering 
metric.", e);
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       @VisibleForTesting
-       @Nullable
-       public ActorRef getQueryService() {
-               return queryService;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * This task is explicitly a static class, so that it does not hold any 
references to the enclosing
-        * MetricsRegistry instance.
-        *
-        * <p>This is a subtle difference, but very important: With this static 
class, the enclosing class instance
-        * may become garbage-collectible, whereas with an anonymous inner 
class, the timer thread
-        * (which is a GC root) will hold a reference via the timer task and 
its enclosing instance pointer.
-        * Making the MetricsRegistry garbage collectible makes the 
java.util.Timer garbage collectible,
-        * which acts as a fail-safe to stop the timer thread and prevents 
resource leaks.
-        */
-       private static final class ReporterTask extends TimerTask {
-
-               private final Scheduled reporter;
-
-               private ReporterTask(Scheduled reporter) {
-                       this.reporter = reporter;
-               }
+       void unregister(Metric metric, String metricName, AbstractMetricGroup 
group);
 
-               @Override
-               public void run() {
-                       try {
-                               reporter.report();
-                       } catch (Throwable t) {
-                               LOG.warn("Error while reporting metrics", t);
-                       }
-               }
-       }
+       ScopeFormats getScopeFormats();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
index e72a980..d07cb65 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
@@ -35,7 +35,7 @@ import java.util.List;
 import java.util.regex.Pattern;
 
 /**
- * Configuration object for {@link MetricRegistry}.
+ * Configuration object for {@link MetricRegistryImpl}.
  */
 public class MetricRegistryConfiguration {
 

Reply via email to