http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
new file mode 100644
index 0000000..143faaf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Utility class for the serialization of metrics.
+ */
+public class MetricDumpSerialization {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricDumpSerialization.class);
+
+       private MetricDumpSerialization() {
+       }
+
+       
//-------------------------------------------------------------------------
+       // Serialization
+       
//-------------------------------------------------------------------------
+       public static class MetricDumpSerializer {
+               private ByteArrayOutputStream baos = new 
ByteArrayOutputStream(4096);
+               private DataOutputStream dos = new DataOutputStream(baos);
+
+               /**
+                * Serializes the given metrics and returns the resulting byte 
array.
+                *
+                * @param counters   counters to serialize
+                * @param gauges     gauges to serialize
+                * @param histograms histograms to serialize
+                * @return byte array containing the serialized metrics
+                * @throws IOException
+                */
+               public byte[] serialize(
+                       Map<Counter, Tuple2<QueryScopeInfo, String>> counters,
+                       Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges,
+                       Map<Histogram, Tuple2<QueryScopeInfo, String>> 
histograms,
+                       Map<Meter, Tuple2<QueryScopeInfo, String>> meters) 
throws IOException {
+                               
+                       baos.reset();
+                       dos.writeInt(counters.size());
+                       dos.writeInt(gauges.size());
+                       dos.writeInt(histograms.size());
+                       dos.writeInt(meters.size());
+
+                       for (Map.Entry<Counter, Tuple2<QueryScopeInfo, String>> 
entry : counters.entrySet()) {
+                               serializeMetricInfo(dos, entry.getValue().f0);
+                               serializeString(dos, entry.getValue().f1);
+                               serializeCounter(dos, entry.getKey());
+                       }
+
+                       for (Map.Entry<Gauge<?>, Tuple2<QueryScopeInfo, 
String>> entry : gauges.entrySet()) {
+                               serializeMetricInfo(dos, entry.getValue().f0);
+                               serializeString(dos, entry.getValue().f1);
+                               serializeGauge(dos, entry.getKey());
+                       }
+
+                       for (Map.Entry<Histogram, Tuple2<QueryScopeInfo, 
String>> entry : histograms.entrySet()) {
+                               serializeMetricInfo(dos, entry.getValue().f0);
+                               serializeString(dos, entry.getValue().f1);
+                               serializeHistogram(dos, entry.getKey());
+                       }
+
+                       for (Map.Entry<Meter, Tuple2<QueryScopeInfo, String>> 
entry : meters.entrySet()) {
+                               serializeMetricInfo(dos, entry.getValue().f0);
+                               serializeString(dos, entry.getValue().f1);
+                               serializeMeter(dos, entry.getKey());
+                       }
+                       return baos.toByteArray();
+               }
+
+               public void close() {
+                       try {
+                               dos.close();
+                       } catch (Exception e) {
+                               LOG.debug("Failed to close OutputStream.", e);
+                       }
+                       try {
+                               baos.close();
+                       } catch (Exception e) {
+                               LOG.debug("Failed to close OutputStream.", e);
+                       }
+               }
+       }
+
+       private static void serializeMetricInfo(DataOutputStream dos, 
QueryScopeInfo info) throws IOException {
+               serializeString(dos, info.scope);
+               dos.writeByte(info.getCategory());
+               switch (info.getCategory()) {
+                       case INFO_CATEGORY_JM:
+                               break;
+                       case INFO_CATEGORY_TM:
+                               String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+                               serializeString(dos, tmID);
+                               break;
+                       case INFO_CATEGORY_JOB:
+                               QueryScopeInfo.JobQueryScopeInfo jobInfo = 
(QueryScopeInfo.JobQueryScopeInfo) info;
+                               serializeString(dos, jobInfo.jobID);
+                               break;
+                       case INFO_CATEGORY_TASK:
+                               QueryScopeInfo.TaskQueryScopeInfo taskInfo = 
(QueryScopeInfo.TaskQueryScopeInfo) info;
+                               serializeString(dos, taskInfo.jobID);
+                               serializeString(dos, taskInfo.vertexID);
+                               dos.writeInt(taskInfo.subtaskIndex);
+                               break;
+                       case INFO_CATEGORY_OPERATOR:
+                               QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+                               serializeString(dos, operatorInfo.jobID);
+                               serializeString(dos, operatorInfo.vertexID);
+                               dos.writeInt(operatorInfo.subtaskIndex);
+                               serializeString(dos, operatorInfo.operatorName);
+                               break;
+               }
+       }
+
+       private static void serializeString(DataOutputStream dos, String 
string) throws IOException {
+               byte[] bytes = string.getBytes();
+               dos.writeInt(bytes.length);
+               dos.write(bytes);
+       }
+
+       private static void serializeCounter(DataOutputStream dos, Counter 
counter) throws IOException {
+               dos.writeLong(counter.getCount());
+       }
+
+       private static void serializeGauge(DataOutputStream dos, Gauge<?> 
gauge) throws IOException {
+               serializeString(dos, gauge.getValue().toString());
+       }
+
+       private static void serializeHistogram(DataOutputStream dos, Histogram 
histogram) throws IOException {
+               HistogramStatistics stat = histogram.getStatistics();
+
+               dos.writeLong(stat.getMin());
+               dos.writeLong(stat.getMax());
+               dos.writeDouble(stat.getMean());
+               dos.writeDouble(stat.getQuantile(0.5));
+               dos.writeDouble(stat.getStdDev());
+               dos.writeDouble(stat.getQuantile(0.75));
+               dos.writeDouble(stat.getQuantile(0.90));
+               dos.writeDouble(stat.getQuantile(0.95));
+               dos.writeDouble(stat.getQuantile(0.98));
+               dos.writeDouble(stat.getQuantile(0.99));
+               dos.writeDouble(stat.getQuantile(0.999));
+       }
+
+       private static void serializeMeter(DataOutputStream dos, Meter meter) 
throws IOException {
+               dos.writeDouble(meter.getRate());
+       }
+
+       
//-------------------------------------------------------------------------
+       // Deserialization
+       
//-------------------------------------------------------------------------
+       public static class MetricDumpDeserializer {
+               /**
+                * De-serializes metrics from the given byte array and returns 
them as a list of {@link MetricDump}.
+                *
+                * @param data serialized metrics
+                * @return A list containing the deserialized metrics.
+                * @throws IOException
+                */
+               public List<MetricDump> deserialize(byte[] data) throws 
IOException {
+                       ByteArrayInputStream bais = new 
ByteArrayInputStream(data);
+                       DataInputStream dis = new DataInputStream(bais);
+
+                       int numCounters = dis.readInt();
+                       int numGauges = dis.readInt();
+                       int numHistograms = dis.readInt();
+                       int numMeters = dis.readInt();
+
+                       List<MetricDump> metrics = new ArrayList<>(numCounters 
+ numGauges + numHistograms);
+
+                       for (int x = 0; x < numCounters; x++) {
+                               metrics.add(deserializeCounter(dis));
+                       }
+
+                       for (int x = 0; x < numGauges; x++) {
+                               metrics.add(deserializeGauge(dis));
+                       }
+
+                       for (int x = 0; x < numHistograms; x++) {
+                               metrics.add(deserializeHistogram(dis));
+                       }
+
+                       for (int x = 0; x < numMeters; x++) {
+                               metrics.add(deserializeMeter(dis));
+                       }
+
+                       return metrics;
+               }
+       }
+
+       private static String deserializeString(DataInputStream dis) throws 
IOException {
+               int stringLength = dis.readInt();
+               byte[] bytes = new byte[stringLength];
+               dis.readFully(bytes);
+               return new String(bytes);
+       }
+
+       private static MetricDump.CounterDump 
deserializeCounter(DataInputStream dis) throws IOException {
+               QueryScopeInfo scope = deserializeMetricInfo(dis);
+               String name = deserializeString(dis);
+               return new MetricDump.CounterDump(scope, name, dis.readLong());
+       }
+
+       private static MetricDump.GaugeDump deserializeGauge(DataInputStream 
dis) throws IOException {
+               QueryScopeInfo scope = deserializeMetricInfo(dis);
+               String name = deserializeString(dis);
+               String value = deserializeString(dis);
+               return new MetricDump.GaugeDump(scope, name, value);
+       }
+
+       private static MetricDump.HistogramDump 
deserializeHistogram(DataInputStream dis) throws IOException {
+               QueryScopeInfo info = deserializeMetricInfo(dis);
+               String name = deserializeString(dis);
+               long min = dis.readLong();
+               long max = dis.readLong();
+               double mean = dis.readDouble();
+               double median = dis.readDouble();
+               double stddev = dis.readDouble();
+               double p75 = dis.readDouble();
+               double p90 = dis.readDouble();
+               double p95 = dis.readDouble();
+               double p98 = dis.readDouble();
+               double p99 = dis.readDouble();
+               double p999 = dis.readDouble();
+               return new MetricDump.HistogramDump(info, name, min, max, mean, 
median, stddev, p75, p90, p95, p98, p99, p999);
+       }
+
+       private static MetricDump.MeterDump deserializeMeter(DataInputStream 
dis) throws IOException {
+               QueryScopeInfo info = deserializeMetricInfo(dis);
+               String name = deserializeString(dis);
+               double rate = dis.readDouble();
+               return new MetricDump.MeterDump(info, name, rate);
+       }
+
+       private static QueryScopeInfo deserializeMetricInfo(DataInputStream 
dis) throws IOException {
+               String jobID;
+               String vertexID;
+               int subtaskIndex;
+
+               String scope = deserializeString(dis);
+               byte cat = dis.readByte();
+               switch (cat) {
+                       case INFO_CATEGORY_JM:
+                               return new 
QueryScopeInfo.JobManagerQueryScopeInfo(scope);
+                       case INFO_CATEGORY_TM:
+                               String tmID = deserializeString(dis);
+                               return new 
QueryScopeInfo.TaskManagerQueryScopeInfo(tmID, scope);
+                       case INFO_CATEGORY_JOB:
+                               jobID = deserializeString(dis);
+                               return new 
QueryScopeInfo.JobQueryScopeInfo(jobID, scope);
+                       case INFO_CATEGORY_TASK:
+                               jobID = deserializeString(dis);
+                               vertexID = deserializeString(dis);
+                               subtaskIndex = dis.readInt();
+                               return new 
QueryScopeInfo.TaskQueryScopeInfo(jobID, vertexID, subtaskIndex, scope);
+                       case INFO_CATEGORY_OPERATOR:
+                               jobID = deserializeString(dis);
+                               vertexID = deserializeString(dis);
+                               subtaskIndex = dis.readInt();
+                               String operatorName = deserializeString(dis);
+                               return new 
QueryScopeInfo.OperatorQueryScopeInfo(jobID, vertexID, subtaskIndex, 
operatorName, scope);
+                       default:
+                               throw new IOException("sup");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
new file mode 100644
index 0000000..6e0b443
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricQueryService.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import akka.actor.UntypedActor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.CharacterFilter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpSerializer;
+
+/**
+ * The MetricQueryService creates a key-value representation of all metrics currently registered with Flink when queried.
+ *
+ * <p>It is realized as an actor and can be notified of
+ * <ul>
+ *     <li>an added metric by calling {@link MetricQueryService#notifyOfAddedMetric(ActorRef, Metric, String, AbstractMetricGroup)}</li>
+ *     <li>a removed metric by calling {@link MetricQueryService#notifyOfRemovedMetric(ActorRef, Metric)}</li>
+ *     <li>a metric dump request by sending the return value of {@link MetricQueryService#getCreateDump()}</li>
+ * </ul>
+ */
+public class MetricQueryService extends UntypedActor {
+       private static final Logger LOG = LoggerFactory.getLogger(MetricQueryService.class);
+
+       public static final String METRIC_QUERY_SERVICE_NAME = "MetricQueryService";
+
+       private static final CharacterFilter FILTER = new CharacterFilter() {
+               @Override
+               public String filterCharacters(String input) {
+                       return replaceInvalidChars(input);
+               }
+       };
+
+       private final MetricDumpSerializer serializer = new MetricDumpSerializer();
+
+       private final Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+       private final Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+       private final Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+       private final Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+       @Override
+       public void postStop() {
+               serializer.close();
+       }
+
+       /**
+        * Dispatches on the message type: registers or unregisters a metric, or answers a dump request.
+        * Any other message is logged and answered with a {@link Status.Failure}. Exceptions are
+        * logged and do not escape this method.
+        */
+       @Override
+       public void onReceive(Object message) {
+               try {
+                       if (message instanceof AddMetric) {
+                               addMetric((AddMetric) message);
+                       } else if (message instanceof RemoveMetric) {
+                               removeMetric(((RemoveMetric) message).metric);
+                       } else if (message instanceof CreateDump) {
+                               sendDump();
+                       } else {
+                               LOG.warn("MetricQueryServiceActor received an invalid message. " + message.toString());
+                               getSender().tell(new Status.Failure(new IOException("MetricQueryServiceActor received an invalid message. " + message.toString())), getSelf());
+                       }
+               } catch (Exception e) {
+                       LOG.warn("An exception occurred while processing a message.", e);
+               }
+       }
+
+       /** Stores the metric together with its scope info and filtered name in the map for its type. */
+       private void addMetric(AddMetric added) {
+               Metric metric = added.metric;
+               QueryScopeInfo info = added.group.getQueryServiceMetricInfo(FILTER);
+               Tuple2<QueryScopeInfo, String> infoAndName = new Tuple2<>(info, FILTER.filterCharacters(added.metricName));
+
+               if (metric instanceof Counter) {
+                       counters.put((Counter) metric, infoAndName);
+               } else if (metric instanceof Gauge) {
+                       gauges.put((Gauge<?>) metric, infoAndName);
+               } else if (metric instanceof Histogram) {
+                       histograms.put((Histogram) metric, infoAndName);
+               } else if (metric instanceof Meter) {
+                       meters.put((Meter) metric, infoAndName);
+               }
+       }
+
+       /** Removes the metric from the map for its type; unknown metric types are ignored. */
+       private void removeMetric(Metric metric) {
+               if (metric instanceof Counter) {
+                       counters.remove(metric);
+               } else if (metric instanceof Gauge) {
+                       gauges.remove(metric);
+               } else if (metric instanceof Histogram) {
+                       histograms.remove(metric);
+               } else if (metric instanceof Meter) {
+                       meters.remove(metric);
+               }
+       }
+
+       /**
+        * Serializes all registered metrics and sends the resulting byte array to the sender.
+        * If serialization fails the sender receives a {@link Status.Failure} instead of no reply at all.
+        */
+       private void sendDump() {
+               byte[] dump;
+               try {
+                       dump = serializer.serialize(counters, gauges, histograms, meters);
+               } catch (Exception e) {
+                       LOG.warn("An exception occurred while processing a message.", e);
+                       getSender().tell(new Status.Failure(e), getSelf());
+                       return;
+               }
+               getSender().tell(dump, getSelf());
+       }
+
+       /**
+        * Lightweight method to replace unsupported characters.
+        * If the string does not contain any unsupported characters, this method creates no
+        * new string (and in fact no new objects at all).
+        *
+        * <p>Replacements:
+        *
+        * <ul>
+        *     <li>{@code space : . ,} are replaced by {@code _} (underscore)</li>
+        * </ul>
+        */
+       static String replaceInvalidChars(String str) {
+               // The copy is created lazily, on the first character that has to be replaced.
+               char[] chars = null;
+               final int strLen = str.length();
+               int pos = 0;
+
+               for (int i = 0; i < strLen; i++) {
+                       final char c = str.charAt(i);
+                       switch (c) {
+                               case ' ':
+                               case '.':
+                               case ':':
+                               case ',':
+                                       if (chars == null) {
+                                               chars = str.toCharArray();
+                                       }
+                                       chars[pos++] = '_';
+                                       break;
+                               default:
+                                       if (chars != null) {
+                                               chars[pos] = c;
+                                       }
+                                       pos++;
+                       }
+               }
+
+               return chars == null ? str : new String(chars, 0, pos);
+       }
+
+       /**
+        * Starts the MetricQueryService actor in the given actor system.
+        *
+        * @param actorSystem The actor system running the MetricQueryService
+        * @return actor reference to the MetricQueryService
+        */
+       public static ActorRef startMetricQueryService(ActorSystem actorSystem) {
+               return actorSystem.actorOf(Props.create(MetricQueryService.class), METRIC_QUERY_SERVICE_NAME);
+       }
+
+       /**
+        * Utility method to notify a MetricQueryService of an added metric.
+        *
+        * @param service    MetricQueryService to notify
+        * @param metric     added metric
+        * @param metricName metric name
+        * @param group      group the metric was added on
+        */
+       public static void notifyOfAddedMetric(ActorRef service, Metric metric, String metricName, AbstractMetricGroup group) {
+               service.tell(new AddMetric(metricName, metric, group), ActorRef.noSender());
+       }
+
+       /**
+        * Utility method to notify a MetricQueryService of a removed metric.
+        *
+        * @param service MetricQueryService to notify
+        * @param metric  removed metric
+        */
+       public static void notifyOfRemovedMetric(ActorRef service, Metric metric) {
+               service.tell(new RemoveMetric(metric), ActorRef.noSender());
+       }
+
+       /** Message carrying a newly registered metric, its name and the group it was added on. */
+       private static class AddMetric {
+               private final String metricName;
+               private final Metric metric;
+               private final AbstractMetricGroup group;
+
+               private AddMetric(String metricName, Metric metric, AbstractMetricGroup group) {
+                       this.metricName = metricName;
+                       this.metric = metric;
+                       this.group = group;
+               }
+       }
+
+       /** Message carrying a metric that was unregistered. */
+       private static class RemoveMetric {
+               private final Metric metric;
+
+               private RemoveMetric(Metric metric) {
+                       this.metric = metric;
+               }
+       }
+
+       /**
+        * Returns the message that requests a metric dump from a MetricQueryService.
+        *
+        * @return dump request message
+        */
+       public static Object getCreateDump() {
+               return CreateDump.INSTANCE;
+       }
+
+       /** Message requesting a metric dump; recognized in {@code onReceive} by its type. */
+       private static class CreateDump implements Serializable {
+               private static final CreateDump INSTANCE = new CreateDump();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
new file mode 100644
index 0000000..df5c2bf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfo.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+/**
+ * Holds the scope information the MetricQueryService attaches to every metric.
+ *
+ * <p>Each concrete subclass represents one scope level and stores the identifiers of that level
+ * in public final fields; everything not covered by those fields is kept in {@link #scope}.
+ */
+public abstract class QueryScopeInfo {
+       /** Category returned by {@link QueryScopeInfo#getCategory()} for the job manager scope. */
+       public static final byte INFO_CATEGORY_JM = 0;
+       /** Category returned by {@link QueryScopeInfo#getCategory()} for the task manager scope. */
+       public static final byte INFO_CATEGORY_TM = 1;
+       /** Category returned by {@link QueryScopeInfo#getCategory()} for the job scope. */
+       public static final byte INFO_CATEGORY_JOB = 2;
+       /** Category returned by {@link QueryScopeInfo#getCategory()} for the task scope. */
+       public static final byte INFO_CATEGORY_TASK = 3;
+       /** Category returned by {@link QueryScopeInfo#getCategory()} for the operator scope. */
+       public static final byte INFO_CATEGORY_OPERATOR = 4;
+
+       /** The remaining scope not covered by specific fields. */
+       public final String scope;
+
+       private QueryScopeInfo(String scope) {
+               this.scope = scope;
+       }
+
+       /**
+        * Creates a new QueryScopeInfo of the same type and with the same identifiers, whose scope is
+        * this scope with the given string appended directly (no delimiter is inserted).
+        *
+        * @param userScope scope to append
+        * @return modified copy of this QueryScopeInfo
+        */
+       public abstract QueryScopeInfo copy(String userScope);
+
+       /**
+        * Returns the category of this QueryScopeInfo, one of the {@code INFO_CATEGORY_*} constants.
+        * Callers can switch on it instead of using instanceof checks.
+        *
+        * @return category
+        */
+       public abstract byte getCategory();
+
+       /**
+        * Scope of a metric registered on the job manager. Has no fields besides the scope.
+        */
+       public static class JobManagerQueryScopeInfo extends QueryScopeInfo {
+               /** Creates an info with an empty scope. */
+               public JobManagerQueryScopeInfo() {
+                       this("");
+               }
+
+               public JobManagerQueryScopeInfo(String scope) {
+                       super(scope);
+               }
+
+               @Override
+               public JobManagerQueryScopeInfo copy(String userScope) {
+                       final String extendedScope = this.scope + userScope;
+                       return new JobManagerQueryScopeInfo(extendedScope);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return INFO_CATEGORY_JM;
+               }
+       }
+
+       /**
+        * Scope of a metric registered on a task manager, identified by the task manager ID.
+        */
+       public static class TaskManagerQueryScopeInfo extends QueryScopeInfo {
+               public final String taskManagerID;
+
+               /** Creates an info with an empty scope. */
+               public TaskManagerQueryScopeInfo(String taskManagerID) {
+                       this(taskManagerID, "");
+               }
+
+               public TaskManagerQueryScopeInfo(String taskManagerID, String scope) {
+                       super(scope);
+                       this.taskManagerID = taskManagerID;
+               }
+
+               @Override
+               public TaskManagerQueryScopeInfo copy(String userScope) {
+                       final String extendedScope = this.scope + userScope;
+                       return new TaskManagerQueryScopeInfo(this.taskManagerID, extendedScope);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return INFO_CATEGORY_TM;
+               }
+       }
+
+       /**
+        * Scope of a metric registered on a job, identified by the job ID.
+        */
+       public static class JobQueryScopeInfo extends QueryScopeInfo {
+               public final String jobID;
+
+               /** Creates an info with an empty scope. */
+               public JobQueryScopeInfo(String jobID) {
+                       this(jobID, "");
+               }
+
+               public JobQueryScopeInfo(String jobID, String scope) {
+                       super(scope);
+                       this.jobID = jobID;
+               }
+
+               @Override
+               public JobQueryScopeInfo copy(String userScope) {
+                       final String extendedScope = this.scope + userScope;
+                       return new JobQueryScopeInfo(this.jobID, extendedScope);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return INFO_CATEGORY_JOB;
+               }
+       }
+
+       /**
+        * Scope of a metric registered on a task, identified by job ID, vertex ID and subtask index.
+        */
+       public static class TaskQueryScopeInfo extends QueryScopeInfo {
+               public final String jobID;
+               public final String vertexID;
+               public final int subtaskIndex;
+
+               /** Creates an info with an empty scope. */
+               public TaskQueryScopeInfo(String jobID, String vertexID, int subtaskIndex) {
+                       this(jobID, vertexID, subtaskIndex, "");
+               }
+
+               public TaskQueryScopeInfo(String jobID, String vertexID, int subtaskIndex, String scope) {
+                       super(scope);
+                       this.jobID = jobID;
+                       this.vertexID = vertexID;
+                       this.subtaskIndex = subtaskIndex;
+               }
+
+               @Override
+               public TaskQueryScopeInfo copy(String userScope) {
+                       final String extendedScope = this.scope + userScope;
+                       return new TaskQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, extendedScope);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return INFO_CATEGORY_TASK;
+               }
+       }
+
+       /**
+        * Scope of a metric registered on an operator, identified by job ID, vertex ID, subtask index
+        * and operator name.
+        */
+       public static class OperatorQueryScopeInfo extends QueryScopeInfo {
+               public final String jobID;
+               public final String vertexID;
+               public final int subtaskIndex;
+               public final String operatorName;
+
+               /** Creates an info with an empty scope. */
+               public OperatorQueryScopeInfo(String jobID, String vertexID, int subtaskIndex, String operatorName) {
+                       this(jobID, vertexID, subtaskIndex, operatorName, "");
+               }
+
+               public OperatorQueryScopeInfo(String jobID, String vertexID, int subtaskIndex, String operatorName, String scope) {
+                       super(scope);
+                       this.jobID = jobID;
+                       this.vertexID = vertexID;
+                       this.subtaskIndex = subtaskIndex;
+                       this.operatorName = operatorName;
+               }
+
+               @Override
+               public OperatorQueryScopeInfo copy(String userScope) {
+                       final String extendedScope = this.scope + userScope;
+                       return new OperatorQueryScopeInfo(this.jobID, this.vertexID, this.subtaskIndex, this.operatorName, extendedScope);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return INFO_CATEGORY_OPERATOR;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index 89fe3cd..75476a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,6 +87,9 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
         * For example: "host-7.taskmanager-2.window_word_count.my-mapper" */
        private String scopeString;
 
+       /** The metrics query service scope represented by this group, lazily 
computed. */
+       protected QueryScopeInfo queryServiceScopeInfo;
+
        /** Flag indicating whether this group has been closed */
        private volatile boolean closed;
 
@@ -123,6 +127,27 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
        }
 
        /**
+        * Returns the metric query service scope for this group.
+        * 
+        * @param filter character filter
+        * @return query service scope
+     */
+       public QueryScopeInfo getQueryServiceMetricInfo(CharacterFilter filter) {
+               // Once a scope info has been created it is reused, so the filter only
+               // influences the result of the first call.
+               if (queryServiceScopeInfo != null) {
+                       return queryServiceScopeInfo;
+               }
+               queryServiceScopeInfo = createQueryServiceMetricInfo(filter);
+               return queryServiceScopeInfo;
+       }
+
+       /**
+        * Creates the metric query service scope for this group.
+        *
+        * <p>Invoked by {@link #getQueryServiceMetricInfo(CharacterFilter)} when no scope info
+        * has been stored for this group yet; the returned value is then kept and reused.
+        *
+        * @param filter character filter
+        * @return query service scope
+        */
+       protected abstract QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter);
+
+       /**
         * Returns the fully qualified metric name, for example
         * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
         * 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
index 569ad0f..ab6418d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/GenericMetricGroup.java
@@ -18,16 +18,26 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 
 /**
  * A simple named {@link org.apache.flink.metrics.MetricGroup} that is used to 
hold
  * subgroups of metrics.
  */
 public class GenericMetricGroup extends 
AbstractMetricGroup<AbstractMetricGroup<?>> {
+       /** The name of this group */
+       private String name;
 
        public GenericMetricGroup(MetricRegistry registry, AbstractMetricGroup 
parent, String name) {
                super(registry, makeScopeComponents(parent, name), parent);
+               this.name = name;
+       }
+
+       @Override
+       protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter 
filter) {
+               return 
parent.getQueryServiceMetricInfo(filter).copy(filter.filterCharacters(this.name));
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
index da0d8f8..2f6b07a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobManagerMetricGroup.java
@@ -18,8 +18,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.HashMap;
@@ -46,6 +48,11 @@ public class JobManagerMetricGroup extends 
ComponentMetricGroup<JobManagerMetric
                return hostname;
        }
 
+       /** Creates a new {@code JobManagerQueryScopeInfo}; the filter is not used here. */
+       @Override
+       protected QueryScopeInfo.JobManagerQueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {
+               return new QueryScopeInfo.JobManagerQueryScopeInfo();
+       }
+
        // 
------------------------------------------------------------------------
        //  job groups
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
index e101d2f..091807f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/JobMetricGroup.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import javax.annotation.Nullable;
@@ -65,6 +67,11 @@ public abstract class JobMetricGroup<C extends 
ComponentMetricGroup<C>> extends
                return jobName;
        }
 
+       @Override
+       protected QueryScopeInfo.JobQueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {
+               return new 
QueryScopeInfo.JobQueryScopeInfo(this.jobId.toString());
+       }
+
        // 
------------------------------------------------------------------------
        //  Component Metric Group Specifics
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
index 9352faf..1ed55d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/OperatorMetricGroup.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.Collections;
@@ -42,6 +44,15 @@ public class OperatorMetricGroup extends 
ComponentMetricGroup<TaskMetricGroup> {
        public final TaskMetricGroup parent() {
                return parent;
        }
+
+       /**
+        * Creates the operator scope info from the IDs of the parent task group and the
+        * filtered operator name.
+        *
+        * @param filter character filter applied to the operator name
+        * @return operator scope info for this group
+        */
+       @Override
+       protected QueryScopeInfo.OperatorQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+               // The task group's vertexId is declared @Nullable; String.valueOf avoids a
+               // NullPointerException and reports a missing ID as "null".
+               return new QueryScopeInfo.OperatorQueryScopeInfo(
+                       this.parent.parent.jobId.toString(),
+                       String.valueOf(this.parent.vertexId),
+                       this.parent.subtaskIndex,
+                       filter.filterCharacters(this.operatorName));
+       }
        
        // 
------------------------------------------------------------------------
        //  Component Metric Group Specifics

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
index 8f81cfd..5bdd014 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerMetricGroup.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 
 import java.util.HashMap;
@@ -55,6 +57,11 @@ public class TaskManagerMetricGroup extends 
ComponentMetricGroup<TaskManagerMetr
                return taskManagerId;
        }
 
+       /** Creates a {@code TaskManagerQueryScopeInfo} for this group's TaskManager ID. */
+       @Override
+       protected QueryScopeInfo.TaskManagerQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+               // The filter is not applied to the TaskManager ID.
+               return new QueryScopeInfo.TaskManagerQueryScopeInfo(taskManagerId);
+       }
+
        // 
------------------------------------------------------------------------
        //  job groups
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 78fec97..0e76ab0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.util.AbstractID;
 
@@ -43,12 +45,12 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
        private final AbstractID executionId;
 
        @Nullable
-       private final AbstractID vertexId;
+       protected final AbstractID vertexId;
        
        @Nullable
        private final String taskName;
 
-       private final int subtaskIndex;
+       protected final int subtaskIndex;
 
        private final int attemptNumber;
 
@@ -113,6 +115,14 @@ public class TaskMetricGroup extends 
ComponentMetricGroup<TaskManagerJobMetricGr
                return ioMetrics;
        }
 
