http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
new file mode 100644
index 0000000..407fa8b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
+ * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
+ */
+public class MetricRegistryImpl implements MetricRegistry {
+       static final Logger LOG = 
LoggerFactory.getLogger(MetricRegistryImpl.class);
+
+       private final Object lock = new Object();
+
+       private List<MetricReporter> reporters;
+       private ScheduledExecutorService executor;
+
+       @Nullable
+       private ActorRef queryService;
+
+       @Nullable
+       private String metricQueryServicePath;
+
+       private ViewUpdater viewUpdater;
+
+       private final ScopeFormats scopeFormats;
+       private final char globalDelimiter;
+       private final List<Character> delimiters = new ArrayList<>();
+
+       /**
+        * Creates a new MetricRegistry and starts all configured reporters.
+        *
+        * <p>For every reporter configuration the reporter class is instantiated reflectively, opened
+        * with its configuration and, if it implements {@link Scheduled}, scheduled for periodic
+        * reporting on the registry's executor (default interval: 10 seconds). A reporter that cannot
+        * be set up is logged and skipped; it never prevents the registry from being created.
+        *
+        * @param config provides the scope formats, the global delimiter and the reporter configurations
+        */
+       public MetricRegistryImpl(MetricRegistryConfiguration config) {
+               this.scopeFormats = config.getScopeFormats();
+               this.globalDelimiter = config.getDelimiter();
+
+               // instantiate any custom configured reporters
+               this.reporters = new ArrayList<>();
+
+               List<Tuple2<String, Configuration>> reporterConfigurations = config.getReporterConfigurations();
+
+               this.executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory("Flink-MetricRegistry"));
+
+               this.queryService = null;
+               this.metricQueryServicePath = null;
+
+               if (reporterConfigurations.isEmpty()) {
+                       // no reporters defined
+                       // by default, don't report anything
+                       LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
+               } else {
+                       // we have some reporters so
+                       for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
+                               String namedReporter = reporterConfiguration.f0;
+                               Configuration reporterConfig = reporterConfiguration.f1;
+
+                               final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
+                               if (className == null) {
+                                       LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
+                                       continue;
+                               }
+
+                               // Non-null only while a reporter has been opened but not yet registered, so that
+                               // the catch block can release it if a later setup step fails.
+                               MetricReporter openedReporter = null;
+
+                               try {
+                                       String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+                                       TimeUnit timeunit = TimeUnit.SECONDS;
+                                       long period = 10;
+
+                                       if (configuredPeriod != null) {
+                                               try {
+                                                       String[] interval = configuredPeriod.split(" ");
+                                                       period = Long.parseLong(interval[0]);
+                                                       timeunit = TimeUnit.valueOf(interval[1]);
+                                               }
+                                               catch (Exception e) {
+                                                       LOG.error("Cannot parse report interval from config: " + configuredPeriod +
+                                                                       " - please use values like '10 SECONDS' or '500 MILLISECONDS'. " +
+                                                                       "Using default reporting interval.");
+                                               }
+                                       }
+
+                                       Class<?> reporterClass = Class.forName(className);
+                                       MetricReporter reporterInstance = (MetricReporter) reporterClass.newInstance();
+
+                                       MetricConfig metricConfig = new MetricConfig();
+                                       reporterConfig.addAllToProperties(metricConfig);
+                                       LOG.info("Configuring {} with {}.", reporterClass.getSimpleName(), metricConfig);
+                                       reporterInstance.open(metricConfig);
+                                       openedReporter = reporterInstance;
+
+                                       if (reporterInstance instanceof Scheduled) {
+                                               LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
+
+                                               // rejects a non-positive period with an IllegalArgumentException
+                                               executor.scheduleWithFixedDelay(
+                                                               new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+                                       } else {
+                                               LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className);
+                                       }
+
+                                       String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
+                                       if (delimiterForReporter.length() != 1) {
+                                               LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter);
+                                               delimiterForReporter = String.valueOf(globalDelimiter);
+                                       }
+
+                                       // reporters and delimiters are index-aligned, so they are extended together
+                                       reporters.add(reporterInstance);
+                                       this.delimiters.add(delimiterForReporter.charAt(0));
+                                       openedReporter = null;
+                               }
+                               catch (Throwable t) {
+                                       LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", namedReporter, t);
+
+                                       if (openedReporter != null) {
+                                               // the reporter was opened but never registered: shutdown() will not see it,
+                                               // so it has to be closed here to avoid leaking its resources
+                                               try {
+                                                       openedReporter.close();
+                                               } catch (Throwable closeFailure) {
+                                                       LOG.warn("Metrics reporter did not shut down cleanly", closeFailure);
+                                               }
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Starts the {@link MetricQueryService} and remembers both the actor and its Akka URL.
+        *
+        * <p>The registry must not have been shut down; this is enforced by a state check. A failure
+        * while starting the service is logged and otherwise ignored.
+        *
+        * @param actorSystem actor system passed to the query service start-up and used to resolve its URL
+        * @param resourceID resource ID passed to the query service start-up
+        */
+       public void startQueryService(ActorSystem actorSystem, ResourceID resourceID) {
+               synchronized (lock) {
+                       Preconditions.checkState(!isShutdown(), "The metric registry has already been shut down.");
+
+                       try {
+                               final ActorRef startedService = MetricQueryService.startMetricQueryService(actorSystem, resourceID);
+                               // publish the actor first; the path is resolved from it afterwards
+                               queryService = startedService;
+                               metricQueryServicePath = AkkaUtils.getAkkaURL(actorSystem, startedService);
+                       } catch (Exception e) {
+                               LOG.warn("Could not start MetricDumpActor. No metrics will be submitted to the WebInterface.", e);
+                       }
+               }
+       }
+
+       /**
+        * Returns the Akka URL under which the {@link MetricQueryService} can be reached.
+        *
+        * @return URL of the metric query service, or {@code null} if no path has been recorded by
+        *         {@link #startQueryService(ActorSystem, ResourceID)}
+        */
+       @Nullable
+       public String getMetricQueryServicePath() {
+               return this.metricQueryServicePath;
+       }
+
+       /**
+        * Returns the global delimiter taken from the registry configuration.
+        */
+       @Override
+       public char getDelimiter() {
+               return globalDelimiter;
+       }
+
+       /**
+        * Returns the delimiter configured for the reporter with the given index.
+        *
+        * @param reporterIndex index of the reporter, in the order in which reporters were registered
+        * @return the reporter's delimiter, or the global delimiter (with a warning logged) if no
+        *         delimiter exists for that index
+        */
+       @Override
+       public char getDelimiter(int reporterIndex) {
+               // explicit bounds check instead of relying on an IndexOutOfBoundsException for control flow
+               if (reporterIndex < 0 || reporterIndex >= delimiters.size()) {
+                       LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex);
+                       return this.globalDelimiter;
+               }
+               return delimiters.get(reporterIndex);
+       }
+
+       /**
+        * Returns the number of reporters held by this registry.
+        *
+        * <p>{@link #shutdown()} sets the reporter list to {@code null}. In that case {@code 0} is
+        * returned instead of failing with a {@link NullPointerException}, so callers that size their
+        * data structures from this value keep working against a registry that was shut down.
+        *
+        * @return number of reporters, {@code 0} after shutdown
+        */
+       @Override
+       public int getNumberReporters() {
+               // read the field once so the null check and the size() call see the same list
+               final List<MetricReporter> currentReporters = reporters;
+               return currentReporters == null ? 0 : currentReporters.size();
+       }
+
+       /**
+        * Returns the reporters held by this registry.
+        *
+        * <p>The registry's own list is handed out, not a copy. {@link #shutdown()} replaces it with
+        * {@code null}, so callers must be prepared for a {@code null} result.
+        *
+        * @return the reporter list, or {@code null} after shutdown
+        */
+       public List<MetricReporter> getReporters() {
+               return this.reporters;
+       }
+
+       /**
+        * Tells whether this registry has been shut down, i.e. whether the reporter list has been
+        * released and the executor has been shut down.
+        *
+        * @return {@code true} if the registry was shut down, {@code false} otherwise
+        */
+       public boolean isShutdown() {
+               synchronized (lock) {
+                       if (reporters != null) {
+                               return false;
+                       }
+                       return executor.isShutdown();
+               }
+       }
+
+       /**
+        * Shuts down this registry: requests a graceful stop of the query service (if one was
+        * started), closes all {@link MetricReporter MetricReporters}, shuts down the executor and
+        * finally waits up to one second for the query service to stop, killing it otherwise.
+        */
+       public void shutdown() {
+               synchronized (lock) {
+                       Future<Boolean> terminationFuture = null;
+                       FiniteDuration gracePeriod = null;
+
+                       // trigger the stop first so the actor can terminate while the reporters are closed
+                       if (queryService != null) {
+                               gracePeriod = new FiniteDuration(1L, TimeUnit.SECONDS);
+                               terminationFuture = Patterns.gracefulStop(queryService, gracePeriod);
+                       }
+
+                       if (reporters != null) {
+                               for (MetricReporter openReporter : reporters) {
+                                       try {
+                                               openReporter.close();
+                                       } catch (Throwable t) {
+                                               LOG.warn("Metrics reporter did not shut down cleanly", t);
+                                       }
+                               }
+                               reporters = null;
+                       }
+
+                       shutdownExecutor();
+
+                       if (terminationFuture == null) {
+                               // no query service was started, nothing left to wait for
+                               return;
+                       }
+
+                       boolean stopped = false;
+                       try {
+                               stopped = Await.result(terminationFuture, gracePeriod);
+                       } catch (Exception e) {
+                               LOG.warn("Query actor did not properly stop.", e);
+                       }
+
+                       if (!stopped) {
+                               // the query actor did not stop in time, let's kill him
+                               queryService.tell(Kill.getInstance(), ActorRef.noSender());
+                       }
+               }
+       }
+
+       /**
+        * Shuts down the executor, waiting up to one second for running tasks to finish before
+        * cancelling them. If the waiting thread is interrupted, the executor is shut down
+        * immediately and the thread's interrupt flag is restored.
+        */
+       private void shutdownExecutor() {
+               if (executor != null) {
+                       executor.shutdown();
+
+                       try {
+                               if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
+                                       executor.shutdownNow();
+                               }
+                       } catch (InterruptedException e) {
+                               executor.shutdownNow();
+                               // catching InterruptedException clears the flag; restore it so callers can react
+                               Thread.currentThread().interrupt();
+                       }
+               }
+       }
+
+       /**
+        * Returns the scope formats taken from the registry configuration.
+        */
+       @Override
+       public ScopeFormats getScopeFormats() {
+               return this.scopeFormats;
+       }
+
+       // ------------------------------------------------------------------------
+       //  Metrics (de)registration
+       // ------------------------------------------------------------------------
+
+       /**
+        * Announces a newly added metric to every reporter, to the query service (if one was started)
+        * and, for {@link View} metrics, to the view updater, which is created on first use.
+        *
+        * <p>Each notification is isolated: an exception is logged and the remaining recipients are
+        * still notified. After shutdown only a warning is logged.
+        *
+        * @param metric the metric that was added
+        * @param metricName name of the metric
+        * @param group group the metric was added to
+        */
+       @Override
+       public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+               synchronized (lock) {
+                       if (isShutdown()) {
+                               LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
+                               return;
+                       }
+
+                       if (reporters != null) {
+                               final int reporterCount = reporters.size();
+                               for (int index = 0; index < reporterCount; index++) {
+                                       MetricReporter current = reporters.get(index);
+                                       try {
+                                               if (current != null) {
+                                                       // the front group carries the reporter index alongside the group
+                                                       FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(index, group);
+                                                       current.notifyOfAddedMetric(metric, metricName, front);
+                                               }
+                                       } catch (Exception e) {
+                                               LOG.warn("Error while registering metric.", e);
+                                       }
+                               }
+                       }
+
+                       if (queryService != null) {
+                               try {
+                                       MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
+                               } catch (Exception e) {
+                                       LOG.warn("Error while registering metric.", e);
+                               }
+                       }
+
+                       if (metric instanceof View) {
+                               try {
+                                       if (viewUpdater == null) {
+                                               viewUpdater = new ViewUpdater(executor);
+                                       }
+                                       viewUpdater.notifyOfAddedView((View) metric);
+                               } catch (Exception e) {
+                                       LOG.warn("Error while registering metric.", e);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Announces the removal of a metric to every reporter, to the query service (if one was
+        * started) and, for {@link View} metrics, to the view updater (if one exists).
+        *
+        * <p>Each notification is isolated: an exception is logged and the remaining recipients are
+        * still notified. After shutdown only a warning is logged.
+        *
+        * @param metric the metric that was removed
+        * @param metricName name of the metric
+        * @param group group the metric was removed from
+        */
+       @Override
+       public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+               synchronized (lock) {
+                       if (isShutdown()) {
+                               LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
+                       } else {
+                               if (reporters != null) {
+                                       for (int i = 0; i < reporters.size(); i++) {
+                                               try {
+                                                       MetricReporter reporter = reporters.get(i);
+                                                       if (reporter != null) {
+                                                               FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+                                                               reporter.notifyOfRemovedMetric(metric, metricName, front);
+                                                       }
+                                               } catch (Exception e) {
+                                                       LOG.warn("Error while unregistering metric.", e);
+                                               }
+                                       }
+                               }
+                               try {
+                                       if (queryService != null) {
+                                               MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Error while unregistering metric.", e);
+                               }
+                               try {
+                                       if (metric instanceof View) {
+                                               if (viewUpdater != null) {
+                                                       viewUpdater.notifyOfRemovedView((View) metric);
+                                               }
+                                       }
+                               } catch (Exception e) {
+                                       LOG.warn("Error while unregistering metric.", e);
+                               }
+                       }
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Exposes the query service actor for tests.
+        *
+        * @return the query service actor, or {@code null} if none has been started
+        */
+       @VisibleForTesting
+       @Nullable
+       public ActorRef getQueryService() {
+               return this.queryService;
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Task that triggers a single {@link Scheduled#report()} call; the registry schedules one
+        * such task per scheduled reporter on its executor.
+        *
+        * <p>The class is deliberately static so that it holds no reference to the enclosing
+        * MetricRegistryImpl instance. The executor keeps the task alive for as long as it is
+        * scheduled; an inner (non-static) class would additionally keep the whole registry reachable
+        * through its enclosing-instance pointer.
+        *
+        * <p>Every failure of the reporter is caught and logged, so no exception ever escapes
+        * {@link #run()}.
+        */
+       private static final class ReporterTask extends TimerTask {
+
+               private final Scheduled scheduledReporter;
+
+               private ReporterTask(Scheduled reporter) {
+                       this.scheduledReporter = reporter;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               scheduledReporter.report();
+                       } catch (Throwable t) {
+                               LOG.warn("Error while reporting metrics", t);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index ab59977..66eace5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -106,7 +106,7 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                this.registry = checkNotNull(registry);
                this.scopeComponents = checkNotNull(scope);
                this.parent = parent;
-               this.scopeStrings = new String[registry.getReporters() == null 
? 0 : registry.getReporters().size()];
+               this.scopeStrings = new String[registry.getNumberReporters()];
        }
 
        public Map<String, String> getAllVariables() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index dd352bb..d4248ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -68,7 +68,7 @@ public class MiniCluster {
        private final MiniClusterConfiguration miniClusterConfiguration;
 
        @GuardedBy("lock") 
-       private MetricRegistry metricRegistry;
+       private MetricRegistryImpl metricRegistry;
 
        @GuardedBy("lock")
        private RpcService commonRpcService;
@@ -464,8 +464,8 @@ public class MiniCluster {
         * 
         * @param config The configuration of the mini cluster
         */
-       protected MetricRegistry createMetricRegistry(Configuration config) {
-               return new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+       protected MetricRegistryImpl createMetricRegistry(Configuration config) 
{
+               return new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
        }
 
        /**
@@ -502,7 +502,7 @@ public class MiniCluster {
                        Configuration configuration,
                        HighAvailabilityServices haServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        int numResourceManagers,
                        RpcService[] resourceManagerRpcServices) throws 
Exception {
 
@@ -528,7 +528,7 @@ public class MiniCluster {
        protected TaskExecutor[] startTaskManagers(
                        Configuration configuration,
                        HighAvailabilityServices haServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        int numTaskManagers,
                        RpcService[] taskManagerRpcServices) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 60d9a66..ca042b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.ExceptionUtils;
@@ -75,7 +75,7 @@ public class MiniClusterJobDispatcher {
        private final JobManagerServices jobManagerServices;
 
        /** Registry for all metrics in the mini cluster */
-       private final MetricRegistry metricRegistry;
+       private final MetricRegistryImpl metricRegistry;
 
        /** The number of JobManagers to launch (more than one simulates a 
high-availability setup) */
        private final int numJobManagers;
@@ -104,7 +104,7 @@ public class MiniClusterJobDispatcher {
                        HighAvailabilityServices haServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry) throws Exception {
+                       MetricRegistryImpl metricRegistry) throws Exception {
                this(
                        config,
                        haServices,
@@ -132,7 +132,7 @@ public class MiniClusterJobDispatcher {
                        HighAvailabilityServices haServices,
                        BlobServer blobServer,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        int numJobManagers,
                        RpcService[] rpcServices) throws Exception {
                

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
index cbefe5a..90fb115 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/StandaloneMiniCluster.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.runtime.minicluster;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,18 +28,25 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
 
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import scala.Option;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 /**
  * Mini cluster to run the old JobManager code without embedded high 
availability services. This
  * class has been implemented because the normal {@link FlinkMiniCluster} has 
been changed to use
@@ -63,6 +67,8 @@ public class StandaloneMiniCluster {
 
        private final HighAvailabilityServices highAvailabilityServices;
 
+       private final MetricRegistryImpl metricRegistry;
+
        private final FiniteDuration timeout;
 
        private final int port;
@@ -86,21 +92,28 @@ public class StandaloneMiniCluster {
                        Executors.directExecutor(),
                        
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
 
+               metricRegistry = new MetricRegistryImpl(
+                       
MetricRegistryConfiguration.fromConfiguration(configuration));
+
                JobManager.startJobManagerActors(
                        configuration,
                        actorSystem,
                        scheduledExecutorService,
                        scheduledExecutorService,
                        highAvailabilityServices,
+                       metricRegistry,
                        Option.empty(),
                        JobManager.class,
                        MemoryArchivist.class);
 
+               final ResourceID taskManagerResourceId = ResourceID.generate();
+
                ActorRef taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
                        configuration,
-                       ResourceID.generate(),
+                       taskManagerResourceId,
                        actorSystem,
                        highAvailabilityServices,
+                       metricRegistry,
                        LOCAL_HOSTNAME,
                        Option.<String>empty(),
                        true,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 1d4d4f3..98b80c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -42,7 +42,7 @@ import 
org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
@@ -118,7 +118,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
 
        /** Registry to use for metrics. */
-       private final MetricRegistry metricRegistry;
+       private final MetricRegistryImpl metricRegistry;
 
        /** Fatal error handler. */
        private final FatalErrorHandler fatalErrorHandler;
@@ -140,7 +140,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
 
@@ -498,8 +498,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        }
 
        @Override
-       public CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
-               final ArrayList<Tuple2<InstanceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
+       public CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               final ArrayList<Tuple2<ResourceID, String>> 
metricQueryServicePaths = new ArrayList<>(taskExecutors.size());
 
                for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> 
workerRegistrationEntry : taskExecutors.entrySet()) {
                        final ResourceID tmResourceId = 
workerRegistrationEntry.getKey();
@@ -508,7 +508,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        final String tmMetricQueryServicePath = 
taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
                                MetricQueryService.METRIC_QUERY_SERVICE_NAME + 
'_' + tmResourceId.getResourceIdString();
 
-                       
metricQueryServicePaths.add(Tuple2.of(workerRegistration.getInstanceID(), 
tmMetricQueryServicePath));
+                       metricQueryServicePaths.add(Tuple2.of(tmResourceId, 
tmMetricQueryServicePath));
                }
 
                return 
CompletableFuture.completedFuture(metricQueryServicePaths);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 9eacb4b..cc2766b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -176,5 +176,5 @@ public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManager
         * @param timeout for the asynchronous operation
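-        * @return Future containing the collection of instance ids and the 
corresponding metric query service path
+        * @return Future containing the collection of resource ids and the 
corresponding metric query service path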
         */
-       CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+       CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
index caa3ba0..361bdd4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerRunner.java
@@ -22,7 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.FlinkException;
@@ -55,7 +55,7 @@ public class ResourceManagerRunner implements 
FatalErrorHandler {
                        final RpcService rpcService,
                        final HighAvailabilityServices highAvailabilityServices,
                        final HeartbeatServices heartbeatServices,
-                       final MetricRegistry metricRegistry) throws Exception {
+                       final MetricRegistryImpl metricRegistry) throws 
Exception {
 
                Preconditions.checkNotNull(resourceId);
                Preconditions.checkNotNull(configuration);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 624f31d..d2b1205 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
@@ -45,7 +45,7 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceID> {
                        HighAvailabilityServices highAvailabilityServices,
                        HeartbeatServices heartbeatServices,
                        SlotManager slotManager,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
                super(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index cf5bfcb..4d6ccd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -31,8 +31,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
 import org.apache.flink.runtime.rest.handler.WebHandler;
@@ -166,13 +166,13 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                //fetch TaskManager logs if no other process is currently doing 
it
                if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
                        try {
-                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(taskManagerID));
-                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
+                               ResourceID resourceId = new ResourceID(new 
String(StringUtils.hexStringToByte(taskManagerID)));
+                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, 
timeout);
 
                                CompletableFuture<TransientBlobKey> 
blobKeyFuture = taskManagerFuture.thenCompose(
                                        (Optional<Instance> optTMInstance) -> {
                                                Instance taskManagerInstance = 
optTMInstance.orElseThrow(
-                                                       () -> new 
CompletionException(new FlinkException("Could not find instance with " + 
instanceID + '.')));
+                                                       () -> new 
CompletionException(new FlinkException("Could not find instance with " + 
resourceId + '.')));
                                                switch (fileMode) {
                                                        case LOG:
                                                                return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index ad2ee1b..84c6e41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -19,19 +19,20 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.StringUtils;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -74,8 +75,16 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                        // return them in an array. This avoids unnecessary 
code complexity.
                        // If only one task manager is requested, we only fetch 
one task manager metrics.
                        if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-                               CompletableFuture<Optional<Instance>> 
tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
+                               final String unescapedString;
+
+                               try {
+                                       unescapedString = 
URLDecoder.decode(pathParams.get(TASK_MANAGER_ID_KEY), "UTF-8");
+                               } catch (UnsupportedEncodingException e) {
+                                       return 
FutureUtils.completedExceptionally(new FlinkException("Could not decode task 
manager id: " + pathParams.get(TASK_MANAGER_ID_KEY) + '.', e));
+                               }
+
+                               ResourceID resourceId = new 
ResourceID(unescapedString);
+                               CompletableFuture<Optional<Instance>> 
tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(resourceId, 
timeout);
 
                                return tmInstanceFuture.thenApplyAsync(
                                        (Optional<Instance> optTaskManager) -> {
@@ -116,7 +125,7 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
 
                for (Instance instance : instances) {
                        gen.writeStartObject();
-                       gen.writeStringField("id", instance.getId().toString());
+                       gen.writeStringField("id", 
instance.getTaskManagerID().getResourceIdString());
                        gen.writeStringField("path", 
instance.getTaskManagerGateway().getAddress());
                        gen.writeNumberField("dataPort", 
instance.getTaskManagerLocation().dataPort());
                        gen.writeNumberField("timeSinceLastHeartbeat", 
instance.getLastHeartBeat());
@@ -131,7 +140,7 @@ public class TaskManagersHandler extends 
AbstractJsonRequestHandler  {
                        if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
                                fetcher.update();
 
-                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getTaskManagerID().getResourceIdString());
                                if (metrics != null) {
                                        gen.writeObjectFieldStart("metrics");
                                        long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
index 1bfb9f2..e71a1d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
@@ -145,20 +145,20 @@ public class MetricFetcher<T extends RestfulGateway> {
                                // TODO: Once the old code has been ditched, 
remove the explicit TaskManager query service discovery
                                // TODO: and return it as part of 
requestQueryServicePaths. Moreover, change the MetricStore such that
                                // TODO: we don't have to explicitly retain the 
valid TaskManagers, e.g. letting it be a cache with expiry time
-                               CompletableFuture<Collection<Tuple2<InstanceID, 
String>>> taskManagerQueryServicePathsFuture = leaderGateway
+                               CompletableFuture<Collection<Tuple2<ResourceID, 
String>>> taskManagerQueryServicePathsFuture = leaderGateway
                                        
.requestTaskManagerMetricQueryServicePaths(timeout);
 
                                
taskManagerQueryServicePathsFuture.whenCompleteAsync(
-                                       (Collection<Tuple2<InstanceID, String>> 
queryServicePaths, Throwable throwable) -> {
+                                       (Collection<Tuple2<ResourceID, String>> 
queryServicePaths, Throwable throwable) -> {
                                                if (throwable != null) {
                                                        LOG.warn("Requesting 
TaskManager's path for query services failed.", throwable);
                                                } else {
                                                        List<String> 
taskManagersToRetain = queryServicePaths
                                                                .stream()
                                                                .map(
-                                                                       
(Tuple2<InstanceID, String> tuple) -> {
+                                                                       
(Tuple2<ResourceID, String> tuple) -> {
                                                                                
retrieveAndQueryMetrics(tuple.f1);
-                                                                               
return tuple.f0.toString();
+                                                                               
return tuple.f0.getResourceIdString();
                                                                        }
                                                                
).collect(Collectors.toList());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index cd67705..a956111 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
@@ -135,7 +135,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        private final NetworkEnvironment networkEnvironment;
 
        /** The metric registry in the task manager */
-       private final MetricRegistry metricRegistry;
+       private final MetricRegistryImpl metricRegistry;
 
        /** The heartbeat manager for job manager in the task manager */
        private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
@@ -179,7 +179,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        NetworkEnvironment networkEnvironment,
                        HighAvailabilityServices haServices,
                        HeartbeatServices heartbeatServices,
-                       MetricRegistry metricRegistry,
+                       MetricRegistryImpl metricRegistry,
                        TaskManagerMetricGroup taskManagerMetricGroup,
                        BroadcastVariableManager broadcastVariableManager,
                        FileCache fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 782ab07..5a69bb1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -29,22 +29,19 @@ import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
-import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -86,7 +83,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
 
        private final HighAvailabilityServices highAvailabilityServices;
 
-       private final MetricRegistry metricRegistry;
+       private final MetricRegistryImpl metricRegistry;
 
        /** Executor used to run future callbacks */
        private final ExecutorService executor;
@@ -112,7 +109,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
 
                HeartbeatServices heartbeatServices = 
HeartbeatServices.fromConfiguration(configuration);
 
-               metricRegistry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration));
+               metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
 
                final ActorSystem actorSystem = ((AkkaRpcService) 
rpcService).getActorSystem();
                metricRegistry.startQueryService(actorSystem, resourceId);
@@ -250,7 +247,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                HeartbeatServices heartbeatServices,
-               MetricRegistry metricRegistry,
+               MetricRegistryImpl metricRegistry,
                boolean localCommunicationOnly,
                FatalErrorHandler fatalErrorHandler) throws Exception {
 
@@ -269,18 +266,11 @@ public class TaskManagerRunner implements 
FatalErrorHandler {
 
                TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
                        taskManagerServicesConfiguration,
-                       resourceID);
+                       resourceID,
+                       metricRegistry);
 
                TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
 
-               TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
-                       metricRegistry,
-                       
taskManagerServices.getTaskManagerLocation().getHostname(),
-                       resourceID.toString());
-
-               // Initialize the TM metrics
-               
TaskExecutorMetricsInitializer.instantiateStatusMetrics(taskManagerMetricGroup, 
taskManagerServices.getNetworkEnvironment());
-
                return new TaskExecutor(
                        rpcService,
                        taskManagerConfiguration,
@@ -291,7 +281,7 @@ public class TaskManagerRunner implements FatalErrorHandler 
{
                        highAvailabilityServices,
                        heartbeatServices,
                        metricRegistry,
-                       taskManagerMetricGroup,
+                       taskManagerServices.getTaskManagerMetricGroup(),
                        taskManagerServices.getBroadcastVariableManager(),
                        taskManagerServices.getFileCache(),
                        taskManagerServices.getTaskSlotTable(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 2baf644..85e62c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -62,7 +63,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link 
MemoryManager}, {@link IOManager},
- * {@link NetworkEnvironment} and the {@link MetricRegistry}.
+ * {@link NetworkEnvironment} and the {@link MetricRegistryImpl}.
  */
 public class TaskManagerServices {
        private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerServices.class);
@@ -72,7 +73,6 @@ public class TaskManagerServices {
        private final MemoryManager memoryManager;
        private final IOManager ioManager;
        private final NetworkEnvironment networkEnvironment;
-       private final MetricRegistry metricRegistry;
        private final TaskManagerMetricGroup taskManagerMetricGroup;
        private final BroadcastVariableManager broadcastVariableManager;
        private final FileCache fileCache;
@@ -85,7 +85,6 @@ public class TaskManagerServices {
                MemoryManager memoryManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,
-               MetricRegistry metricRegistry,
                TaskManagerMetricGroup taskManagerMetricGroup,
                BroadcastVariableManager broadcastVariableManager,
                FileCache fileCache,
@@ -97,7 +96,6 @@ public class TaskManagerServices {
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
                this.ioManager = Preconditions.checkNotNull(ioManager);
                this.networkEnvironment = 
Preconditions.checkNotNull(networkEnvironment);
-               this.metricRegistry = 
Preconditions.checkNotNull(metricRegistry);
                this.taskManagerMetricGroup = 
Preconditions.checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.fileCache = Preconditions.checkNotNull(fileCache);
@@ -126,10 +124,6 @@ public class TaskManagerServices {
                return taskManagerLocation;
        }
 
-       public MetricRegistry getMetricRegistry() {
-               return metricRegistry;
-       }
-
        public TaskManagerMetricGroup getTaskManagerMetricGroup() {
                return taskManagerMetricGroup;
        }
@@ -163,12 +157,14 @@ public class TaskManagerServices {
         *
         * @param resourceID resource ID of the task manager
         * @param taskManagerServicesConfiguration task manager configuration
+        * @param metricRegistry to register the TaskManagerMetricGroup
         * @return task manager components
         * @throws Exception
         */
        public static TaskManagerServices fromConfiguration(
                        TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
-                       ResourceID resourceID) throws Exception {
+                       ResourceID resourceID,
+                       MetricRegistry metricRegistry) throws Exception {
 
                // pre-start checks
                
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
@@ -187,9 +183,6 @@ public class TaskManagerServices {
                // start the I/O manager, it will create some temp directories.
                final IOManager ioManager = new 
IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-               final MetricRegistry metricRegistry = new MetricRegistry(
-                               
taskManagerServicesConfiguration.getMetricRegistryConfiguration());
-
                final TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(
                        metricRegistry,
                        taskManagerLocation.getHostname(),
@@ -223,7 +216,6 @@ public class TaskManagerServices {
                        memoryManager,
                        ioManager,
                        network,
-                       metricRegistry,
                        taskManagerMetricGroup,
                        broadcastVariableManager,
                        fileCache,

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index bfd37bc..990fb22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.NetUtils;
@@ -72,8 +71,6 @@ public class TaskManagerServicesConfiguration {
 
        private final float memoryFraction;
 
-       private final MetricRegistryConfiguration metricRegistryConfiguration;
-
        private final long timerServiceShutdownTimeout;
 
        public TaskManagerServicesConfiguration(
@@ -85,7 +82,6 @@ public class TaskManagerServicesConfiguration {
                        long configuredMemory,
                        boolean preAllocateMemory,
                        float memoryFraction,
-                       MetricRegistryConfiguration metricRegistryConfiguration,
                        long timerServiceShutdownTimeout) {
 
                this.taskManagerAddress = checkNotNull(taskManagerAddress);
@@ -98,8 +94,6 @@ public class TaskManagerServicesConfiguration {
                this.preAllocateMemory = preAllocateMemory;
                this.memoryFraction = memoryFraction;
 
-               this.metricRegistryConfiguration = 
checkNotNull(metricRegistryConfiguration);
-
                checkArgument(timerServiceShutdownTimeout >= 0L, "The timer " +
                        "service shutdown timeout must be greater or equal to 
0.");
                this.timerServiceShutdownTimeout = timerServiceShutdownTimeout;
@@ -148,10 +142,6 @@ public class TaskManagerServicesConfiguration {
                return preAllocateMemory;
        }
 
-       public MetricRegistryConfiguration getMetricRegistryConfiguration() {
-               return metricRegistryConfiguration;
-       }
-
        public long getTimerServiceShutdownTimeout() {
                return timerServiceShutdownTimeout;
        }
@@ -211,8 +201,6 @@ public class TaskManagerServicesConfiguration {
                        TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                        "MemoryManager fraction of the free memory must be 
between 0.0 and 1.0");
 
-               final MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
-
                long timerServiceShutdownTimeout = 
AkkaUtils.getTimeout(configuration).toMillis();
 
                return new TaskManagerServicesConfiguration(
@@ -224,7 +212,6 @@ public class TaskManagerServicesConfiguration {
                        configuredMemory,
                        preAllocateMemory,
                        memoryFraction,
-                       metricRegistryConfiguration,
                        timerServiceShutdownTimeout);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index d871b06..331e96b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -95,5 +95,5 @@ public interface RestfulGateway extends RpcGateway {
         * @param timeout for the asynchronous operation
         * @return Future containing the collection of instance ids and the 
corresponding metric query service path
         */
-       CompletableFuture<Collection<Tuple2<InstanceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+       CompletableFuture<Collection<Tuple2<ResourceID, String>>> 
requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 74ef1de..1c573c0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -35,7 +35,8 @@ import org.apache.flink.runtime.jobmanager.{JobManager, 
SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
 import org.apache.flink.runtime.messages.Acknowledge
 import org.apache.flink.runtime.messages.JobManagerMessages.{CurrentJobStatus, 
JobNotFound, RequestJobStatus}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistryImpl => 
FlinkMetricRegistry}
 
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -74,7 +75,7 @@ abstract class ContaineredJobManager(
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String])
   extends JobManager(
     flinkConfiguration,
@@ -91,7 +92,7 @@ abstract class ContaineredJobManager(
     submittedJobGraphs,
     checkpointRecoveryFactory,
     jobRecoveryTimeout,
-    metricsRegistry,
+    jobManagerMetricGroup,
     optRestAddress) {
 
   val jobPollingInterval: FiniteDuration

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0435046..d40a0fd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -34,7 +34,6 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration._
 import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
 import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
@@ -50,10 +49,10 @@ import org.apache.flink.runtime.concurrent.{FutureUtils, 
ScheduledExecutorServic
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
LibraryCacheManager}
-import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
-import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
+import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
+import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, 
HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, 
InstanceManager}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
@@ -66,20 +65,18 @@ import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
+import org.apache.flink.runtime.messages.{Acknowledge, StackTrace}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.net.SSLUtils
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, 
NotifyKvStateRegistered, NotifyKvStateUnregistered}
 import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -137,7 +134,7 @@ class JobManager(
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
     protected val jobRecoveryTimeout: FiniteDuration,
-    protected val metricsRegistry: Option[FlinkMetricRegistry],
+    protected val jobManagerMetricGroup: JobManagerMetricGroup,
     protected val optRestAddress: Option[String])
   extends FlinkActor
   with LeaderSessionMessageFilter // mixin oder is important, we want 
filtering after logging
@@ -154,16 +151,6 @@ class JobManager(
 
   var leaderSessionID: Option[UUID] = None
 
-  protected val jobManagerMetricGroup : Option[JobManagerMetricGroup] = 
metricsRegistry match {
-    case Some(registry) =>
-      val host = flinkConfiguration.getString(JobManagerOptions.ADDRESS)
-      Option(new JobManagerMetricGroup(
-        registry, NetUtils.unresolvedHostToNormalizedString(host)))
-    case None =>
-      log.warn("Could not instantiate JobManager metrics.")
-      None
-  }
-
   /** Futures which have to be completed before terminating the job manager */
   var futuresToComplete: Option[Seq[Future[Unit]]] = None
 
@@ -205,12 +192,7 @@ class JobManager(
         throw new RuntimeException("Could not start the submitted job graphs 
service.", e)
     }
 
-    jobManagerMetricGroup match {
-      case Some(group) =>
-        instantiateMetrics(group)
-      case None =>
-        log.warn("Could not instantiate JobManager metric group.")
-    }
+    instantiateMetrics(jobManagerMetricGroup)
   }
 
   override def postStop(): Unit = {
@@ -250,6 +232,8 @@ class JobManager(
       archive ! decorateMessage(PoisonPill)
     }
 
+    jobManagerMetricGroup.close()
+
     instanceManager.shutdown()
     scheduler.shutdown()
     libraryCacheManager.shutdown()
@@ -260,13 +244,6 @@ class JobManager(
       case e: IOException => log.error("Could not properly shutdown the blob 
server.", e)
     }
 
-    // failsafe shutdown of the metrics registry
-    try {
-      metricsRegistry.foreach(_.shutdown())
-    } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
-    }
-
     log.debug(s"Job manager ${self.path} is completely stopped.")
   }
 
@@ -1073,9 +1050,9 @@ class JobManager(
         )
       )
 
-    case RequestTaskManagerInstance(instanceID) =>
+    case RequestTaskManagerInstance(resourceId) =>
       sender ! decorateMessage(
-        
TaskManagerInstance(Option(instanceManager.getRegisteredInstanceById(instanceID)))
+        
TaskManagerInstance(Option(instanceManager.getRegisteredInstance(resourceId)))
       )
 
     case Heartbeat(instanceID, accumulators) =>
@@ -1283,15 +1260,7 @@ class JobManager(
 
         log.info(s"Using restart strategy $restartStrategy for $jobId.")
 
-        val jobMetrics = jobManagerMetricGroup match {
-          case Some(group) =>
-            group.addJob(jobGraph) match {
-              case (jobGroup:Any) => jobGroup
-              case null => new UnregisteredMetricsGroup()
-            }
-          case None =>
-            new UnregisteredMetricsGroup()
-        }
+        val jobMetrics = jobManagerMetricGroup.addJob(jobGraph)
 
         val numSlots = scheduler.getTotalNumberOfSlots()
 
@@ -1791,7 +1760,7 @@ class JobManager(
     libraryCacheManager.unregisterJob(jobID)
     blobServer.cleanupJob(jobID)
 
-    jobManagerMetricGroup.foreach(_.removeJob(jobID))
+    jobManagerMetricGroup.removeJob(jobID)
 
     futureOption
   }
@@ -2042,7 +2011,12 @@ object JobManager {
     val highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
       configuration,
       ioExecutor,
-      AddressResolution.NO_ADDRESS_RESOLUTION);
+      AddressResolution.NO_ADDRESS_RESOLUTION)
+
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
+    metricRegistry.startQueryService(jobManagerSystem, null)
 
     val (_, _, webMonitorOption, _) = try {
       startJobManagerActors(
@@ -2053,6 +2027,7 @@ object JobManager {
         futureExecutor,
         ioExecutor,
         highAvailabilityServices,
+        metricRegistry,
         classOf[JobManager],
         classOf[MemoryArchivist],
         Option(classOf[StandaloneResourceManager])
@@ -2085,6 +2060,13 @@ object JobManager {
         LOG.warn("Could not properly stop the high availability services.", t)
     }
 
+    try {
+      metricRegistry.shutdown()
+    } catch {
+      case t: Throwable =>
+        LOG.warn("Could not properly shut down the metric registry.", t)
+    }
+
     FlinkExecutors.gracefulShutdown(
       timeout.toMillis,
       TimeUnit.MILLISECONDS,
@@ -2191,6 +2173,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist],
       resourceManagerClass: Option[Class[_ <: FlinkResourceManager[_]]])
@@ -2231,6 +2214,7 @@ object JobManager {
         futureExecutor,
         ioExecutor,
         highAvailabilityServices,
+        metricRegistry,
         webMonitor.map(_.getRestAddress),
         jobManagerClass,
         archiveClass)
@@ -2250,11 +2234,14 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL 
execution mode")
 
+        val resourceId = ResourceID.generate()
+
         val taskManagerActor = TaskManager.startTaskManagerComponentsAndActor(
           configuration,
-          ResourceID.generate(),
+          resourceId,
           jobManagerSystem,
           highAvailabilityServices,
+          metricRegistry,
           externalHostname,
           Some(TaskExecutor.TASK_MANAGER_NAME),
           localTaskManagerCommunication = true,
@@ -2433,7 +2420,8 @@ object JobManager {
       configuration: Configuration,
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
-      blobStore: BlobStore) :
+      blobStore: BlobStore,
+      metricRegistry: FlinkMetricRegistry) :
     (InstanceManager,
     FlinkScheduler,
     BlobServer,
@@ -2443,7 +2431,7 @@ object JobManager {
     Int, // number of archived jobs
     Option[Path], // archive path
     FiniteDuration, // timeout for job recovery
-    Option[FlinkMetricRegistry]
+    JobManagerMetricGroup
    ) = {
 
     val timeout: FiniteDuration = AkkaUtils.getTimeout(configuration)
@@ -2525,12 +2513,9 @@ object JobManager {
       }
     }
 
-    val metricRegistry = try {
-      Option(new 
FlinkMetricRegistry(MetricRegistryConfiguration.fromConfiguration(configuration)))
-    } catch {
-      case _: Exception =>
-        None
-    }
+    val jobManagerMetricGroup = new JobManagerMetricGroup(
+      metricRegistry,
+      configuration.getString(JobManagerOptions.ADDRESS))
 
     (instanceManager,
       scheduler,
@@ -2541,7 +2526,7 @@ object JobManager {
       archiveCount,
       archivePath,
       jobRecoveryTimeout,
-      metricRegistry)
+      jobManagerMetricGroup)
   }
 
   /**
@@ -2564,6 +2549,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       optRestAddress: Option[String],
       jobManagerClass: Class[_ <: JobManager],
       archiveClass: Class[_ <: MemoryArchivist])
@@ -2575,6 +2561,7 @@ object JobManager {
       futureExecutor,
       ioExecutor,
       highAvailabilityServices,
+      metricRegistry,
       optRestAddress,
       Some(JobMaster.JOB_MANAGER_NAME),
       Some(JobMaster.ARCHIVE_NAME),
@@ -2606,6 +2593,7 @@ object JobManager {
       futureExecutor: ScheduledExecutorService,
       ioExecutor: Executor,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       optRestAddress: Option[String],
       jobManagerActorName: Option[String],
       archiveActorName: Option[String],
@@ -2622,11 +2610,12 @@ object JobManager {
     archiveCount,
     archivePath,
     jobRecoveryTimeout,
-    metricsRegistry) = createJobManagerComponents(
+    jobManagerMetricGroup) = createJobManagerComponents(
       configuration,
       futureExecutor,
       ioExecutor,
-      highAvailabilityServices.createBlobStore())
+      highAvailabilityServices.createBlobStore(),
+      metricRegistry)
 
     val archiveProps = getArchiveProps(archiveClass, archiveCount, archivePath)
 
@@ -2653,7 +2642,7 @@ object JobManager {
       highAvailabilityServices.getSubmittedJobGraphStore(),
       highAvailabilityServices.getCheckpointRecoveryFactory(),
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
 
     val jobManager: ActorRef = jobManagerActorName match {
@@ -2661,12 +2650,6 @@ object JobManager {
       case None => actorSystem.actorOf(jobManagerProps)
     }
 
-    metricsRegistry match {
-      case Some(registry) =>
-        registry.startQueryService(actorSystem, null)
-      case None =>
-    }
-
     (jobManager, archive)
   }
 
@@ -2693,7 +2676,7 @@ object JobManager {
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
     jobRecoveryTimeout: FiniteDuration,
-    metricsRegistry: Option[FlinkMetricRegistry],
+    jobManagerMetricGroup: JobManagerMetricGroup,
     optRestAddress: Option[String]): Props = {
 
     Props(
@@ -2712,7 +2695,7 @@ object JobManager {
       submittedJobGraphStore,
       checkpointRecoveryFactory,
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3e896ca..5c19c7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -24,8 +24,9 @@ import java.util.UUID
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
-import org.apache.flink.runtime.blob.{PermanentBlobKey}
+import org.apache.flink.runtime.blob.PermanentBlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, 
SerializedJobExecutionResult}
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceID}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
@@ -419,9 +420,9 @@ object JobManagerMessages {
   /**
    * Requests the [[Instance]] object of the task manager with the given 
instance ID
    *
-   * @param instanceID Instance ID of the task manager
+   * @param resourceId identifying the TaskManager which shall be retrieved
    */
-  case class RequestTaskManagerInstance(instanceID: InstanceID)
+  case class RequestTaskManagerInstance(resourceId: ResourceID)
 
   /**
    * Returns the [[Instance]] object of the requested task manager. This is in 
response to

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index c152f4a..689d98f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl}
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
 import 
org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, 
AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -121,6 +122,9 @@ abstract class FlinkMiniCluster(
     Hardware.getNumberCPUCores(),
     new ExecutorThreadFactory("mini-cluster-io"))
 
+  protected val metricRegistry = new MetricRegistryImpl(
+    MetricRegistryConfiguration.fromConfiguration(originalConfiguration))
+
   def this(configuration: Configuration, useSingleActorSystem: Boolean) {
     this(
       configuration,
@@ -325,6 +329,10 @@ abstract class FlinkMiniCluster(
 
     lazy val singleActorSystem = startJobManagerActorSystem(0)
 
+    if 
(originalConfiguration.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, 
false)) {
+      metricRegistry.startQueryService(singleActorSystem, null)
+    }
+
     val (jmActorSystems, jmActors) =
       (for(i <- 0 until numJobManagers) yield {
         val actorSystem = if(useSingleActorSystem) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index e22230e..e9bdb2a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.minicluster
 
 import java.net.InetAddress
+import java.util.UUID
 import java.util.concurrent.{Executor, ScheduledExecutorService}
 
 import akka.actor.{ActorRef, ActorSystem, Props}
@@ -46,7 +47,8 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.JobManagerMessages
 import 
org.apache.flink.runtime.messages.JobManagerMessages.{RunningJobsStatus, 
StoppingFailure, StoppingResponse}
-import org.apache.flink.runtime.metrics.MetricRegistry
+import org.apache.flink.runtime.metrics.groups.{JobManagerMetricGroup, 
TaskManagerMetricGroup}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistryImpl}
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, 
TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
 import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}
 import org.apache.flink.runtime.util.EnvironmentInformation
@@ -83,6 +85,12 @@ class LocalFlinkMiniCluster(
 
   def this(userConfiguration: Configuration) = this(userConfiguration, true)
 
+  override def startInternalShutdown() {
+    metricRegistry.shutdown()
+
+    super.startInternalShutdown()
+  }
+
   // --------------------------------------------------------------------------
 
   override def generateConfiguration(userConfiguration: Configuration): 
Configuration = {
@@ -137,23 +145,20 @@ class LocalFlinkMiniCluster(
     }
 
     val (instanceManager,
-    scheduler,
-    blobServer,
-    libraryCacheManager,
-    restartStrategyFactory,
-    timeout,
-    archiveCount,
-    archivePath,
-    jobRecoveryTimeout,
-    metricsRegistry) = JobManager.createJobManagerComponents(
+      scheduler,
+      blobServer,
+      libraryCacheManager,
+      restartStrategyFactory,
+      timeout,
+      archiveCount,
+      archivePath,
+      jobRecoveryTimeout,
+      jobManagerMetricGroup) = JobManager.createJobManagerComponents(
       config,
       futureExecutor,
       ioExecutor,
-      highAvailabilityServices.createBlobStore())
-
-    if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
-      metricsRegistry.get.startQueryService(system, null)
-    }
+      highAvailabilityServices.createBlobStore(),
+      metricRegistry)
 
     val archive = system.actorOf(
       getArchiveProps(
@@ -180,7 +185,7 @@ class LocalFlinkMiniCluster(
         highAvailabilityServices.getSubmittedJobGraphStore(),
         highAvailabilityServices.getCheckpointRecoveryFactory(),
         jobRecoveryTimeout,
-        metricsRegistry,
+        jobManagerMetricGroup,
         optRestAddress),
       jobManagerName)
   }
@@ -248,9 +253,8 @@ class LocalFlinkMiniCluster(
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID)
-
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+      resourceID,
+      metricRegistry)
 
     val props = getTaskManagerProps(
       taskManagerClass,
@@ -260,7 +264,7 @@ class LocalFlinkMiniCluster(
       taskManagerServices.getMemoryManager(),
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment,
-      metricRegistry)
+      taskManagerServices.getTaskManagerMetricGroup)
 
     if (config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false)) {
       metricRegistry.startQueryService(system, resourceID)
@@ -296,7 +300,7 @@ class LocalFlinkMiniCluster(
       submittedJobGraphStore: SubmittedJobGraphStore,
       checkpointRecoveryFactory: CheckpointRecoveryFactory,
       jobRecoveryTimeout: FiniteDuration,
-      metricsRegistry: Option[MetricRegistry],
+      jobManagerMetricGroup: JobManagerMetricGroup,
       optRestAddress: Option[String])
     : Props = {
 
@@ -316,7 +320,7 @@ class LocalFlinkMiniCluster(
       submittedJobGraphStore,
       checkpointRecoveryFactory,
       jobRecoveryTimeout,
-      metricsRegistry,
+      jobManagerMetricGroup,
       optRestAddress)
   }
 
@@ -328,7 +332,7 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    metricsRegistry: MetricRegistry): Props = {
+    taskManagerMetricGroup: TaskManagerMetricGroup): Props = {
 
     TaskManager.getTaskManagerProps(
       taskManagerClass,
@@ -339,7 +343,7 @@ class LocalFlinkMiniCluster(
       ioManager,
       networkEnvironment,
       highAvailabilityServices,
-      metricsRegistry)
+      taskManagerMetricGroup)
   }
 
   def getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cc01a8d..f209dac 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, FileInputStream, IOException}
 import java.lang.management.ManagementFactory
 import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket}
 import java.util
-import java.util.concurrent.{Callable, TimeUnit}
+import java.util.concurrent.{Callable, TimeUnit, TimeoutException}
 import java.util.{Collections, UUID}
 
 import _root_.akka.actor._
@@ -63,8 +63,7 @@ import org.apache.flink.runtime.messages.TaskMessages._
 import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, NotifyCheckpointComplete, TriggerCheckpoint}
 import org.apache.flink.runtime.messages.{Acknowledge, StackTraceSampleResponse}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
-import org.apache.flink.runtime.metrics.util.MetricUtils
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistryImpl, MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.{SecurityConfiguration, SecurityUtils}
 import org.apache.flink.runtime.taskexecutor.{TaskExecutor, TaskManagerConfiguration, TaskManagerServices, TaskManagerServicesConfiguration}
@@ -127,7 +126,7 @@ class TaskManager(
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
     protected val highAvailabilityServices: HighAvailabilityServices,
-    protected val metricsRegistry: FlinkMetricRegistry)
+    protected val taskManagerMetricGroup: TaskManagerMetricGroup)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to filter after logging
   with LogMessages // Mixin order is important: first we want to support message logging
@@ -154,8 +153,6 @@ class TaskManager(
     getJobManagerLeaderRetriever(
       HighAvailabilityServices.DEFAULT_JOB_ID)
 
-  private var taskManagerMetricGroup : TaskManagerMetricGroup = _
-
   /** Actors which want to be notified once this task manager has been
     * registered at the job manager */
   private val waitForRegistration = scala.collection.mutable.Set[ActorRef]()
@@ -258,13 +255,8 @@ class TaskManager(
     } catch {
       case t: Exception => log.error("FileCache did not shutdown properly.", t)
     }
-    
-    // failsafe shutdown of the metrics registry
-    try {
-      metricsRegistry.shutdown()
-    } catch {
-      case t: Exception => log.error("MetricRegistry did not shutdown properly.", t)
-    }
+
+    taskManagerMetricGroup.close()
 
     log.info(s"Task manager ${self.path} is completely shut down.")
   }
@@ -980,12 +972,6 @@ class TaskManager(
         throw new RuntimeException(message, e)
     }
     
-    taskManagerMetricGroup = 
-      new TaskManagerMetricGroup(metricsRegistry, location.getHostname, id.toString)
-    
-    MetricUtils.instantiateStatusMetrics(taskManagerMetricGroup)
-    MetricUtils.instantiateNetworkMetrics(taskManagerMetricGroup, network)
-    
     // watch job manager to detect when it dies
     context.watch(jobManager)
 
@@ -1832,15 +1818,22 @@ object TaskManager {
       actorSystemPort,
       LOG.logger)
 
+    val metricRegistry = new MetricRegistryImpl(
+      MetricRegistryConfiguration.fromConfiguration(configuration))
+
+    metricRegistry.startQueryService(taskManagerSystem, resourceID)
+
     // start all the TaskManager services (network stack,  library cache, ...)
     // and the TaskManager actor
     try {
+
       LOG.info("Starting TaskManager actor")
       val taskManager = startTaskManagerComponentsAndActor(
         configuration,
         resourceID,
         taskManagerSystem,
         highAvailabilityServices,
+        metricRegistry,
         taskManagerHostname,
         Some(TaskExecutor.TASK_MANAGER_NAME),
         localTaskManagerCommunication = false,
@@ -1893,6 +1886,9 @@ object TaskManager {
         }
         throw t
     }
+
+    // shut down the metric query service
+    metricRegistry.shutdown()
   }
 
   /**
@@ -1984,6 +1980,7 @@ object TaskManager {
       resourceID: ResourceID,
       actorSystem: ActorSystem,
       highAvailabilityServices: HighAvailabilityServices,
+      metricRegistry: FlinkMetricRegistry,
       taskManagerHostname: String,
       taskManagerActorName: Option[String],
       localTaskManagerCommunication: Boolean,
@@ -1999,9 +1996,8 @@ object TaskManager {
 
     val taskManagerServices = TaskManagerServices.fromConfiguration(
       taskManagerServicesConfiguration,
-      resourceID)
-
-    val metricRegistry = taskManagerServices.getMetricRegistry()
+      resourceID,
+      metricRegistry)
 
     // create the actor properties (which define the actor constructor parameters)
     val tmProps = getTaskManagerProps(
@@ -2013,9 +2009,7 @@ object TaskManager {
       taskManagerServices.getIOManager(),
       taskManagerServices.getNetworkEnvironment(),
       highAvailabilityServices,
-      metricRegistry)
-
-    metricRegistry.startQueryService(actorSystem, resourceID)
+      taskManagerServices.getTaskManagerMetricGroup)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -2032,7 +2026,7 @@ object TaskManager {
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
     highAvailabilityServices: HighAvailabilityServices,
-    metricsRegistry: FlinkMetricRegistry
+    taskManagerMetricGroup: TaskManagerMetricGroup
   ): Props = {
     Props(
       taskManagerClass,
@@ -2044,7 +2038,7 @@ object TaskManager {
       networkEnvironment,
       taskManagerConfig.getNumberSlots(),
       highAvailabilityServices,
-      metricsRegistry)
+      taskManagerMetricGroup)
   }
 
   // --------------------------------------------------------------------------

Reply via email to