[FLINK-7876] Properly start and shutdown MetricRegistry by ClusterEntrypoint

Suppress MetricRegistry#shutdown exceptions if the actor system of the
metric query service actor has already been shut down.

Address PR comments

Pull out TaskManagerMetricGroup instantiation from TaskManagerServices


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad42ee27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad42ee27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad42ee27

Branch: refs/heads/master
Commit: ad42ee27decb7c563e58ef090373ae8648ca8e81
Parents: d45b941
Author: Till <[email protected]>
Authored: Fri Oct 20 12:13:10 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed Nov 1 15:52:01 2017 +0100

----------------------------------------------------------------------
 .../entrypoint/MesosJobClusterEntrypoint.java   | 12 ++++--
 .../MesosSessionClusterEntrypoint.java          | 12 ++++--
 .../MesosApplicationMasterRunner.java           |  6 ++-
 .../clusterframework/MesosResourceManager.java  |  4 +-
 .../clusterframework/MesosJobManager.scala      |  1 -
 .../MesosResourceManagerTest.java               |  5 ++-
 .../ScheduledDropwizardReporterTest.java        |  2 +-
 .../DropwizardFlinkHistogramWrapperTest.java    |  2 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  2 +-
 .../metrics/statsd/StatsDReporterTest.java      |  2 +-
 .../flink/runtime/dispatcher/Dispatcher.java    | 22 ++---------
 .../dispatcher/StandaloneDispatcher.java        |  6 +--
 .../runtime/entrypoint/ClusterEntrypoint.java   |  8 +++-
 .../entrypoint/JobClusterEntrypoint.java        |  8 ++--
 .../entrypoint/SessionClusterEntrypoint.java    |  8 ++--
 .../StandaloneSessionClusterEntrypoint.java     |  4 +-
 .../runtime/jobmaster/JobManagerRunner.java     |  4 +-
 .../flink/runtime/metrics/MetricRegistry.java   | 16 ++++++++
 .../runtime/metrics/MetricRegistryImpl.java     | 12 +++++-
 .../flink/runtime/metrics/util/MetricUtils.java | 19 +++++++++
 .../flink/runtime/minicluster/MiniCluster.java  | 23 ++++++-----
 .../minicluster/MiniClusterJobDispatcher.java   |  8 ++--
 .../minicluster/StandaloneMiniCluster.java      |  8 ++++
 .../resourcemanager/ResourceManager.java        |  6 +--
 .../resourcemanager/ResourceManagerGateway.java |  2 +-
 .../resourcemanager/ResourceManagerRunner.java  |  4 +-
 .../StandaloneResourceManager.java              |  4 +-
 .../handler/legacy/TaskManagerLogHandler.java   | 37 +++++++++++-------
 .../runtime/taskexecutor/TaskExecutor.java      |  6 ---
 .../runtime/taskexecutor/TaskManagerRunner.java | 41 ++++++++++++--------
 .../taskexecutor/TaskManagerServices.java       | 26 +------------
 .../runtime/minicluster/FlinkMiniCluster.scala  | 11 ++++--
 .../minicluster/LocalFlinkMiniCluster.scala     | 29 ++++++--------
 .../flink/runtime/taskmanager/TaskManager.scala | 18 +++++++--
 .../runtime/dispatcher/DispatcherTest.java      |  5 ++-
 .../jobmanager/JobManagerHARecoveryTest.java    |  8 ++--
 .../JobManagerLeaderElectionTest.java           | 29 +++++++-------
 .../runtime/metrics/NoOpMetricRegistry.java     |  8 ++++
 .../runtime/metrics/TaskManagerMetricsTest.java | 12 ++++--
 .../testutils/UnregisteredTaskMetricsGroup.java | 10 ++---
 .../legacy/metrics/MetricFetcherTest.java       |  4 +-
 .../taskexecutor/TaskExecutorITCase.java        |  1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  | 25 ++++++------
 ...askManagerComponentsStartupShutdownTest.java | 25 ++++++------
 .../runtime/testingUtils/TestingCluster.scala   |  1 -
 .../test/checkpointing/SavepointITCase.java     |  1 +
 .../flink/yarn/YarnApplicationMasterRunner.java |  6 ++-
 .../apache/flink/yarn/YarnResourceManager.java  |  4 +-
 .../entrypoint/YarnJobClusterEntrypoint.java    |  4 +-
 .../YarnSessionClusterEntrypoint.java           |  4 +-
 50 files changed, 300 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