+       /**
+        * Creates the task scope info from the parent's job ID, this task's vertex ID and
+        * its subtask index.
+        *
+        * @param filter character filter (not applied to any of the IDs)
+        * @return task scope info for this group
+        */
+       @Override
+       protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
+               // vertexId is declared @Nullable; String.valueOf avoids a NullPointerException
+               // and reports a missing ID as "null".
+               return new QueryScopeInfo.TaskQueryScopeInfo(
+                       this.parent.jobId.toString(),
+                       String.valueOf(this.vertexId),
+                       this.subtaskIndex);
+       }
+
        // 
------------------------------------------------------------------------
        //  operators and cleanup
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f67be0e..1c68874 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2750,6 +2750,12 @@ object JobManager {
       case Some(actorName) => actorSystem.actorOf(jobManagerProps, actorName)
       case None => actorSystem.actorOf(jobManagerProps)
     }
+    
+    metricsRegistry match {
+      case Some(registry) =>
+        registry.startQueryService(actorSystem)
+      case None =>
+    }
 
     (jobManager, archive)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index cac5d91..27c9dd9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -203,7 +203,8 @@ class LocalFlinkMiniCluster(
     memoryManager,
     ioManager,
     network,
-    leaderRetrievalService) = TaskManager.createTaskManagerComponents(
+    leaderRetrievalService,
+    metricsRegistry) = TaskManager.createTaskManagerComponents(
       config,
       resourceID,
       hostname, // network interface to bind to
@@ -218,7 +219,10 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
+
+    metricsRegistry.startQueryService(system)
 
     system.actorOf(props, taskManagerActorName)
   }
@@ -274,7 +278,8 @@ class LocalFlinkMiniCluster(
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService): Props = {
+    leaderRetrievalService: LeaderRetrievalService,
+    metricsRegistry: MetricRegistry): Props = {
 
     TaskManager.getTaskManagerProps(
       taskManagerClass,
@@ -284,7 +289,8 @@ class LocalFlinkMiniCluster(
       memoryManager,
       ioManager,
       networkEnvironment,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
   def getResourceManagerProps(

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8ebdd80..c882631 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -132,7 +132,8 @@ class TaskManager(
     protected val ioManager: IOManager,
     protected val network: NetworkEnvironment,
     protected val numberOfSlots: Int,
-    protected val leaderRetrievalService: LeaderRetrievalService)
+    protected val leaderRetrievalService: LeaderRetrievalService,
+    protected val metricsRegistry: FlinkMetricRegistry)
   extends FlinkActor
   with LeaderSessionMessageFilter // Mixin order is important: We want to 
filter after logging
   with LogMessages // Mixin order is important: first we want to support 
message logging
@@ -158,7 +159,6 @@ class TaskManager(
   /** Registry of metrics periodically transmitted to the JobManager */
   private val metricRegistry = TaskManager.createMetricsRegistry()
 
-  private var metricsRegistry : FlinkMetricRegistry = _
   private var taskManagerMetricGroup : TaskManagerMetricGroup = _
 
   /** Metric serialization */
@@ -276,11 +276,7 @@ class TaskManager(
     
     // failsafe shutdown of the metrics registry
     try {
-      val reg = metricsRegistry
-      metricsRegistry = null
-      if (reg != null) {
-        reg.shutdown()
-      }
+      metricsRegistry.shutdown()
     } catch {
       case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
     }
@@ -985,8 +981,6 @@ class TaskManager(
     else {
       libraryCacheManager = Some(new FallbackLibraryCacheManager)
     }
-
-    metricsRegistry = new FlinkMetricRegistry(config.configuration)
     
     taskManagerMetricGroup = 
       new TaskManagerMetricGroup(metricsRegistry, 
this.runtimeInfo.getHostname, id.toString)
@@ -1064,9 +1058,12 @@ class TaskManager(
       network.getKvStateRegistry.unregisterListener()
     }
     
-    // stop the metrics reporters
-    metricsRegistry.shutdown()
-    metricsRegistry = null
+    // failsafe shutdown of the metrics registry
+    try {
+      metricsRegistry.shutdown()
+    } catch {
+      case t: Exception => log.error("MetricRegistry did not shutdown 
properly.", t)
+    }
   }
 
   protected def handleJobManagerDisconnect(jobManager: ActorRef, msg: String): 
Unit = {
@@ -1849,7 +1846,8 @@ object TaskManager {
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService) = createTaskManagerComponents(
+      leaderRetrievalService,
+      metricsRegistry) = createTaskManagerComponents(
       configuration,
       resourceID,
       taskManagerHostname,
@@ -1865,7 +1863,10 @@ object TaskManager {
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
+
+    metricsRegistry.startQueryService(actorSystem)
 
     taskManagerActorName match {
       case Some(actorName) => actorSystem.actorOf(tmProps, actorName)
@@ -1881,7 +1882,8 @@ object TaskManager {
     memoryManager: MemoryManager,
     ioManager: IOManager,
     networkEnvironment: NetworkEnvironment,
-    leaderRetrievalService: LeaderRetrievalService
+    leaderRetrievalService: LeaderRetrievalService,
+    metricsRegistry: FlinkMetricRegistry
   ): Props = {
     Props(
       taskManagerClass,
@@ -1892,7 +1894,8 @@ object TaskManager {
       ioManager,
       networkEnvironment,
       taskManagerConfig.numberOfSlots,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
   def createTaskManagerComponents(
@@ -1906,7 +1909,8 @@ object TaskManager {
       MemoryManager,
       IOManager,
       NetworkEnvironment,
-      LeaderRetrievalService) = {
+      LeaderRetrievalService,
+      FlinkMetricRegistry) = {
 
     val (taskManagerConfig : TaskManagerConfiguration,
     netConfig: NetworkEnvironmentConfiguration,
@@ -2081,12 +2085,15 @@ object TaskManager {
       case None => 
LeaderRetrievalUtils.createLeaderRetrievalService(configuration)
     }
 
+    val metricsRegistry = new FlinkMetricRegistry(configuration)
+
     (taskManagerConfig,
       taskManagerLocation,
       memoryManager,
       ioManager,
       network,
-      leaderRetrievalService)
+      leaderRetrievalService,
+      metricsRegistry)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
new file mode 100644
index 0000000..bc0f005
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerializerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class MetricDumpSerializerTest {
+       @Test
+       public void testSerialization() throws IOException {
+               MetricDumpSerialization.MetricDumpSerializer serializer = new 
MetricDumpSerialization.MetricDumpSerializer();
+               MetricDumpSerialization.MetricDumpDeserializer deserializer = 
new MetricDumpSerialization.MetricDumpDeserializer();
+
+               Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new 
HashMap<>();
+               Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new 
HashMap<>();
+               Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new 
HashMap<>();
+               Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new 
HashMap<>();
+
+               SimpleCounter c1 = new SimpleCounter();
+               SimpleCounter c2 = new SimpleCounter();
+               SimpleCounter c3 = new SimpleCounter();
+
+               c1.inc(1);
+               c2.inc(2);
+
+               Gauge<Integer> g1 = new Gauge<Integer>() {
+                       @Override
+                       public Integer getValue() {
+                               return 4;
+                       }
+               };
+
+               Histogram h1 = new TestingHistogram();
+
+               Meter m1 = new Meter() {
+                       @Override
+                       public void markEvent() {
+                       }
+
+                       @Override
+                       public void markEvent(long n) {
+                       }
+
+                       @Override
+                       public double getRate() {
+                               return 5;
+                       }
+
+                       @Override
+                       public long getCount() {
+                               return 10;
+                       }
+               };
+
+               counters.put(c1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.JobManagerQueryScopeInfo("A"), "c1"));
+               counters.put(c2, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "B"), "c2"));
+               meters.put(m1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.JobQueryScopeInfo("jid", "C"), "c3"));
+               gauges.put(g1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "D"), "g1"));
+               histograms.put(h1, new Tuple2<QueryScopeInfo, String>(new 
QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "E"), "h1"));
+
+               byte[] serialized = serializer.serialize(counters, gauges, 
histograms, meters);
+               List<MetricDump> deserialized = 
deserializer.deserialize(serialized);
+
+               // ===== Counters 
==============================================================================================
+               assertEquals(5, deserialized.size());
+
+               for (MetricDump metric : deserialized) {
+                       switch (metric.getCategory()) {
+                               case METRIC_CATEGORY_COUNTER:
+                                       MetricDump.CounterDump counterDump = 
(MetricDump.CounterDump) metric;
+                                       switch ((byte) counterDump.count) {
+                                               case 1:
+                                                       
assertTrue(counterDump.scopeInfo instanceof 
QueryScopeInfo.JobManagerQueryScopeInfo);
+                                                       assertEquals("A", 
counterDump.scopeInfo.scope);
+                                                       assertEquals("c1", 
counterDump.name);
+                                                       counters.remove(c1);
+                                                       break;
+                                               case 2:
+                                                       
assertTrue(counterDump.scopeInfo instanceof 
QueryScopeInfo.TaskManagerQueryScopeInfo);
+                                                       assertEquals("B", 
counterDump.scopeInfo.scope);
+                                                       assertEquals("c2", 
counterDump.name);
+                                                       assertEquals("tmid", 
((QueryScopeInfo.TaskManagerQueryScopeInfo) 
counterDump.scopeInfo).taskManagerID);
+                                                       counters.remove(c2);
+                                                       break;
+                                               default:
+                                                       fail();
+                                       }
+                                       break;
+                               case METRIC_CATEGORY_GAUGE:
+                                       MetricDump.GaugeDump gaugeDump = 
(MetricDump.GaugeDump) metric;
+                                       assertEquals("4", gaugeDump.value);
+                                       assertEquals("g1", gaugeDump.name);
+
+                                       assertTrue(gaugeDump.scopeInfo 
instanceof QueryScopeInfo.TaskQueryScopeInfo);
+                                       QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) gaugeDump.scopeInfo;
+                                       assertEquals("D", taskInfo.scope);
+                                       assertEquals("jid", taskInfo.jobID);
+                                       assertEquals("vid", taskInfo.vertexID);
+                                       assertEquals(2, taskInfo.subtaskIndex);
+                                       gauges.remove(g1);
+                                       break;
+                               case METRIC_CATEGORY_HISTOGRAM:
+                                       MetricDump.HistogramDump histogramDump 
= (MetricDump.HistogramDump) metric;
+                                       assertEquals("h1", histogramDump.name);
+                                       assertEquals(0.5, histogramDump.median, 
0.1);
+                                       assertEquals(0.75, histogramDump.p75, 
0.1);
+                                       assertEquals(0.90, histogramDump.p90, 
0.1);
+                                       assertEquals(0.95, histogramDump.p95, 
0.1);
+                                       assertEquals(0.98, histogramDump.p98, 
0.1);
+                                       assertEquals(0.99, histogramDump.p99, 
0.1);
+                                       assertEquals(0.999, histogramDump.p999, 
0.1);
+                                       assertEquals(4, histogramDump.mean, 
0.1);
+                                       assertEquals(5, histogramDump.stddev, 
0.1);
+                                       assertEquals(6, histogramDump.max);
+                                       assertEquals(7, histogramDump.min);
+
+                                       assertTrue(histogramDump.scopeInfo 
instanceof QueryScopeInfo.OperatorQueryScopeInfo);
+                                       QueryScopeInfo.OperatorQueryScopeInfo 
opInfo = (QueryScopeInfo.OperatorQueryScopeInfo) histogramDump.scopeInfo;
+                                       assertEquals("E", opInfo.scope);
+                                       assertEquals("jid", opInfo.jobID);
+                                       assertEquals("vid", opInfo.vertexID);
+                                       assertEquals(2, opInfo.subtaskIndex);
+                                       assertEquals("opname", 
opInfo.operatorName);
+                                       histograms.remove(h1);
+                                       break;
+                               case METRIC_CATEGORY_METER:
+                                       MetricDump.MeterDump meterDump = 
(MetricDump.MeterDump) metric;
+                                       assertEquals(5.0, meterDump.rate, 0.1);
+
+                                       assertTrue(meterDump.scopeInfo 
instanceof QueryScopeInfo.JobQueryScopeInfo);
+                                       assertEquals("C", 
meterDump.scopeInfo.scope);
+                                       assertEquals("c3", meterDump.name);
+                                       assertEquals("jid", 
((QueryScopeInfo.JobQueryScopeInfo) meterDump.scopeInfo).jobID);
+                                       break;
+                               default:
+                                       fail();
+                       }
+               }
+               assertTrue(counters.isEmpty());
+               assertTrue(gauges.isEmpty());
+               assertTrue(histograms.isEmpty());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
new file mode 100644
index 0000000..3b65184
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricDumpTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that every {@link MetricDump} subtype exposes the values handed to its constructor
+ * and reports the matching metric category.
+ */
+public class MetricDumpTest {
+       /** A counter dump keeps its name, count and scope info. */
+       @Test
+       public void testDumpedCounter() {
+               QueryScopeInfo scopeInfo = new QueryScopeInfo.JobManagerQueryScopeInfo();
+
+               MetricDump.CounterDump counterDump = new MetricDump.CounterDump(scopeInfo, "counter", 4);
+
+               assertEquals("counter", counterDump.name);
+               assertEquals(4, counterDump.count);
+               assertEquals(scopeInfo, counterDump.scopeInfo);
+               assertEquals(METRIC_CATEGORY_COUNTER, counterDump.getCategory());
+       }
+
+       /** A gauge dump keeps its name, value and scope info. */
+       @Test
+       public void testDumpedGauge() {
+               QueryScopeInfo scopeInfo = new QueryScopeInfo.JobManagerQueryScopeInfo();
+
+               MetricDump.GaugeDump gaugeDump = new MetricDump.GaugeDump(scopeInfo, "gauge", "hello");
+
+               assertEquals("gauge", gaugeDump.name);
+               assertEquals("hello", gaugeDump.value);
+               assertEquals(scopeInfo, gaugeDump.scopeInfo);
+               assertEquals(METRIC_CATEGORY_GAUGE, gaugeDump.getCategory());
+       }
+
+       /**
+        * A histogram dump keeps its name, scope info and all statistics; the constructor
+        * arguments 1..11 are chosen so that each field can be told apart from the others.
+        */
+       @Test
+       public void testDumpedHistogram() {
+               QueryScopeInfo scopeInfo = new QueryScopeInfo.JobManagerQueryScopeInfo();
+
+               MetricDump.HistogramDump histogramDump =
+                       new MetricDump.HistogramDump(scopeInfo, "hist", 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+               assertEquals("hist", histogramDump.name);
+               assertEquals(1, histogramDump.min);
+               assertEquals(2, histogramDump.max);
+               assertEquals(3, histogramDump.mean, 0.1);
+               assertEquals(4, histogramDump.median, 0.1);
+               assertEquals(5, histogramDump.stddev, 0.1);
+               assertEquals(6, histogramDump.p75, 0.1);
+               assertEquals(7, histogramDump.p90, 0.1);
+               assertEquals(8, histogramDump.p95, 0.1);
+               assertEquals(9, histogramDump.p98, 0.1);
+               assertEquals(10, histogramDump.p99, 0.1);
+               assertEquals(11, histogramDump.p999, 0.1);
+               assertEquals(scopeInfo, histogramDump.scopeInfo);
+               assertEquals(METRIC_CATEGORY_HISTOGRAM, histogramDump.getCategory());
+       }
+
+       /** A meter dump keeps its name, rate and scope info. */
+       @Test
+       public void testDumpedMeter() {
+               QueryScopeInfo scopeInfo = new QueryScopeInfo.JobManagerQueryScopeInfo();
+
+               MetricDump.MeterDump meterDump = new MetricDump.MeterDump(scopeInfo, "meter", 5.0);
+
+               assertEquals("meter", meterDump.name);
+               assertEquals(5.0, meterDump.rate, 0.1);
+               assertEquals(scopeInfo, meterDump.scopeInfo);
+               assertEquals(METRIC_CATEGORY_METER, meterDump.getCategory());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
new file mode 100644
index 0000000..91563ec
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.testkit.TestActorRef;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that a {@link MetricQueryService} actor replies to dump requests with a serialized
+ * byte array.
+ */
+public class MetricQueryServiceTest extends TestLogger {
+       /**
+        * Sends add and remove notifications for one metric of each type and requests two dumps:
+        * the first reply must be non-empty, the second must consist of exactly 16 zero bytes.
+        */
+       @Test
+       public void testCreateDump() throws Exception {
+               ActorSystem s = AkkaUtils.createLocalActorSystem(new Configuration());
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       ActorRef serviceActor = MetricQueryService.startMetricQueryService(s);
+                       TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
+                       TestActor testActor = (TestActor) testActorRef.underlyingActor();
+
+                       final Counter c = new SimpleCounter();
+                       final Gauge<String> g = new Gauge<String>() {
+                               @Override
+                               public String getValue() {
+                                       return "Hello";
+                               }
+                       };
+                       final Histogram h = new TestingHistogram();
+                       final Meter m = new Meter() {
+
+                               @Override
+                               public void markEvent() {
+                               }
+
+                               @Override
+                               public void markEvent(long n) {
+                               }
+
+                               @Override
+                               public double getRate() {
+                                       return 5;
+                               }
+
+                               @Override
+                               public long getCount() {
+                                       return 10;
+                               }
+                       };
+
+                       final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
+                       MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+
+                       // these metrics will be removed *after* the first query
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, c);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, g);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, h);
+                       MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
+
+                       serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+                       byte[] dump = awaitDump(testActor);
+                       // NOTE(review): the empty dump asserted below is also 16 bytes long, so this
+                       // check alone does not prove that the metrics were part of the first dump.
+                       assertTrue(dump.length > 0);
+
+                       serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
+                       byte[] emptyDump = awaitDump(testActor);
+                       assertEquals(16, emptyDump.length);
+                       for (int x = 0; x < 16; x++) {
+                               assertEquals(0, emptyDump[x]);
+                       }
+               } finally {
+                       // Always release the registry and the actor system, even if an assertion failed.
+                       registry.shutdown();
+                       s.shutdown();
+               }
+       }
+
+       /**
+        * Blocks until the test actor has received a reply, then returns it and resets the slot.
+        *
+        * <p>The wait is guarded by a loop because {@link Object#wait()} may wake up spuriously,
+        * and the reply is read and cleared while holding the lock so that the write performed
+        * by the actor thread is visible to the test thread.
+        *
+        * @param testActor actor that records the reply
+        * @return the received reply, cast to a byte array
+        * @throws InterruptedException if the waiting thread is interrupted
+        */
+       private static byte[] awaitDump(TestActor testActor) throws InterruptedException {
+               synchronized (testActor.lock) {
+                       while (testActor.message == null) {
+                               testActor.lock.wait();
+                       }
+                       byte[] reply = (byte[]) testActor.message;
+                       testActor.message = null;
+                       return reply;
+               }
+       }
+
+       /** Actor that stores the last received message and wakes up threads waiting on {@link #lock}. */
+       private static class TestActor extends UntypedActor {
+               /** Last received message; only read or written while holding {@link #lock}. */
+               public Object message;
+               public final Object lock = new Object();
+
+               @Override
+               public void onReceive(Object message) throws Exception {
+                       synchronized (lock) {
+                               this.message = message;
+                               lock.notifyAll();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
new file mode 100644
index 0000000..597e376
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/QueryScopeInfoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that each {@link QueryScopeInfo} subtype exposes its constructor arguments through
+ * the corresponding fields and reports the matching info category.
+ */
+public class QueryScopeInfoTest {
+       /** Job manager scope info carries only a scope. */
+       @Test
+       public void testJobManagerMetricInfo() {
+               QueryScopeInfo.JobManagerQueryScopeInfo jmInfo =
+                       new QueryScopeInfo.JobManagerQueryScopeInfo("abc");
+
+               assertEquals("abc", jmInfo.scope);
+               assertEquals(INFO_CATEGORY_JM, jmInfo.getCategory());
+       }
+
+       /** Task manager scope info carries the task manager ID and a scope. */
+       @Test
+       public void testTaskManagerMetricInfo() {
+               QueryScopeInfo.TaskManagerQueryScopeInfo tmInfo =
+                       new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc");
+
+               assertEquals("abc", tmInfo.scope);
+               assertEquals("tmid", tmInfo.taskManagerID);
+               assertEquals(INFO_CATEGORY_TM, tmInfo.getCategory());
+       }
+
+       /** Job scope info carries the job ID and a scope. */
+       @Test
+       public void testJobMetricInfo() {
+               QueryScopeInfo.JobQueryScopeInfo jobInfo =
+                       new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc");
+
+               assertEquals("abc", jobInfo.scope);
+               assertEquals("jobid", jobInfo.jobID);
+               assertEquals(INFO_CATEGORY_JOB, jobInfo.getCategory());
+       }
+
+       /** Task scope info carries the job ID, vertex ID, subtask index and a scope. */
+       @Test
+       public void testTaskMetricInfo() {
+               QueryScopeInfo.TaskQueryScopeInfo taskInfo =
+                       new QueryScopeInfo.TaskQueryScopeInfo("jid", "vid", 2, "abc");
+
+               assertEquals("abc", taskInfo.scope);
+               assertEquals("jid", taskInfo.jobID);
+               assertEquals("vid", taskInfo.vertexID);
+               assertEquals(2, taskInfo.subtaskIndex);
+               assertEquals(INFO_CATEGORY_TASK, taskInfo.getCategory());
+       }
+
+       /** Operator scope info additionally carries the operator name. */
+       @Test
+       public void testOperatorMetricInfo() {
+               QueryScopeInfo.OperatorQueryScopeInfo operatorInfo =
+                       new QueryScopeInfo.OperatorQueryScopeInfo("jid", "vid", 2, "opname", "abc");
+
+               assertEquals("abc", operatorInfo.scope);
+               assertEquals("jid", operatorInfo.jobID);
+               assertEquals("vid", operatorInfo.vertexID);
+               assertEquals("opname", operatorInfo.operatorName);
+               assertEquals(2, operatorInfo.subtaskIndex);
+               assertEquals(INFO_CATEGORY_OPERATOR, operatorInfo.getCategory());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index d9b1ebe..78aac64 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -18,7 +18,9 @@
 package org.apache.flink.runtime.metrics.groups;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.junit.Test;
 
 import static org.junit.Assert.assertTrue;
@@ -33,6 +35,10 @@ public class AbstractMetricGroupTest {
                MetricRegistry registry = new MetricRegistry(new 
Configuration());
 
                AbstractMetricGroup group = new 
AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
+                       // Stub: this anonymous test group provides no query scope info.
+                       @Override
+                       protected QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {
+                               return null;
+                       }
                };
                assertTrue(group.getAllVariables().isEmpty());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index faf42ea..1b9b24f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,13 +22,16 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class JobManagerGroupTest {
+public class JobManagerGroupTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
        //  adding and removing jobs
@@ -116,4 +119,13 @@ public class JobManagerGroupTest {
 
                registry.shutdown();
        }
+
+       /**
+        * Verifies that the query scope info created by a job manager group has an empty scope.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+
+                       QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("", info.scope);
+               } finally {
+                       // Shut the registry down like the other tests in this class, even if an assertion fails.
+                       registry.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 45f37ac..c3443f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -22,12 +22,15 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-public class JobManagerJobGroupTest {
+public class JobManagerJobGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
@@ -92,4 +95,16 @@ public class JobManagerJobGroupTest {
 
                registry.shutdown();
        }
+
+       /**
+        * Verifies that the query scope info created by a job manager job group has an empty
+        * scope and carries the string form of the job ID.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               JobID jid = new JobID();
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
+                       JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
+
+                       QueryScopeInfo.JobQueryScopeInfo info = jmj.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("", info.scope);
+                       assertEquals(jid.toString(), info.jobID);
+               } finally {
+                       // Shut the registry down like the other tests in this class, even if an assertion fails.
+                       registry.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index a27206d..3fe8d75 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -18,19 +18,25 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
 
-public class MetricGroupTest {
+public class MetricGroupTest extends TestLogger {
        
        private MetricRegistry registry;
 
@@ -110,6 +116,24 @@ public class MetricGroupTest {
                assertNotNull(group.addGroup(name));
                assertNotNull(group.counter(name));
        }
+
+       /**
+        * Verifies that a user-defined group nested in a task group yields a task scope info
+        * whose scope is the user group's name and whose IDs come from the enclosing task.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               JobID jid = new JobID();
+               AbstractID vid = new AbstractID();
+               AbstractID eid = new AbstractID();
+               // Named differently from the class-level "registry" field to avoid shadowing it.
+               // NOTE(review): the field could probably be used here instead - confirm against the setup method.
+               MetricRegistry localRegistry = new MetricRegistry(new Configuration());
+               try {
+                       TaskManagerMetricGroup tm = new TaskManagerMetricGroup(localRegistry, "host", "id");
+                       TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(localRegistry, tm, jid, "jobname");
+                       TaskMetricGroup task = new TaskMetricGroup(localRegistry, job, vid, eid, "taskName", 4, 5);
+                       GenericMetricGroup userGroup = new GenericMetricGroup(localRegistry, task, "hello");
+
+                       QueryScopeInfo.TaskQueryScopeInfo info =
+                               (QueryScopeInfo.TaskQueryScopeInfo) userGroup.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("hello", info.scope);
+                       assertEquals(jid.toString(), info.jobID);
+                       assertEquals(vid.toString(), info.vertexID);
+                       assertEquals(4, info.subtaskIndex);
+               } finally {
+                       // Release the locally created registry even if an assertion fails.
+                       localRegistry.shutdown();
+               }
+       }
        
        // 
------------------------------------------------------------------------
        
@@ -139,6 +163,11 @@ public class MetricGroupTest {
                }
 
                @Override
+               protected QueryScopeInfo 
createQueryServiceMetricInfo(CharacterFilter filter) {
+                       // Stub: always returns null.
+                       return null;
+               }
+
+               @Override
                protected void addMetric(String name, Metric metric) {}
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index c193ac8..2357936 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -21,9 +21,12 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.Map;
@@ -31,9 +34,8 @@ import java.util.Map;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 
-public class OperatorGroupTest {
+public class OperatorGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
@@ -91,4 +93,23 @@ public class OperatorGroupTest {
                assertNotNull(actualValue);
                assertEquals(expectedValue, actualValue);
        }
+
+       /**
+        * Verifies that the query scope info created by an operator group has an empty scope
+        * and carries the job ID, vertex ID, subtask index and operator name.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               JobID jid = new JobID();
+               AbstractID vid = new AbstractID();
+               AbstractID eid = new AbstractID();
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+                       TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+                       TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
+                       OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, "operator");
+
+                       QueryScopeInfo.OperatorQueryScopeInfo info = operator.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("", info.scope);
+                       assertEquals(jid.toString(), info.jobID);
+                       assertEquals(vid.toString(), info.vertexID);
+                       assertEquals(4, info.subtaskIndex);
+                       assertEquals("operator", info.operatorName);
+               } finally {
+                       // Release the registry even if an assertion fails.
+                       registry.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index b2c5dc7..a68e59d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -29,9 +29,12 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
 
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -40,7 +43,7 @@ import java.util.ArrayList;
 
 import static org.junit.Assert.*;
 
-public class TaskManagerGroupTest {
+public class TaskManagerGroupTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
        //  adding and removing jobs
@@ -269,4 +272,14 @@ public class TaskManagerGroupTest {
                assertEquals("constant.host.foo.host.name", 
group.getMetricIdentifier("name"));
                registry.shutdown();
        }
+
+       /**
+        * Verifies that the query scope info created by a task manager group has an empty scope
+        * and carries the task manager ID.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+
+                       QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("", info.scope);
+                       assertEquals("id", info.taskManagerID);
+               } finally {
+                       // Shut the registry down like the other tests in this class, even if an assertion fails.
+                       registry.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index c96af45..175ded1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -23,12 +23,15 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
-public class TaskManagerJobGroupTest {
+public class TaskManagerJobGroupTest extends TestLogger {
 
        @Test
        public void testGenerateScopeDefault() {
@@ -90,4 +93,16 @@ public class TaskManagerJobGroupTest {
                                jmGroup.getMetricIdentifier("name"));
                registry.shutdown();
        }
+
+       /**
+        * Verifies that the query scope info created by a task manager job group has an empty
+        * scope and carries the string form of the job ID.
+        */
+       @Test
+       public void testCreateQueryServiceMetricInfo() {
+               JobID jid = new JobID();
+               MetricRegistry registry = new MetricRegistry(new Configuration());
+               try {
+                       TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+                       TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
+
+                       QueryScopeInfo.JobQueryScopeInfo info = job.createQueryServiceMetricInfo(new DummyCharacterFilter());
+                       assertEquals("", info.scope);
+                       assertEquals(jid.toString(), info.jobID);
+               } finally {
+                       // Shut the registry down like the other tests in this class, even if an assertion fails.
+                       registry.shutdown();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index da07f8f..c65c1da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
 import org.apache.flink.util.AbstractID;
-
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class TaskMetricGroupTest {
+public class TaskMetricGroupTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
        //  scope tests
@@ -110,6 +112,23 @@ public class TaskMetricGroupTest {
        }
 
        @Test
+       public void testCreateQueryServiceMetricInfo() { // Verifies the query-scope info a TaskMetricGroup produces: empty scope plus job ID, vertex ID and subtask index.
+               JobID jid = new JobID();
+               AbstractID vid = new AbstractID(); // 3rd TaskMetricGroup argument; asserted below as info.vertexID
+               AbstractID eid = new AbstractID(); // 4th TaskMetricGroup argument; not asserted in this test (presumably the execution attempt ID - TODO confirm)
+               MetricRegistry registry = new MetricRegistry(new 
Configuration());
+               TaskManagerMetricGroup tm = new 
TaskManagerMetricGroup(registry, "host", "id");
+               TaskManagerJobMetricGroup job = new 
TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); // job group is created under tm and carries jid
+               TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, 
eid, "taskName", 4, 5); // 4 is the value asserted as subtaskIndex below; the meaning of 5 is not visible here (presumably the attempt number - TODO confirm)
+
+               QueryScopeInfo.TaskQueryScopeInfo info = 
task.createQueryServiceMetricInfo(new DummyCharacterFilter()); // DummyCharacterFilter returns its input unchanged
+               assertEquals("", info.scope); // the task group itself is expected to report an empty scope string
+               assertEquals(jid.toString(), info.jobID); // IDs are compared through their toString() form
+               assertEquals(vid.toString(), info.vertexID);
+               assertEquals(4, info.subtaskIndex);
+       }
+
+       @Test
        public void testTaskMetricGroupCleanup() {
                CountingMetricRegistry registry = new 
CountingMetricRegistry(new Configuration());
                TaskManagerMetricGroup taskManagerMetricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "0");

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
new file mode 100644
index 0000000..601f734
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/DummyCharacterFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.CharacterFilter;
+
+public class DummyCharacterFilter implements CharacterFilter { // Test helper: a CharacterFilter that performs no filtering at all.
+       @Override
+       public String filterCharacters(String input) {
+               return input; // hand the argument back unchanged (a null input is returned as null)
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
new file mode 100644
index 0000000..82f8504
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/TestingHistogram.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+
+public class TestingHistogram implements Histogram { // Test stub: a Histogram whose count and statistics are fixed, hard-coded values.
+
+       @Override
+       public void update(long value) { // no-op: the supplied value is discarded
+       }
+
+       @Override
+       public long getCount() {
+               return 1; // constant; not affected by calls to update()
+       }
+
+       @Override
+       public HistogramStatistics getStatistics() { // returns a new anonymous HistogramStatistics on every call
+               return new HistogramStatistics() {
+                       @Override
+                       public double getQuantile(double quantile) {
+                               return quantile; // echoes the requested quantile back as the result
+                       }
+
+                       @Override
+                       public long[] getValues() {
+                               return new long[0]; // always empty, even though size() reports 3
+                       }
+
+                       @Override
+                       public int size() {
+                               return 3; // fixed value; NOTE(review): presumably size/mean/stddev/max/min are distinct markers (3..7) so a test can tell the fields apart - confirm against the tests that use this stub
+                       }
+
+                       @Override
+                       public double getMean() {
+                               return 4; // fixed value
+                       }
+
+                       @Override
+                       public double getStdDev() {
+                               return 5; // fixed value
+                       }
+
+                       @Override
+                       public long getMax() {
+                               return 6; // fixed value; NOTE(review): smaller than getMin() (7), so these numbers do not describe a real distribution
+                       }
+
+                       @Override
+                       public long getMin() {
+                               return 7; // fixed value; see the note on getMax()
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index bda4174..bc83db9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 
@@ -138,7 +139,8 @@ public class TaskManagerComponentsStartupShutdownTest {
                                ioManager,
                                network,
                                numberOfSlots,
-                               leaderRetrievalService);
+                               leaderRetrievalService,
+                               new MetricRegistry(config));
 
                        final ActorRef taskManager = 
actorSystem.actorOf(tmProps);
 

Reply via email to