index b98adff..2fe99de 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -112,7 +112,13 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
        }
 
        @Override
-       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl 
metricRegistry) throws Exception {
+       protected void startClusterComponents(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry) throws Exception {
                super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
        }
 
@@ -123,7 +129,7 @@ public class MesosJobClusterEntrypoint extends 
JobClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
index 0cf0fce..b8d9f65 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -102,7 +102,13 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
        }
 
        @Override
-       protected void startClusterComponents(Configuration configuration, 
RpcService rpcService, HighAvailabilityServices highAvailabilityServices, 
BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl 
metricRegistry) throws Exception {
+       protected void startClusterComponents(
+                       Configuration configuration,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       BlobServer blobServer,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry) throws Exception {
                super.startClusterComponents(configuration, rpcService, 
highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
        }
 
@@ -113,7 +119,7 @@ public class MesosSessionClusterEntrypoint extends 
SessionClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration rmConfiguration = 
ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
rmServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 9887d97..93eb3c6 100755
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -432,7 +432,11 @@ public class MesosApplicationMasterRunner {
                }
 
                if (metricRegistry != null) {
-                       metricRegistry.shutdown();
+                       try {
+                               metricRegistry.shutdown();
+                       } catch (Throwable t) {
+                               LOG.error("Could not shut down metric 
registry.", t);
+                       }
                }
 
                org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 7ea4908..1e32b2c 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -49,7 +49,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -145,7 +145,7 @@ public class MesosResourceManager extends 
ResourceManager<RegisteredMesosWorkerN
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler,
                        // Mesos specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
index c6230e7..972af35 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosJobManager.scala
@@ -32,7 +32,6 @@ import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.metrics.{MetricRegistryImpl => 
FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 1cdd087..a45abe0 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
@@ -160,7 +161,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler,
 
@@ -306,7 +307,7 @@ public class MesosResourceManagerTest extends TestLogger {
                        public final ScheduledExecutor scheduledExecutor;
                        public final TestingHighAvailabilityServices 
highAvailabilityServices;
                        public final HeartbeatServices heartbeatServices;
-                       public final MetricRegistryImpl metricRegistry;
+                       public final MetricRegistry metricRegistry;
                        public final TestingLeaderElectionService 
rmLeaderElectionService;
                        public final JobLeaderIdService jobLeaderIdService;
                        public final SlotManager slotManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index e6d5e27..3fa0474 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -34,8 +34,8 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index a927a30..8f70abb 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.util.TestLogger;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 4c97055..98c2d1b 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index f460abd..275f2e1 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index c2f8539..cf3405b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -45,20 +45,17 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceOverview;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
-import akka.actor.ActorSystem;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -88,7 +85,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        private final ResourceManagerGateway resourceManagerGateway;
        private final JobManagerServices jobManagerServices;
        private final HeartbeatServices heartbeatServices;
-       private final MetricRegistryImpl metricRegistry;
+       private final MetricRegistry metricRegistry;
 
        private final FatalErrorHandler fatalErrorHandler;
 
@@ -106,7 +103,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler,
                        Optional<String> restAddress) throws Exception {
                super(rpcService, endpointId);
@@ -162,12 +159,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                }
 
                try {
-                       metricRegistry.shutdown();
-               } catch (Exception e) {
-                       exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
-               }
-
-               try {
                        super.postStop();
                } catch (Exception e) {
                        exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
@@ -182,11 +173,6 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
        public void start() throws Exception {
                super.start();
 
-               // start the MetricQueryService
-               // TODO: This is a temporary hack until we have ported the 
MetricQueryService to the new RpcEndpoint
-               final ActorSystem actorSystem = ((AkkaRpcService) 
getRpcService()).getActorSystem();
-               metricRegistry.startQueryService(actorSystem, null);
-
                leaderElectionService.start(this);
        }
 
@@ -479,7 +465,7 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
                JobManagerServices jobManagerServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                OnCompletionActions onCompleteActions,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
index ee92663..5a6889e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/StandaloneDispatcher.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -49,7 +49,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        ResourceManagerGateway resourceManagerGateway,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler,
                        Optional<String> restAddress) throws Exception {
                super(
@@ -74,7 +74,7 @@ public class StandaloneDispatcher extends Dispatcher {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        JobManagerServices jobManagerServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        OnCompletionActions onCompleteActions,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                // create the standard job manager runner

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index 1a0e2ae..156efdc 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -180,6 +181,11 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                blobServer.start();
                heartbeatServices = createHeartbeatServices(configuration);
                metricRegistry = createMetricRegistry(configuration);
+
+               // TODO: This is a temporary hack until we have ported the 
MetricQueryService to the new RpcEndpoint
+               // start the MetricQueryService
+               final ActorSystem actorSystem = ((AkkaRpcService) 
commonRpcService).getActorSystem();
+               metricRegistry.startQueryService(actorSystem, null);
        }
 
        protected RpcService createRpcService(
@@ -278,7 +284,7 @@ public abstract class ClusterEntrypoint implements 
FatalErrorHandler {
                HighAvailabilityServices highAvailabilityServices,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry) throws Exception;
+               MetricRegistry metricRegistry) throws Exception;
 
        protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 50d29da..124c6c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -59,7 +59,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry) throws Exception {
+                       MetricRegistry metricRegistry) throws Exception {
 
                resourceManager = createResourceManager(
                        configuration,
@@ -96,7 +96,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        JobManagerServices jobManagerServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
 
                JobGraph jobGraph = retrieveJobGraph(configuration);
@@ -163,7 +163,7 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 
        protected abstract JobGraph retrieveJobGraph(Configuration 
configuration) throws FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 8a48864..e24e01a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
@@ -69,7 +69,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        HighAvailabilityServices highAvailabilityServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry) throws Exception {
+                       MetricRegistry metricRegistry) throws Exception {
 
                dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
 
@@ -173,7 +173,7 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                ResourceManagerGateway resourceManagerGateway,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                FatalErrorHandler fatalErrorHandler,
                Optional<String> restAddress) throws Exception {
 
@@ -197,6 +197,6 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
+               MetricRegistry metricRegistry,
                FatalErrorHandler fatalErrorHandler) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
index 7d4373d..e7c9816 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/StandaloneSessionClusterEntrypoint.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
@@ -51,7 +51,7 @@ public class StandaloneSessionClusterEntrypoint extends 
SessionClusterEntrypoint
                        RpcService rpcService,
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        FatalErrorHandler fatalErrorHandler) throws Exception {
                final ResourceManagerConfiguration resourceManagerConfiguration 
= ResourceManagerConfiguration.fromConfiguration(configuration);
                final ResourceManagerRuntimeServicesConfiguration 
resourceManagerRuntimeServicesConfiguration = 
ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 0a85bbe..14baa6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -111,7 +111,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        final HighAvailabilityServices haServices,
                        final HeartbeatServices heartbeatServices,
                        final JobManagerServices jobManagerServices,
-                       final MetricRegistryImpl metricRegistry,
+                       final MetricRegistry metricRegistry,
                        final OnCompletionActions toNotifyOnComplete,
                        final FatalErrorHandler errorHandler) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 9aa97cb..782d66a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.metrics;
 
 import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 
+import javax.annotation.Nullable;
+
 /**
  * Interface for a metric registry.
  */
@@ -65,5 +68,18 @@ public interface MetricRegistry {
         */
        void unregister(Metric metric, String metricName, AbstractMetricGroup 
group);
 
+       /**
+        * Returns the scope formats.
+        *
+        * @return scope formats
+        */
        ScopeFormats getScopeFormats();
+
+       /**
+        * Returns the path of the {@link MetricQueryService} or null, if none is started.
+        *
+        * @return Path of the MetricQueryService or null, if none is started
+        */
+       @Nullable
+       String getMetricQueryServicePath();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index 407fa8b..3e4f56f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -188,6 +188,7 @@ public class MetricRegistryImpl implements MetricRegistry {
         *
         * @return address of the metric query service
         */
+       @Override
        @Nullable
        public String getMetricQueryServicePath() {
                return metricQueryServicePath;
@@ -238,7 +239,16 @@ public class MetricRegistryImpl implements MetricRegistry {
 
                        if (queryService != null) {
                                stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
-                               stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+
+                               try {
+                                       stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
+                               } catch (IllegalStateException ignored) {
+                                       // this can happen if the underlying actor system has been stopped before shutting
+                                       // the metric registry down
+                                       // TODO: Pull the MetricQueryService actor out of the MetricRegistry
+                                       LOG.debug("The metric query service actor has already been stopped because the " +
+                                               "underlying ActorSystem has already been shut down.");
+                               }
                        }
 
                        if (reporters != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
index 2ecde42..08353e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
@@ -21,6 +21,10 @@ package org.apache.flink.runtime.metrics.util;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import org.apache.commons.lang3.text.WordUtils;
 import org.slf4j.Logger;
@@ -47,6 +51,21 @@ public class MetricUtils {
        private MetricUtils() {
        }
 
+       public static TaskManagerMetricGroup instantiateTaskManagerMetricGroup(
+                       MetricRegistry metricRegistry,
+                       TaskManagerLocation taskManagerLocation,
+                       NetworkEnvironment network) {
+               final TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(
+                       metricRegistry,
+                       taskManagerLocation.getHostname(),
+                       taskManagerLocation.getResourceID().toString());
+
+               // Initialize the TM metrics
+               TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, network);
+
+               return taskManagerMetricGroup;
+       }
+
        public static void instantiateNetworkMetrics(
                MetricGroup metrics,
                final NetworkEnvironment network) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index d4248ee..2bbd2c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,8 +32,9 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
@@ -160,6 +161,10 @@ public class MiniCluster {
                                // we always need the 'commonRpcService' for 
auxiliary calls
                                commonRpcService = 
createRpcService(configuration, rpcTimeout, false, null);
 
+                               // TODO: Temporary hack until the metric query service is ported to the RpcEndpoint
+                               final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
+                               metricRegistry.startQueryService(actorSystem, null);
+
                                if (useSingleRpcService) {
                                        // set that same RPC service for all 
JobManagers and TaskManagers
                                        for (int i = 0; i < numJobManagers; 
i++) {
@@ -326,6 +331,12 @@ public class MiniCluster {
                        taskManagers = null;
                }
 
+               // metrics shutdown
+               if (metricRegistry != null) {
+                       metricRegistry.shutdown();
+                       metricRegistry = null;
+               }
+
                // shut down the RpcServices
                exception = shutDownRpc(commonRpcService, exception);
                exception = shutDownRpcs(jobManagerRpcServices, exception);
@@ -356,12 +367,6 @@ public class MiniCluster {
                        haServices = null;
                }
 
-               // metrics shutdown
-               if (metricRegistry != null) {
-                       metricRegistry.shutdown();
-                       metricRegistry = null;
-               }
-
                // if anything went wrong, throw the first error with all the 
additional suppressed exceptions
                if (exception != null) {
                        ExceptionUtils.rethrowException(exception, "Error while 
shutting down mini cluster");
@@ -502,7 +507,7 @@ public class MiniCluster {
                        Configuration configuration,
                        HighAvailabilityServices haServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        int numResourceManagers,
                        RpcService[] resourceManagerRpcServices) throws 
Exception {
 
@@ -528,7 +533,7 @@ public class MiniCluster {
        protected TaskExecutor[] startTaskManagers(
                        Configuration configuration,
                        HighAvailabilityServices haServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        int numTaskManagers,
                        RpcService[] taskManagerRpcServices) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index ca042b6..60d9a66 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
@@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher {
        private final JobManagerServices jobManagerServices;
 
        /** Registry for all metrics in the mini cluster */
-       private final MetricRegistryImpl metricRegistry;
+       private final MetricRegistry metricRegistry;
 
        /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
        private final int numJobManagers;
@@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher {
                        HighAvailabilityServices haServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry) throws Exception {
+                       MetricRegistry metricRegistry) throws Exception {
                this(
                        config,
                        haServices,
@@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher {
                        HighAvailabilityServices haServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        int numJobManagers,
                        RpcService[] rpcServices) throws Exception {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index 90fb115..a8c402a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -95,6 +95,8 @@ public class StandaloneMiniCluster {
                metricRegistry = new MetricRegistryImpl(
                        
MetricRegistryConfiguration.fromConfiguration(configuration));
 
+               metricRegistry.startQueryService(actorSystem, null);
+
                JobManager.startJobManagerActors(
                        configuration,
                        actorSystem,
@@ -142,6 +144,12 @@ public class StandaloneMiniCluster {
        public void close() throws Exception {
                Exception exception = null;
 
+               try {
+                       metricRegistry.shutdown();
+               } catch (Exception e) {
+                       exception = ExceptionUtils.firstOrSuppressed(e, exception);
+               }
+
                actorSystem.shutdown();
                actorSystem.awaitTermination();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 98b80c6..cccaf95 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -42,7 +42,7 @@ import 
org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
        /** Registry to use for metrics. */
-       private final MetricRegistryImpl metricRegistry;
+       private final MetricRegistry metricRegistry;
 
        /** Fatal error handler. */
        private final FatalErrorHandler fatalErrorHandler;
@@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index cc2766b..f67368c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -174,7 +174,7 @@ public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManager
         * Requests the paths for the TaskManager's {@link MetricQueryService} 
to query.
         *
         * @param timeout for the asynchronous operation
-        * @return Future containing the collection of instance ids and the 
corresponding metric query service path
+        * @return Future containing the collection of resource ids and the 
corresponding metric query service path
         */
        CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index 361bdd4..caa3ba0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
@@ -55,7 +55,7 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                        final RpcService rpcService,
                        final HighAvailabilityServices highAvailabilityServices,
                        final HeartbeatServices heartbeatServices,
-                       final MetricRegistryImpl metricRegistry) throws 
Exception {
+                       final MetricRegistry metricRegistry) throws Exception {
 
                Preconditions.checkNotNull(resourceId);
                Preconditions.checkNotNull(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index d2b1205..624f31d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -45,7 +45,7 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistryImpl metricRegistry,
+                       MetricRegistry metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 4d6ccd5..118e356 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.StringUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
@@ -66,7 +65,9 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
+import java.net.URLDecoder;
 import java.nio.channels.FileChannel;
 import java.util.HashMap;
 import java.util.Objects;
@@ -160,14 +161,22 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                                executor);
                }
 
-               final String taskManagerID = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
+               final String taskManagerId = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
                final HttpRequest request = routed.request();
 
                //fetch TaskManager logs if no other process is currently doing 
it
-               if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
+               if (lastRequestPending.putIfAbsent(taskManagerId, true) == 
null) {
                        try {
-                               ResourceID resourceId = new ResourceID(new 
String(StringUtils.hexStringToByte(taskManagerID)));
-                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, 
timeout);
+                               final String unescapedString;
+
+                               try {
+                                       unescapedString = 
URLDecoder.decode(taskManagerId, "UTF-8");
+                               } catch (UnsupportedEncodingException e) {
+                                       throw new FlinkException("Could not 
decode task manager id: " + taskManagerId + '.', e);
+                               }
+
+                               final ResourceID resourceId = new 
ResourceID(unescapedString);
+                               final CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, 
timeout);
 
                                CompletableFuture<TransientBlobKey> 
blobKeyFuture = taskManagerFuture.thenCompose(
                                        (Optional<Instance> optTMInstance) -> {
@@ -189,18 +198,18 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                                                (blobKey, blobCache) -> {
                                                        //delete previous log 
file, if it is different than the current one
                                                        HashMap<String, 
TransientBlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? 
lastSubmittedLog : lastSubmittedStdout;
-                                                       if 
(lastSubmittedFile.containsKey(taskManagerID)) {
+                                                       if 
(lastSubmittedFile.containsKey(taskManagerId)) {
                                                                // the BlobKey 
will almost certainly be different but the old file
                                                                // may not 
exist anymore so we cannot rely on it and need to
                                                                // download the 
new file anyway, even if the hashes match
-                                                               if 
(!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
-                                                                       if 
(!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerID))) {
-                                                                               
throw new CompletionException(new FlinkException("Could not delete file for " + 
taskManagerID + '.'));
+                                                               if 
(!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerId))) {
+                                                                       if 
(!blobCache.deleteFromCache(lastSubmittedFile.get(taskManagerId))) {
+                                                                               
throw new CompletionException(new FlinkException("Could not delete file for " + 
taskManagerId + '.'));
                                                                        }
-                                                                       
lastSubmittedFile.put(taskManagerID, blobKey);
+                                                                       
lastSubmittedFile.put(taskManagerId, blobKey);
                                                                }
                                                        } else {
-                                                               
lastSubmittedFile.put(taskManagerID, blobKey);
+                                                               
lastSubmittedFile.put(taskManagerId, blobKey);
                                                        }
                                                        try {
                                                                return 
blobCache.getFile(blobKey).getAbsolutePath();
@@ -214,7 +223,7 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                                        failure -> {
                                                display(ctx, request, "Fetching 
TaskManager log failed.");
                                                LOG.error("Fetching TaskManager 
log failed.", failure);
-                                               
lastRequestPending.remove(taskManagerID);
+                                               
lastRequestPending.remove(taskManagerId);
 
                                                return null;
                                        });
@@ -261,7 +270,7 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                                                // write the content.
                                                ChannelFuture lastContentFuture;
                                                final 
GenericFutureListener<Future<? super Void>> completionListener = future -> {
-                                                       
lastRequestPending.remove(taskManagerID);
+                                                       
lastRequestPending.remove(taskManagerId);
                                                        fc.close();
                                                        raf.close();
                                                };
@@ -294,7 +303,7 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                        } catch (Exception e) {
                                display(ctx, request, "Error: " + 
e.getMessage());
                                LOG.error("Fetching TaskManager log failed.", 
e);
-                               lastRequestPending.remove(taskManagerID);
+                               lastRequestPending.remove(taskManagerId);
                        }
                } else {
                        display(ctx, request, "loading...");

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a956111..c48d188 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -55,7 +55,6 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -134,9 +133,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        /** The network component in the task manager */
        private final NetworkEnvironment networkEnvironment;
 
-       /** The metric registry in the task manager */
-       private final MetricRegistryImpl metricRegistry;
-
        /** The heartbeat manager for job manager in the task manager */
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
@@ -179,7 +175,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        NetworkEnvironment networkEnvironment,
                        HighAvailabilityServices haServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistryImpl metricRegistry,
                        TaskManagerMetricGroup taskManagerMetricGroup,
                        BroadcastVariableManager broadcastVariableManager,
                        FileCache fileCache,
@@ -198,7 +193,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                this.ioManager = checkNotNull(ioManager);
                this.networkEnvironment = checkNotNull(networkEnvironment);
                this.haServices = checkNotNull(haServices);
-               this.metricRegistry = checkNotNull(metricRegistry);
                this.taskSlotTable = checkNotNull(taskSlotTable);
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 5a69bb1..a24daf0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -29,8 +29,11 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -75,7 +78,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
        private final Configuration configuration;
 
-       private final ResourceID resourceID;
+       private final ResourceID resourceId;
 
        private final Time timeout;
 
@@ -92,7 +95,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
        public TaskManagerRunner(Configuration configuration, ResourceID 
resourceId) throws Exception {
                this.configuration = Preconditions.checkNotNull(configuration);
-               this.resourceID = Preconditions.checkNotNull(resourceId);
+               this.resourceId = Preconditions.checkNotNull(resourceId);
 
                timeout = AkkaUtils.getTimeoutAsTime(configuration);
 
@@ -111,12 +114,13 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
 
                metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
+               // TODO: Temporary hack until the MetricQueryService has been 
ported to RpcEndpoint
                final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
                metricRegistry.startQueryService(actorSystem, resourceId);
 
                taskManager = startTaskManager(
-                       configuration,
-                       resourceId,
+                       this.configuration,
+                       this.resourceId,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
@@ -242,14 +246,14 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
        // 
--------------------------------------------------------------------------------------------
 
        public static TaskExecutor startTaskManager(
-               Configuration configuration,
-               ResourceID resourceID,
-               RpcService rpcService,
-               HighAvailabilityServices highAvailabilityServices,
-               HeartbeatServices heartbeatServices,
-               MetricRegistryImpl metricRegistry,
-               boolean localCommunicationOnly,
-               FatalErrorHandler fatalErrorHandler) throws Exception {
+                       Configuration configuration,
+                       ResourceID resourceID,
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       MetricRegistry metricRegistry,
+                       boolean localCommunicationOnly,
+                       FatalErrorHandler fatalErrorHandler) throws Exception {
 
                Preconditions.checkNotNull(configuration);
                Preconditions.checkNotNull(resourceID);
@@ -266,8 +270,12 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
 
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,
-                       resourceID,
-                       metricRegistry);
+                       resourceID);
+
+               TaskManagerMetricGroup taskManagerMetricGroup = 
MetricUtils.instantiateTaskManagerMetricGroup(
+                       metricRegistry,
+                       taskManagerServices.getTaskManagerLocation(),
+                       taskManagerServices.getNetworkEnvironment());
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
@@ -280,8 +288,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                        taskManagerServices.getNetworkEnvironment(),
                        highAvailabilityServices,
                        heartbeatServices,
-                       metricRegistry,
-                       taskManagerServices.getTaskManagerMetricGroup(),
+                       taskManagerMetricGroup,
                        taskManagerServices.getBroadcastVariableManager(),
                        taskManagerServices.getFileCache(),
                        taskManagerServices.getTaskSlotTable(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 85e62c6..aed03f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -37,16 +37,12 @@ import 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.runtime.query.QueryableStateUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
-import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -63,7 +59,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link 
MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}.
+ * {@link NetworkEnvironment}.
  */
 public class TaskManagerServices {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
@@ -73,7 +69,6 @@ public class TaskManagerServices {
        private final MemoryManager memoryManager;
        private final IOManager ioManager;
        private final NetworkEnvironment networkEnvironment;
-       private final TaskManagerMetricGroup taskManagerMetricGroup;
        private final BroadcastVariableManager broadcastVariableManager;
        private final FileCache fileCache;
        private final TaskSlotTable taskSlotTable;
@@ -85,7 +80,6 @@ public class TaskManagerServices {
                MemoryManager memoryManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
-               TaskManagerMetricGroup taskManagerMetricGroup,
                BroadcastVariableManager broadcastVariableManager,
                FileCache fileCache,
                TaskSlotTable taskSlotTable,
@@ -96,7 +90,6 @@ public class TaskManagerServices {
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
                this.ioManager = Preconditions.checkNotNull(ioManager);
                this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
-               this.taskManagerMetricGroup = 
Preconditions.checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.fileCache = Preconditions.checkNotNull(fileCache);
                this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
@@ -124,10 +117,6 @@ public class TaskManagerServices {
                return taskManagerLocation;
        }
 
-       public TaskManagerMetricGroup getTaskManagerMetricGroup() {
-               return taskManagerMetricGroup;
-       }
-
        public BroadcastVariableManager getBroadcastVariableManager() {
                return broadcastVariableManager;
        }
@@ -157,14 +146,12 @@ public class TaskManagerServices {
         *
         * @param resourceID resource ID of the task manager
         * @param taskManagerServicesConfiguration task manager configuration
-        * @param metricRegistry to register the TaskManagerMetricGroup
         * @return task manager components
         * @throws Exception
         */
        public static TaskManagerServices fromConfiguration(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
-                       ResourceID resourceID,
-                       MetricRegistry metricRegistry) throws Exception {
+                       ResourceID resourceID) throws Exception {
 
                // pre-start checks
                
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -183,14 +170,6 @@ public class TaskManagerServices {
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-               final TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
-                       metricRegistry,
-                       taskManagerLocation.getHostname(),
-                       taskManagerLocation.getResourceID().toString());
-
-               // Initialize the TM metrics
-               
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
network);
-
                final BroadcastVariableManager broadcastVariableManager = new 
BroadcastVariableManager();
 
                final FileCache fileCache = new 
FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -216,7 +195,6 @@ public class TaskManagerServices {
                        memoryManager,
                        ioManager,
                        network,
-                       taskManagerMetricGroup,
                        broadcastVariableManager,
                        fileCache,
                        taskSlotTable,

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 689d98f..227b854 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -122,8 +122,7 @@ abstract class FlinkMiniCluster(
     Hardware.getNumberCPUCores(),
     new ExecutorThreadFactory("mini-cluster-io"))
 
-  protected val metricRegistry = new MetricRegistryImpl(
-    MetricRegistryConfiguration.fromConfiguration(originalConfiguration))
+  protected var metricRegistryOpt: Option[MetricRegistryImpl] = None
 
   def this(configuration: Configuration, useSingleActorSystem: Boolean) {
     this(
@@ -329,6 +328,11 @@ abstract class FlinkMiniCluster(
 
     lazy val singleActorSystem = startJobManagerActorSystem(0)
 
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(originalConfiguration))
+
+    metricRegistryOpt = Some(metricRegistry)
+
     if 
(originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
false)) {
       metricRegistry.startQueryService(singleActorSystem, null)
     }
@@ -463,6 +467,8 @@ abstract class FlinkMiniCluster(
 
     Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout)
 
+    metricRegistryOpt.foreach(_.shutdown())
+
     if (!useSingleActorSystem) {
       taskManagerActorSystems foreach {
         _ foreach(_.shutdown())
@@ -476,7 +482,6 @@ abstract class FlinkMiniCluster(
     jobManagerActorSystems foreach {
       _ foreach(_.shutdown())
     }
-
   }
 
   def awaitTermination(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e9bdb2a..89197e2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,13 +19,12 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
-import java.util.UUID
 import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions, QueryableStateOptions, ResourceManagerOptions, 
TaskManagerOptions}
+import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
@@ -48,14 +47,14 @@ import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
 import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, 
TaskManagerMetricGroup}
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl}
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.util.NetUtils
 
-import scala.concurrent.{Await, ExecutionContext}
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{Await, ExecutionContext}
 
 /**
  * Local Flink mini cluster which executes all [[TaskManager]]s and the 
[[JobManager]] in the same
@@ -85,12 +84,6 @@ class LocalFlinkMiniCluster(
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
-  override def startInternalShutdown() {
-    metricRegistry.shutdown()
-
-    super.startInternalShutdown()
-  }
-
   // --------------------------------------------------------------------------
 
   override def generateConfiguration(userConfiguration: Configuration): 
Configuration = {
@@ -158,7 +151,7 @@ class LocalFlinkMiniCluster(
       futureExecutor,
       ioExecutor,
       highAvailabilityServices.createBlobStore(),
-      metricRegistry)
+      metricRegistryOpt.get)
 
     val archive = system.actorOf(
       getArchiveProps(
@@ -253,8 +246,12 @@ class LocalFlinkMiniCluster(
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID,
-      metricRegistry)
+      resourceID)
+
+    val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+      metricRegistryOpt.get,
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getNetworkEnvironment())
 
     val props = getTaskManagerProps(
       taskManagerClass,
@@ -264,11 +261,7 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      taskManagerServices.getTaskManagerMetricGroup)
-
-    if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
-      metricRegistry.startQueryService(system, resourceID)
-    }
+      taskManagerMetricGroup)
 
     system.actorOf(props, taskManagerActorName)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index f209dac..f948df4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.messages.{Acknowledge, 
StackTraceSampleResponse}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.util.MetricUtils
 import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
@@ -1888,7 +1889,12 @@ object TaskManager {
     }
 
     // shut down the metric query service
-    metricRegistry.shutdown()
+    try {
+      metricRegistry.shutdown()
+    } catch {
+      case t: Throwable =>
+        LOG.error("Could not properly shut down the metric registry.", t)
+    }
   }
 
   /**
@@ -1996,8 +2002,12 @@ object TaskManager {
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID,
-      metricRegistry)
+      resourceID)
+
+    val taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
+      metricRegistry,
+      taskManagerServices.getTaskManagerLocation(),
+      taskManagerServices.getNetworkEnvironment())
 
     // create the actor properties (which define the actor constructor 
parameters)
     val tmProps = getTaskManagerProps(
@@ -2009,7 +2019,7 @@ object TaskManager {
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
       highAvailabilityServices,
-      taskManagerServices.getTaskManagerMetricGroup)
+      taskManagerMetricGroup)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 8558145..a511d45 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -209,7 +210,7 @@ public class DispatcherTest extends TestLogger {
                                ResourceManagerGateway resourceManagerGateway,
                                BlobServer blobServer,
                                HeartbeatServices heartbeatServices,
-                               MetricRegistryImpl metricRegistry,
+                               MetricRegistry metricRegistry,
                                FatalErrorHandler fatalErrorHandler,
                                JobManagerRunner jobManagerRunner,
                                JobID expectedJobId) throws Exception {
@@ -238,7 +239,7 @@ public class DispatcherTest extends TestLogger {
                                HighAvailabilityServices highAvailabilityServices,
                                HeartbeatServices heartbeatServices,
                                JobManagerServices jobManagerServices,
-                               MetricRegistryImpl metricRegistry,
+                               MetricRegistry metricRegistry,
                                OnCompletionActions onCompleteActions,
                                FatalErrorHandler fatalErrorHandler) throws Exception {
                        assertEquals(expectedJobId, jobGraph.getJobID());

http://git-wip-us.apache.org/repos/asf/flink/blob/ad42ee27/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index d843da2..88141d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -53,8 +53,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
@@ -62,7 +62,6 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -96,7 +95,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -208,7 +206,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                mySubmittedJobGraphStore,
                                checkpointStateFactory,
                                jobRecoveryTimeout,
-                               Option.<MetricRegistryImpl>empty(),
+                               new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
                                Option.<String>empty());
 
                        jobManager = system.actorOf(jobManagerProps);
@@ -383,7 +381,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
                                submittedJobGraphStore,
                                mock(CheckpointRecoveryFactory.class),
                                jobRecoveryTimeout,
-                               Option.<MetricRegistryImpl>apply(null),
+                               new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
                                recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
 
                        jobManager = system.actorOf(jobManagerProps);

Reply via email to