[FLINK-4389] Expose metrics to WebFrontend

This closes #2363


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70704de0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70704de0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70704de0

Branch: refs/heads/master
Commit: 70704de0c82cbb7b143dd696221e11999feb3600
Parents: 545b72b
Author: zentol <ches...@apache.org>
Authored: Fri Aug 5 13:54:37 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Sep 15 19:17:52 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/MesosTaskManager.scala     |   7 +-
 .../flink/metrics/jmx/JMXReporterTest.java      |  56 +---
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  14 +
 .../metrics/AbstractMetricsHandler.java         | 124 ++++++++
 .../metrics/JobManagerMetricsHandler.java       |  47 +++
 .../webmonitor/metrics/JobMetricsHandler.java   |  49 +++
 .../metrics/JobVertexMetricsHandler.java        |  54 ++++
 .../webmonitor/metrics/MetricFetcher.java       | 224 ++++++++++++++
 .../runtime/webmonitor/metrics/MetricStore.java | 190 ++++++++++++
 .../metrics/TaskManagerMetricsHandler.java      |  49 +++
 .../metrics/AbstractMetricsHandlerTest.java     | 154 ++++++++++
 .../metrics/JobManagerMetricsHandlerTest.java   |  61 ++++
 .../metrics/JobMetricsHandlerTest.java          |  63 ++++
 .../metrics/JobVertexMetricsHandlerTest.java    |  67 ++++
 .../webmonitor/metrics/MetricFetcherTest.java   | 216 +++++++++++++
 .../webmonitor/metrics/MetricStoreTest.java     |  83 +++++
 .../metrics/TaskManagerMetricsHandlerTest.java  |  63 ++++
 .../flink/runtime/metrics/MetricRegistry.java   |  24 ++
 .../flink/runtime/metrics/dump/MetricDump.java  | 138 +++++++++
 .../metrics/dump/MetricDumpSerialization.java   | 302 +++++++++++++++++++
 .../metrics/dump/MetricQueryService.java        | 217 +++++++++++++
 .../runtime/metrics/dump/QueryScopeInfo.java    | 189 ++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     |  25 ++
 .../metrics/groups/GenericMetricGroup.java      |  10 +
 .../metrics/groups/JobManagerMetricGroup.java   |   7 +
 .../runtime/metrics/groups/JobMetricGroup.java  |   7 +
 .../metrics/groups/OperatorMetricGroup.java     |  11 +
 .../metrics/groups/TaskManagerMetricGroup.java  |   7 +
 .../runtime/metrics/groups/TaskMetricGroup.java |  14 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +
 .../minicluster/LocalFlinkMiniCluster.scala     |  14 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  43 +--
 .../metrics/dump/MetricDumpSerializerTest.java  | 178 +++++++++++
 .../runtime/metrics/dump/MetricDumpTest.java    |  86 ++++++
 .../metrics/dump/MetricQueryServiceTest.java    | 133 ++++++++
 .../metrics/dump/QueryScopeInfoTest.java        |  73 +++++
 .../metrics/groups/AbstractMetricGroupTest.java |   6 +
 .../metrics/groups/JobManagerGroupTest.java     |  14 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |  17 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  31 +-
 .../metrics/groups/OperatorGroupTest.java       |  25 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  15 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |  17 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  23 +-
 .../metrics/util/DummyCharacterFilter.java      |  27 ++
 .../runtime/metrics/util/TestingHistogram.java  |  73 +++++
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../testingUtils/TestingTaskManager.scala       |  13 +-
 .../flink/yarn/TestingYarnTaskManager.scala     |   9 +-
 .../org/apache/flink/yarn/YarnTaskManager.scala |   7 +-
 50 files changed, 3186 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
index 19b0c62..3972a57 100644
--- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
+++ 
b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService
 import org.apache.flink.runtime.memory.MemoryManager
+import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.taskmanager.{TaskManager, 
TaskManagerConfiguration, TaskManagerLocation}
 
 /** An extension of the TaskManager that listens for additional Mesos-related
@@ -36,7 +37,8 @@ class MesosTaskManager(
     ioManager: IOManager,
     network: NetworkEnvironment,
     numberOfSlots: Int,
-    leaderRetrievalService: LeaderRetrievalService)
+    leaderRetrievalService: LeaderRetrievalService,
+    metricRegistry : MetricRegistry)
   extends TaskManager(
     config,
     resourceID,
@@ -45,7 +47,8 @@ class MesosTaskManager(
     ioManager,
     network,
     numberOfSlots,
-    leaderRetrievalService) {
+    leaderRetrievalService,
+    metricRegistry) {
 
   override def handleMessage: Receive = {
     super.handleMessage

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 913999b..089efe3 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -21,13 +21,12 @@ package org.apache.flink.metrics.jmx;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -286,57 +285,4 @@ public class JMXReporterTest extends TestLogger {
                        }
                }
        }
-
-       static class TestingHistogram implements Histogram {
-
-               @Override
-               public void update(long value) {
-
-               }
-
-               @Override
-               public long getCount() {
-                       return 1;
-               }
-
-               @Override
-               public HistogramStatistics getStatistics() {
-                       return new HistogramStatistics() {
-                               @Override
-                               public double getQuantile(double quantile) {
-                                       return quantile;
-                               }
-
-                               @Override
-                               public long[] getValues() {
-                                       return new long[0];
-                               }
-
-                               @Override
-                               public int size() {
-                                       return 3;
-                               }
-
-                               @Override
-                               public double getMean() {
-                                       return 4;
-                               }
-
-                               @Override
-                               public double getStdDev() {
-                                       return 5;
-                               }
-
-                               @Override
-                               public long getMax() {
-                                       return 6;
-                               }
-
-                               @Override
-                               public long getMin() {
-                                       return 7;
-                               }
-                       };
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 2bfbb85..4dd36e7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -67,6 +67,11 @@ import 
org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandl
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.ExecutionContext$;
@@ -133,6 +138,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private ExecutorService executorService;
 
+       private MetricFetcher metricFetcher;
+
        public WebRuntimeMonitor(
                        Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
@@ -206,6 +213,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
 
+               metricFetcher = new MetricFetcher(actorSystem, retriever, 
context);
+
                router = new Router()
                        // config how to interact with this web server
                        .GET("/config", handler(new 
DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -235,6 +244,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                                                        currentGraphs,
                                                        
backPressureStatsTracker,
                                                        refreshInterval)))
+                       .GET("/jobs/:jobid/vertices/:vertexid/metrics", 
handler(new JobVertexMetricsHandler(metricFetcher)))
                        
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new 
SubtasksAllAccumulatorsHandler(currentGraphs)))
                        
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
                        
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", 
handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
@@ -245,6 +255,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/jobs/:jobid/exceptions", handler(new 
JobExceptionsHandler(currentGraphs)))
                        .GET("/jobs/:jobid/accumulators", handler(new 
JobAccumulatorsHandler(currentGraphs)))
                        .GET("/jobs/:jobid/checkpoints", handler(new 
JobCheckpointsHandler(currentGraphs)))
+                       .GET("/jobs/:jobid/metrics", handler(new 
JobMetricsHandler(metricFetcher)))
 
                        .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
@@ -252,6 +263,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                                new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, 
config))
                        .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
                                new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, 
TaskManagerLogHandler.FileMode.STDOUT, config))
+                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagerMetricsHandler(metricFetcher)))
 
                        // log and stdout
                        .GET("/jobmanager/log", logFiles.logFile == null ? new 
ConstantTextHandler("(log file unavailable)") :
@@ -260,6 +272,8 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/jobmanager/stdout", logFiles.stdOutFile == null 
? new ConstantTextHandler("(stdout file unavailable)") :
                                new StaticFileServerHandler(retriever, 
jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
 
+                       .GET("/jobmanager/metrics", handler(new 
JobManagerMetricsHandler(metricFetcher)))
+
                        // Cancel a job via GET (for proper integration with 
YARN this has to be performed via GET)
                        .GET("/jobs/:jobid/yarn-cancel", handler(new 
JobCancellationHandler()))
 

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
new file mode 100644
index 0000000..54e4b6f
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+
+/**
+ * Abstract request handler that returns a list of all available metrics or 
the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separated list 
of metric names is expected as a value.
+ * {@code ?get=X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public abstract class AbstractMetricsHandler implements RequestHandler {
+       private final MetricFetcher fetcher;
+
+       public AbstractMetricsHandler(MetricFetcher fetcher) {
+               this.fetcher = Preconditions.checkNotNull(fetcher);
+       }
+
+       @Override
+       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+               fetcher.update();
+               String requestedMetricsList = queryParams.get("get");
+               return requestedMetricsList != null
+                       ? getMetricsValues(pathParams, requestedMetricsList)
+                       : getAvailableMetricsList(pathParams);
+       }
+
+       /**
+        * Returns a Map containing the metrics belonging to the entity pointed 
to by the path parameters.
+        *
+        * @param pathParams REST path parameters
+        * @param metrics MetricStore containing all metrics
+        * @return Map containing metrics, or null if no metric exists
+        */
+       protected abstract Map<String, Object> getMapFor(Map<String, String> 
pathParams, MetricStore metrics);
+
+       private String getMetricsValues(Map<String, String> pathParams, String 
requestedMetricsList) throws IOException {
+               if (requestedMetricsList.isEmpty()) {
+                       /**
+                        * The WebInterface doesn't check whether the list of 
available metrics was empty. This can lead to a 
+                        * request for which the "get" parameter is an empty 
string.
+                        */
+                       return "";
+               }
+               MetricStore metricStore = fetcher.getMetricStore();
+               synchronized (metricStore) {
+                       Map<String, Object> metrics = getMapFor(pathParams, 
metricStore);
+                       if (metrics == null) {
+                               return "";
+                       }
+                       String[] requestedMetrics = 
requestedMetricsList.split(",");
+
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+
+                       gen.writeStartArray();
+                       for (String requestedMetric : requestedMetrics) {
+                               Object metricValue = 
metrics.get(requestedMetric);
+                               if (metricValue != null) {
+                                       gen.writeStartObject();
+                                       gen.writeStringField("id", 
requestedMetric);
+                                       gen.writeStringField("value", 
metricValue.toString());
+                                       gen.writeEndObject();
+                               }
+                       }
+                       gen.writeEndArray();
+
+                       gen.close();
+                       return writer.toString();
+               }
+       }
+
+       private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
+               MetricStore metricStore = fetcher.getMetricStore();
+               synchronized (metricStore) {
+                       Map<String, Object> metrics = getMapFor(pathParams, 
metricStore);
+                       if (metrics == null) {
+                               return "";
+                       }
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
+
+                       gen.writeStartArray();
+                       for (String m : metrics.keySet()) {
+                               gen.writeStartObject();
+                               gen.writeStringField("id", m);
+                               gen.writeEndObject();
+                       }
+                       gen.writeEndArray();
+
+                       gen.close();
+                       return writer.toString();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
new file mode 100644
index 0000000..7435643
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for the job manager a list of all available 
metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separated list 
of metric names is expected as a value.
+ * {@code ?get=X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobManagerMetricsHandler extends AbstractMetricsHandler {
+       public JobManagerMetricsHandler(MetricFetcher fetcher) {
+               super(fetcher);
+       }
+
+       @Override
+       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.JobManagerMetricStore jobManager = 
metrics.jobManager;
+               if (jobManager == null) {
+                       return null;
+               } else {
+                       return jobManager.metrics;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
new file mode 100644
index 0000000..b54799d
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separated list 
of metric names is expected as a value.
+ * {@code ?get=X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobMetricsHandler extends AbstractMetricsHandler {
+       public static final String PARAMETER_JOB_ID = "jobid";
+
+       public JobMetricsHandler(MetricFetcher fetcher) {
+               super(fetcher);
+       }
+
+       @Override
+       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID));
+               if (job == null) {
+                       return null;
+               } else {
+                       return job.metrics;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
new file mode 100644
index 0000000..73b8bb0
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
+ * {@code [ { "id" : "X" } ] }
+ *
+ * If the query parameters do contain a "get" parameter a comma-separated list 
of metric names is expected as a value.
+ * {@code ?get=X,Y}
+ * The handler will then return a list containing the values of the requested 
metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+       public static final String PARAMETER_VERTEX_ID = "vertexid";
+
+       public JobVertexMetricsHandler(MetricFetcher fetcher) {
+               super(fetcher);
+       }
+
+       @Override
+       protected Map<String, Object> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
+               MetricStore.JobMetricStore job = 
metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID));
+               if (job == null) {
+                       return null;
+               } else {
+                       MetricStore.TaskMetricStore task = 
job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID));
+                       if (task == null) {
+                               return null;
+                       } else {
+                               return task.metrics;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
new file mode 100644
index 0000000..7a39a53
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
+
+/**
+ * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers.
+ *
+ * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since
+ * the last call has passed.
+ *
+ * <p>Responses are processed in callbacks that run on the {@link ExecutionContext} passed to the constructor. Since
+ * the {@link MetricStore} itself is not thread-safe, every modification made by this class is guarded by the
+ * monitor of the store instance.
+ */
+public class MetricFetcher {
+       private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class);
+
+       /** Minimum time in milliseconds that has to pass between two fetches. */
+       private static final long UPDATE_INTERVAL_MS = 10000;
+
+       /** Name of the query service actor; it is resolved as a sibling of the JobManager/TaskManager actor. */
+       private static final String QUERY_SERVICE_NAME = "MetricQueryService";
+
+       private final ActorSystem actorSystem;
+       private final JobManagerRetriever retriever;
+       private final ExecutionContext ctx;
+       private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS);
+
+       private MetricStore metrics = new MetricStore();
+       private MetricDumpDeserializer deserializer = new MetricDumpDeserializer();
+
+       private long lastUpdateTime;
+
+       /**
+        * Creates a new MetricFetcher.
+        *
+        * @param actorSystem actor system used to resolve the query service actors
+        * @param retriever   retriever for the current JobManager gateway
+        * @param ctx         execution context on which the future callbacks are run
+        * @throws NullPointerException if any argument is null
+        */
+       public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) {
+               this.actorSystem = Preconditions.checkNotNull(actorSystem);
+               this.retriever = Preconditions.checkNotNull(retriever);
+               this.ctx = Preconditions.checkNotNull(ctx);
+       }
+
+       /**
+        * Returns the MetricStore containing all stored metrics.
+        *
+        * @return MetricStore containing all stored metrics;
+        */
+       public MetricStore getMetricStore() {
+               return metrics;
+       }
+
+       /**
+        * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated.
+        *
+        * <p>A fetch is only triggered if more than {@link #UPDATE_INTERVAL_MS} milliseconds have passed since the
+        * last triggered fetch.
+        */
+       public void update() {
+               synchronized (this) {
+                       long currentTime = System.currentTimeMillis();
+                       if (currentTime - lastUpdateTime > UPDATE_INTERVAL_MS) {
+                               lastUpdateTime = currentTime;
+                               fetchMetrics();
+                       }
+               }
+       }
+
+       /**
+        * Triggers all asynchronous requests of a single fetch: the job details (to prune stale jobs), the metric
+        * dump of the JobManager, and the list of registered TaskManagers (to query each of them and prune stale ones).
+        *
+        * <p>Does nothing if no JobManager gateway is currently available. Exceptions are logged and not propagated.
+        */
+       private void fetchMetrics() {
+               try {
+                       Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort();
+                       if (!jobManagerGatewayAndWebPort.isDefined()) {
+                               return;
+                       }
+                       ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1();
+
+                       // Remove all metrics that belong to a job that is not running and no longer archived.
+                       Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout);
+                       jobDetailsFuture
+                               .onSuccess(new OnSuccess<Object>() {
+                                       @Override
+                                       public void onSuccess(Object result) throws Throwable {
+                                               retainMetricsOfKnownJobs((MultipleJobsDetails) result);
+                                       }
+                               }, ctx);
+                       logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed.");
+
+                       queryMetrics(resolveQueryService(jobManager.path()));
+
+                       // We first request the list of all registered task managers from the job manager, and then
+                       // request the respective metric dump from each task manager.
+                       // All stored metrics that do not belong to a registered task manager will be removed.
+                       Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout);
+                       registeredTaskManagersFuture
+                               .onSuccess(new OnSuccess<Object>() {
+                                       @Override
+                                       public void onSuccess(Object result) throws Throwable {
+                                               queryRegisteredTaskManagers((JobManagerMessages.RegisteredTaskManagers) result);
+                                       }
+                               }, ctx);
+                       logErrorOnFailure(registeredTaskManagersFuture, "Fetching list of registered TaskManagers failed.");
+               } catch (Exception e) {
+                       LOG.warn("Exception while fetching metrics.", e);
+               }
+       }
+
+       /**
+        * Removes the metrics of all jobs that are contained neither in the running nor in the finished jobs of
+        * the given details.
+        *
+        * @param details job details received from the JobManager
+        */
+       private void retainMetricsOfKnownJobs(MultipleJobsDetails details) {
+               List<String> toRetain = new ArrayList<>();
+               for (JobDetails job : details.getRunningJobs()) {
+                       toRetain.add(job.getJobId().toString());
+               }
+               for (JobDetails job : details.getFinishedJobs()) {
+                       toRetain.add(job.getJobId().toString());
+               }
+               synchronized (metrics) {
+                       metrics.jobs.keySet().retainAll(toRetain);
+               }
+       }
+
+       /**
+        * Requests a metric dump from every registered TaskManager and removes the metrics of all TaskManagers
+        * that are not registered.
+        *
+        * @param registeredTaskManagers response of the JobManager listing the registered TaskManagers
+        */
+       private void queryRegisteredTaskManagers(JobManagerMessages.RegisteredTaskManagers registeredTaskManagers) {
+               Iterable<Instance> taskManagers = registeredTaskManagers.asJavaIterable();
+               List<String> activeTaskManagers = new ArrayList<>();
+               for (Instance taskManager : taskManagers) {
+                       activeTaskManagers.add(taskManager.getId().toString());
+                       queryMetrics(resolveQueryService(taskManager.getActorGateway().path()));
+               }
+               synchronized (metrics) { // remove all metrics belonging to unregistered task managers
+                       metrics.taskManagers.keySet().retainAll(activeTaskManagers);
+               }
+       }
+
+       /**
+        * Resolves the query service actor that lives next to the actor with the given path, i.e. the last path
+        * element is replaced by the name of the query service.
+        *
+        * @param actorPath path of the JobManager or TaskManager actor
+        * @return reference to the corresponding query service actor
+        */
+       private ActorRef resolveQueryService(String actorPath) {
+               String queryServicePath = actorPath.substring(0, actorPath.lastIndexOf('/') + 1) + QUERY_SERVICE_NAME;
+               return actorSystem.actorFor(queryServicePath);
+       }
+
+       /**
+        * Registers a callback that logs the given message together with the failure cause if the future fails.
+        *
+        * @param future  future to observe
+        * @param message message to log on failure
+        */
+       private void logErrorOnFailure(Future<Object> future, final String message) {
+               future.onFailure(new OnFailure() {
+                       @Override
+                       public void onFailure(Throwable failure) throws Throwable {
+                               LOG.warn(message, failure);
+                       }
+               }, ctx);
+       }
+
+       /**
+        * Requests a metric dump from the given actor.
+        *
+        * @param actor ActorRef to request the dump from
+        */
+       private void queryMetrics(ActorRef actor) {
+               Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout);
+               metricQueryFuture
+                       .onSuccess(new OnSuccess<Object>() {
+                               @Override
+                               public void onSuccess(Object result) throws Throwable {
+                                       addMetrics(result);
+                               }
+                       }, ctx);
+               logErrorOnFailure(metricQueryFuture, "Fetching metrics failed.");
+       }
+
+       /**
+        * Deserializes the given metric dump and adds all contained metrics to the store.
+        *
+        * @param result serialized metric dump; expected to be a byte array
+        * @throws IOException if the dump could not be deserialized
+        */
+       private void addMetrics(Object result) throws IOException {
+               byte[] data = (byte[]) result;
+               List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
+               // Several dumps may arrive concurrently; take the same lock as the removals since the store is not thread-safe.
+               synchronized (metrics) {
+                       for (MetricDump metric : dumpedMetrics) {
+                               metrics.add(metric);
+                       }
+               }
+       }
+
+       /**
+        * Helper class that allows mocking of the answer.
+        */
+       static class BasicGateway {
+               private final ActorRef actor;
+
+               private BasicGateway(ActorRef actor) {
+                       this.actor = actor;
+               }
+
+               /**
+                * Sends a message asynchronously and returns its response. The response to the message is
+                * returned as a future.
+                *
+                * @param message Message to be sent
+                * @param timeout Timeout until the Future is completed with an AskTimeoutException
+                * @return Future which contains the response to the sent message
+                */
+               public Future<Object> ask(Object message, FiniteDuration timeout) {
+                       return Patterns.ask(actor, message, new Timeout(timeout));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
new file mode 100644
index 0000000..41e68cc
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
+import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
+import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
+
+/**
+ * Nested data-structure to store metrics.
+ *
+ * This structure is not thread-safe.
+ */
+public class MetricStore {
+       private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
+
+       final JobManagerMetricStore jobManager = new JobManagerMetricStore();
+       final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
+       final Map<String, JobMetricStore> jobs = new HashMap<>();
+
+       public void add(MetricDump metric) {
+               try {
+                       QueryScopeInfo info = metric.scopeInfo;
+                       TaskManagerMetricStore tm;
+                       JobMetricStore job;
+                       TaskMetricStore task;
+
+                       String name = info.scope.isEmpty()
+                               ? metric.name
+                               : info.scope + "." + metric.name;
+                       
+                       if (name.isEmpty()) { // malformed transmission
+                               return;
+                       }
+
+                       switch (info.getCategory()) {
+                               case INFO_CATEGORY_JM:
+                                       addMetric(jobManager.metrics, name, 
metric);
+                               case INFO_CATEGORY_TM:
+                                       String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
+                                       tm = taskManagers.get(tmID);
+                                       if (tm == null) {
+                                               tm = new 
TaskManagerMetricStore();
+                                               taskManagers.put(tmID, tm);
+                                       }
+                                       addMetric(tm.metrics, name, metric);
+                                       break;
+                               case INFO_CATEGORY_JOB:
+                                       QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
+                                       job = jobs.get(jobInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(jobInfo.jobID, job);
+                                       }
+                                       addMetric(job.metrics, name, metric);
+                                       break;
+                               case INFO_CATEGORY_TASK:
+                                       QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
+                                       job = jobs.get(taskInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(taskInfo.jobID, job);
+                                       }
+                                       task = job.tasks.get(taskInfo.vertexID);
+                                       if (task == null) {
+                                               task = new TaskMetricStore();
+                                               
job.tasks.put(taskInfo.vertexID, task);
+                                       }
+                                       /**
+                                        * As the WebInterface task metric 
queries currently do not account for subtasks we don't 
+                                        * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name. 
+                                        */
+                                       addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
+                                       break;
+                               case INFO_CATEGORY_OPERATOR:
+                                       QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
+                                       job = jobs.get(operatorInfo.jobID);
+                                       if (job == null) {
+                                               job = new JobMetricStore();
+                                               jobs.put(operatorInfo.jobID, 
job);
+                                       }
+                                       task = 
job.tasks.get(operatorInfo.vertexID);
+                                       if (task == null) {
+                                               task = new TaskMetricStore();
+                                               
job.tasks.put(operatorInfo.vertexID, task);
+                                       }
+                                       /**
+                                        * As the WebInterface does not account 
for operators (because it can't) we don't 
+                                        * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name 
+                                        * as the name.
+                                        */
+                                       addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
+                                       break;
+                               default:
+                                       LOG.debug("Invalid metric dump 
category: " + info.getCategory());
+                       }
+               } catch (Exception e) {
+                       LOG.debug("Malformed metric dump.", e);
+               }
+       }
+
+       private void addMetric(Map<String, Object> target, String name, 
MetricDump metric) {
+               switch (metric.getCategory()) {
+                       case METRIC_CATEGORY_COUNTER:
+                               MetricDump.CounterDump counter = 
(MetricDump.CounterDump) metric;
+                               target.put(name, counter.count);
+                               break;
+                       case METRIC_CATEGORY_GAUGE:
+                               MetricDump.GaugeDump gauge = 
(MetricDump.GaugeDump) metric;
+                               target.put(name, gauge.value);
+                               break;
+                       case METRIC_CATEGORY_HISTOGRAM:
+                               MetricDump.HistogramDump histogram = 
(MetricDump.HistogramDump) metric;
+                               target.put(name + "_min", histogram.min);
+                               target.put(name + "_max", histogram.max);
+                               target.put(name + "_mean", histogram.mean);
+                               target.put(name + "_median", histogram.median);
+                               target.put(name + "_stddev", histogram.stddev);
+                               target.put(name + "_p75", histogram.p75);
+                               target.put(name + "_p90", histogram.p90);
+                               target.put(name + "_p95", histogram.p95);
+                               target.put(name + "_p98", histogram.p98);
+                               target.put(name + "_p99", histogram.p99);
+                               target.put(name + "_p999", histogram.p999);
+                               break;
+                       case METRIC_CATEGORY_METER:
+                               MetricDump.MeterDump meter = 
(MetricDump.MeterDump) metric;
+                               target.put(name, meter.rate);
+                               break;
+               }
+       }
+
+       /**
+        * Sub-structure containing metrics of the JobManager.
+        */
+       static class JobManagerMetricStore {
+               public final Map<String, Object> metrics = new HashMap<>();
+       }
+
+       /**
+        * Sub-structure containing metrics of a single TaskManager.
+        */
+       static class TaskManagerMetricStore {
+               public final Map<String, Object> metrics = new HashMap<>();
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Job.
+        */
+       static class JobMetricStore {
+               public final Map<String, Object> metrics = new HashMap<>();
+               public final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
+       }
+
+       /**
+        * Sub-structure containing metrics of a single Task.
+        */
+       static class TaskMetricStore {
+               public final Map<String, Object> metrics = new HashMap<>();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
new file mode 100644
index 0000000..fea3d07
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import java.util.Map;
+
+/**
+ * Request handler that serves the metrics of a single task manager: either the list of all available metrics
+ * or the values for a requested set of metrics.
+ *
+ * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
+ *
+ * If the query parameters do contain a "get" parameter, its value is expected to be a comma-separated list of
+ * metric names.
+ * {@code ?get=X,Y}
+ * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
+ */
+public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
+       /** Key under which the task manager ID is looked up in the path parameters. */
+       public static final String PARAMETER_TM_ID = "tmid";
+
+       public TaskManagerMetricsHandler(MetricFetcher fetcher) {
+               super(fetcher);
+       }
+
+       /**
+        * Looks up the metric map of the task manager addressed by the given path parameters.
+        *
+        * @param pathParams path parameters; the task manager ID is read from {@link #PARAMETER_TM_ID}
+        * @param metrics    store in which the task manager is looked up
+        * @return metric map of the task manager, or null if the task manager is not contained in the store
+        */
+       @Override
+       protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) {
+               String taskManagerId = pathParams.get(PARAMETER_TM_ID);
+               MetricStore.TaskManagerMetricStore store = metrics.taskManagers.get(taskManagerId);
+               return store == null ? null : store.metrics;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
new file mode 100644
index 0000000..483dbf6
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests the request handling shared by the metrics handlers, using a {@link JobVertexMetricsHandler} that is
+ * backed by the store prepared by {@link MetricStoreTest#setupStore(MetricStore)}.
+ */
+public class AbstractMetricsHandlerTest extends TestLogger {
+       /**
+        * Creates a handler whose fetcher is built from mocks and whose store is filled with the test metrics.
+        */
+       private static JobVertexMetricsHandler createHandler() {
+               MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+               MetricStoreTest.setupStore(fetcher.getMetricStore());
+               return new JobVertexMetricsHandler(fetcher);
+       }
+
+       /**
+        * Creates the path parameters used by every test ("jobid" -> "jobid", "vertexid" -> "taskid").
+        */
+       private static Map<String, String> createPathParams() {
+               Map<String, String> params = new HashMap<>();
+               params.put("jobid", "jobid");
+               params.put("vertexid", "taskid");
+               return params;
+       }
+
+       /**
+        * Verifies that the handlers correctly handle expected REST calls
+        */
+       @Test
+       public void testHandleRequest() throws Exception {
+               JobVertexMetricsHandler handler = createHandler();
+               Map<String, String> pathParams = createPathParams();
+               Map<String, String> queryParams = new HashMap<>();
+
+               // without a "get" parameter the available metrics are listed
+               String availableList = handler.handleRequest(pathParams, queryParams, null);
+               assertEquals(
+                       "[{\"id\":\"8.opname.abc.metric5\"},{\"id\":\"8.abc.metric4\"}]",
+                       availableList);
+
+               // a single requested metric
+               queryParams.put("get", "8.opname.abc.metric5");
+               String metricValue = handler.handleRequest(pathParams, queryParams, null);
+               assertEquals(
+                       "[{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}]",
+                       metricValue);
+
+               // several requested metrics
+               queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
+               String metricValues = handler.handleRequest(pathParams, queryParams, null);
+               assertEquals(
+                       "[{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"},{\"id\":\"8.abc.metric4\",\"value\":\"3\"}]",
+                       metricValues);
+       }
+
+       /**
+        * Verifies that a malformed request for available metrics does not throw an exception.
+        */
+       @Test
+       public void testInvalidListDoesNotFail() {
+               JobVertexMetricsHandler handler = createHandler();
+               Map<String, String> pathParams = createPathParams();
+               Map<String, String> queryParams = new HashMap<>();
+
+               //-----invalid variable
+               pathParams.put("jobid", "nonexistent");
+
+               try {
+                       String response = handler.handleRequest(pathParams, queryParams, null);
+                       assertEquals("", response);
+               } catch (Exception e) {
+                       fail();
+               }
+       }
+
+       /**
+        * Verifies that a malformed request for a metric value does not throw an exception.
+        */
+       @Test
+       public void testInvalidGetDoesNotFail() {
+               JobVertexMetricsHandler handler = createHandler();
+               Map<String, String> pathParams = createPathParams();
+               Map<String, String> queryParams = new HashMap<>();
+
+               //-----empty string
+               queryParams.put("get", "");
+
+               try {
+                       String response = handler.handleRequest(pathParams, queryParams, null);
+                       assertEquals("", response);
+               } catch (Exception e) {
+                       fail(e.getMessage());
+               }
+
+               //-----invalid variable
+               pathParams.put("jobid", "nonexistent");
+               queryParams.put("get", "subindex.opname.abc.metric5");
+
+               try {
+                       String response = handler.handleRequest(pathParams, queryParams, null);
+                       assertEquals("", response);
+               } catch (Exception e) {
+                       fail(e.getMessage());
+               }
+
+               //-----invalid metric
+               pathParams.put("jobid", "nonexistant");
+               queryParams.put("get", "subindex.opname.abc.nonexistant");
+
+               try {
+                       String response = handler.handleRequest(pathParams, queryParams, null);
+                       assertEquals("", response);
+               } catch (Exception e) {
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..9757574
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link JobManagerMetricsHandler#getMapFor}.
+ */
+public class JobManagerMetricsHandlerTest extends TestLogger {
+       /**
+        * Populates the store through {@link MetricStoreTest#setupStore} and checks that the
+        * job manager counter is returned under the key "abc.metric1".
+        */
+       @Test
+       public void getMapFor() {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               JobManagerMetricsHandler metricsHandler = new JobManagerMetricsHandler(metricFetcher);
+
+               // the job manager handler is queried without any path parameters
+               Map<String, String> noPathParams = new HashMap<>();
+               Map<String, Object> result = metricsHandler.getMapFor(noPathParams, populatedStore);
+
+               assertEquals(0L, result.get("abc.metric1"));
+       }
+
+       /**
+        * Checks that a non-null map is returned even though this test never added a metric
+        * to the store.
+        */
+       @Test
+       public void getMapForNull() {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore untouchedStore = metricFetcher.getMetricStore();
+               JobManagerMetricsHandler metricsHandler = new JobManagerMetricsHandler(metricFetcher);
+
+               Map<String, String> noPathParams = new HashMap<>();
+               Map<String, Object> result = metricsHandler.getMapFor(noPathParams, untouchedStore);
+
+               assertNotNull(result);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
new file mode 100644
index 0000000..c0cc345
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link JobMetricsHandler#getMapFor}.
+ */
+public class JobMetricsHandlerTest extends TestLogger {
+       /**
+        * Populates the store through {@link MetricStoreTest#setupStore} and checks that the
+        * counter of job "jobid" is returned under the key "abc.metric3".
+        */
+       @Test
+       public void getMapFor() throws Exception {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               JobMetricsHandler metricsHandler = new JobMetricsHandler(metricFetcher);
+
+               Map<String, String> jobPathParams = new HashMap<>();
+               jobPathParams.put(PARAMETER_JOB_ID, "jobid");
+
+               Map<String, Object> result = metricsHandler.getMapFor(jobPathParams, populatedStore);
+
+               assertEquals(2L, result.get("abc.metric3"));
+       }
+
+       /**
+        * Checks that {@code null} is returned when no job id path parameter is supplied.
+        */
+       @Test
+       public void getMapForNull() {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore untouchedStore = metricFetcher.getMetricStore();
+               JobMetricsHandler metricsHandler = new JobMetricsHandler(metricFetcher);
+
+               Map<String, String> noPathParams = new HashMap<>();
+               Map<String, Object> result = metricsHandler.getMapFor(noPathParams, untouchedStore);
+
+               assertNull(result);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
new file mode 100644
index 0000000..d6e5ca7
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID;
+import static 
org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link JobVertexMetricsHandler#getMapFor}.
+ */
+public class JobVertexMetricsHandlerTest extends TestLogger {
+       /**
+        * Populates the store through {@link MetricStoreTest#setupStore} and checks that the
+        * task and operator counters of vertex "taskid" in job "jobid" are returned under
+        * keys prefixed with the subtask index.
+        */
+       @Test
+       public void getMapFor() throws Exception {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               JobVertexMetricsHandler metricsHandler = new JobVertexMetricsHandler(metricFetcher);
+
+               Map<String, String> vertexPathParams = new HashMap<>();
+               vertexPathParams.put(PARAMETER_JOB_ID, "jobid");
+               vertexPathParams.put(PARAMETER_VERTEX_ID, "taskid");
+
+               Map<String, Object> result = metricsHandler.getMapFor(vertexPathParams, populatedStore);
+
+               // task-scoped counter
+               assertEquals(3L, result.get("8.abc.metric4"));
+               // operator-scoped counter
+               assertEquals(4L, result.get("8.opname.abc.metric5"));
+       }
+
+       /**
+        * Checks that {@code null} is returned when neither a job id nor a vertex id path
+        * parameter is supplied.
+        */
+       @Test
+       public void getMapForNull() {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore untouchedStore = metricFetcher.getMetricStore();
+               JobVertexMetricsHandler metricsHandler = new JobVertexMetricsHandler(metricFetcher);
+
+               Map<String, String> noPathParams = new HashMap<>();
+               Map<String, Object> result = metricsHandler.getMapFor(noPathParams, untouchedStore);
+
+               assertNull(result);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
new file mode 100644
index 0000000..e0cfe26
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Tests that {@link MetricFetcher#update()} queries the mocked query services and writes
+ * the returned metrics into the {@link MetricStore}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetricFetcher.class)
+public class MetricFetcherTest extends TestLogger {
+       /**
+        * Wires up mocked JobManager/TaskManager gateways and query services, runs a single
+        * {@code update()} on the calling thread and asserts on the resulting store contents.
+        */
+       @Test
+       public void testUpdate() throws Exception {
+               // ========= setup TaskManager =====================================================
+               JobID jobID = new JobID();
+               InstanceID tmID = new InstanceID();
+               ActorGateway taskManagerGateway = mock(ActorGateway.class);
+               when(taskManagerGateway.path()).thenReturn("/tm/address");
+
+               Instance taskManager = mock(Instance.class);
+               when(taskManager.getActorGateway()).thenReturn(taskManagerGateway);
+               when(taskManager.getId()).thenReturn(tmID);
+
+               // ========= setup JobManager ======================================================
+               ActorGateway jobManagerGateway = mock(ActorGateway.class);
+               Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+                       JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+               // the job details request is answered with empty job arrays
+               when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class)))
+                       .thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+               when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+                       .thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+               when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               when(retriever.getJobManagerGatewayAndWebPort())
+                       .thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+               // ========= setup QueryServices ===================================================
+               Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+               final ActorRef jmQueryService = mock(ActorRef.class);
+               final ActorRef tmQueryService = mock(ActorRef.class);
+
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
+               when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService);
+
+               // the JobManager query service answers with 16 zero bytes, the TaskManager
+               // query service with the serialized dump built by createRequestDumpAnswer
+               MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+               when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+                       .thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+
+               MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+               when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+                       .thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
+
+               // hand out the mocked gateways when a BasicGateway is constructed; the anonymous
+               // objects match the constructor argument by identity with the respective ActorRef
+               whenNew(MetricFetcher.BasicGateway.class)
+                       .withArguments(eq(new Object() {
+                               @Override
+                               public boolean equals(Object o) {
+                                       return o == jmQueryService;
+                               }
+                       }))
+                       .thenReturn(jmQueryServiceGateway);
+               whenNew(MetricFetcher.BasicGateway.class)
+                       .withArguments(eq(new Object() {
+                               @Override
+                               public boolean equals(Object o) {
+                                       return o == tmQueryService;
+                               }
+                       }))
+                       .thenReturn(tmQueryServiceGateway);
+
+               // ========= start MetricFetcher testing ===========================================
+               ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
+               MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context);
+
+               // verify that update fetches metrics and updates the store
+               fetcher.update();
+               MetricStore store = fetcher.getMetricStore();
+               synchronized (store) {
+                       assertEquals(7L, store.jobManager.metrics.get("abc.hist_min"));
+                       assertEquals(6L, store.jobManager.metrics.get("abc.hist_max"));
+                       assertEquals(4.0, store.jobManager.metrics.get("abc.hist_mean"));
+                       assertEquals(0.5, store.jobManager.metrics.get("abc.hist_median"));
+                       assertEquals(5.0, store.jobManager.metrics.get("abc.hist_stddev"));
+                       assertEquals(0.75, store.jobManager.metrics.get("abc.hist_p75"));
+                       assertEquals(0.9, store.jobManager.metrics.get("abc.hist_p90"));
+                       assertEquals(0.95, store.jobManager.metrics.get("abc.hist_p95"));
+                       assertEquals(0.98, store.jobManager.metrics.get("abc.hist_p98"));
+                       assertEquals(0.99, store.jobManager.metrics.get("abc.hist_p99"));
+                       assertEquals(0.999, store.jobManager.metrics.get("abc.hist_p999"));
+
+                       assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
+                       assertEquals(5.0, store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
+                       assertEquals(2L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
+                       assertEquals(1L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+               }
+       }
+
+       /**
+        * Executor that runs every submitted task directly on the calling thread.
+        *
+        * <p>Declared static because it does not use the enclosing test instance.
+        */
+       public static class CurrentThreadExecutor implements Executor {
+               @Override
+               public void execute(Runnable r) {
+                       r.run();
+               }
+       }
+
+       /**
+        * Builds a serialized metric dump containing one metric per scope: an operator counter
+        * ("oc"), a task counter ("tc"), a job meter ("jc"), a task manager gauge ("gauge") and
+        * a job manager histogram ("hist").
+        *
+        * @param tmID  id used for the task manager scope
+        * @param jobID id used for the job, task and operator scopes
+        * @return the bytes produced by the {@code MetricDumpSerializer}
+        * @throws IOException declared by the original signature; callers must handle it
+        */
+       private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+               Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+               Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+               Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+               Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+               SimpleCounter c1 = new SimpleCounter();
+               SimpleCounter c2 = new SimpleCounter();
+
+               c1.inc(1);
+               c2.inc(2);
+
+               counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
+               counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
+               meters.put(new Meter() {
+                       @Override
+                       public void markEvent() {
+                       }
+
+                       @Override
+                       public void markEvent(long n) {
+                       }
+
+                       @Override
+                       public double getRate() {
+                               return 5;
+                       }
+
+                       @Override
+                       public long getCount() {
+                               return 10;
+                       }
+               }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
+               gauges.put(new Gauge<String>() {
+                       @Override
+                       public String getValue() {
+                               return "x";
+                       }
+               }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+               histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
+
+               MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+               try {
+                       return serializer.serialize(counters, gauges, histograms, meters);
+               } finally {
+                       // close the serializer even if serialization fails
+                       serializer.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
new file mode 100644
index 0000000..9dc2929
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MetricStore#add}. The static {@link #setupStore} fixture is also
+ * callable from other tests.
+ */
+public class MetricStoreTest extends TestLogger {
+       /**
+        * Checks that each of the five dumps added by {@link #setupStore} is retrievable from
+        * the sub-store that matches its scope.
+        */
+       @Test
+       public void testAdd() throws IOException {
+               MetricStore filled = setupStore(new MetricStore());
+
+               assertEquals(0L, filled.jobManager.metrics.get("abc.metric1"));
+               assertEquals(1L, filled.taskManagers.get("tmid").metrics.get("abc.metric2"));
+               assertEquals(2L, filled.jobs.get("jobid").metrics.get("abc.metric3"));
+               assertEquals(3L, filled.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
+               assertEquals(4L, filled.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+       }
+
+       /**
+        * Checks that adding {@code null} or a dump with an empty scope and empty name neither
+        * throws nor leaves an entry in the store.
+        */
+       @Test
+       public void testMalformedNameHandling() {
+               MetricStore emptyStore = new MetricStore();
+
+               //-----verify that no exceptions are thrown
+
+               // null
+               emptyStore.add(null);
+
+               // empty name
+               QueryScopeInfo.JobManagerQueryScopeInfo emptyScope = new QueryScopeInfo.JobManagerQueryScopeInfo("");
+               MetricDump.CounterDump unnamedCounter = new MetricDump.CounterDump(emptyScope, "", 0);
+               emptyStore.add(unnamedCounter);
+
+               //-----verify that no side effects occur
+               assertEquals(0, emptyStore.jobManager.metrics.size());
+               assertEquals(0, emptyStore.taskManagers.size());
+               assertEquals(0, emptyStore.jobs.size());
+       }
+
+       /**
+        * Adds one counter dump per scope (job manager, task manager, job, task, operator) to
+        * the given store. The counters are named "metric1" to "metric5" and carry the values
+        * 0 to 4.
+        *
+        * @param store store to add the dumps to
+        * @return the store that was passed in
+        */
+       public static MetricStore setupStore(MetricStore store) {
+               // job manager scope
+               store.add(new MetricDump.CounterDump(
+                       new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "metric1", 0));
+
+               // task manager scope
+               store.add(new MetricDump.CounterDump(
+                       new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc"), "metric2", 1));
+
+               // job scope
+               store.add(new MetricDump.CounterDump(
+                       new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc"), "metric3", 2));
+
+               // task scope, subtask index 8
+               store.add(new MetricDump.CounterDump(
+                       new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc"), "metric4", 3));
+
+               // operator scope, subtask index 8
+               store.add(new MetricDump.CounterDump(
+                       new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc"), "metric5", 4));
+
+               return store;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..6299a56
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler.PARAMETER_TM_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for {@link TaskManagerMetricsHandler#getMapFor}.
+ */
+public class TaskManagerMetricsHandlerTest extends TestLogger {
+       /**
+        * Populates the store through {@link MetricStoreTest#setupStore} and checks that the
+        * counter of task manager "tmid" is returned under the key "abc.metric2".
+        */
+       @Test
+       public void getMapFor() throws Exception {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore populatedStore = MetricStoreTest.setupStore(metricFetcher.getMetricStore());
+               TaskManagerMetricsHandler metricsHandler = new TaskManagerMetricsHandler(metricFetcher);
+
+               Map<String, String> tmPathParams = new HashMap<>();
+               tmPathParams.put(PARAMETER_TM_ID, "tmid");
+
+               Map<String, Object> result = metricsHandler.getMapFor(tmPathParams, populatedStore);
+
+               assertEquals(1L, result.get("abc.metric2"));
+       }
+
+       /**
+        * Checks that {@code null} is returned when no task manager id path parameter is
+        * supplied.
+        */
+       @Test
+       public void getMapForNull() {
+               ActorSystem actorSystem = mock(ActorSystem.class);
+               JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+               ExecutionContext executionContext = mock(ExecutionContext.class);
+               MetricFetcher metricFetcher = new MetricFetcher(actorSystem, retriever, executionContext);
+
+               MetricStore untouchedStore = metricFetcher.getMetricStore();
+               TaskManagerMetricsHandler metricsHandler = new TaskManagerMetricsHandler(metricFetcher);
+
+               Map<String, String> noPathParams = new HashMap<>();
+               Map<String, Object> result = metricsHandler.getMapFor(noPathParams, untouchedStore);
+
+               assertNull(result);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 763ea66..9c96858 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.metrics;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +28,8 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
@@ -47,6 +51,7 @@ public class MetricRegistry {
        
        private List<MetricReporter> reporters;
        private ScheduledExecutorService executor;
+       private ActorRef queryService;
 
        private final ScopeFormats scopeFormats;
 
@@ -144,6 +149,19 @@ public class MetricRegistry {
                }
        }
 
+       /**
+        * Starts the MetricQueryService on the given actor system and stores its ActorRef.
+        * If starting it throws, a warning is logged and the stored reference is unchanged.
+        * @param actorSystem ActorSystem to create the MetricQueryService on
+        */
+       public void startQueryService(ActorSystem actorSystem) {
+               try {
+                       queryService = 
MetricQueryService.startMetricQueryService(actorSystem);
+               } catch (Exception e) {
+                       LOG.warn("Could not start MetricDumpActor. No metrics 
will be submitted to the WebInterface.", e);
+               }
+       }
+
        public char getDelimiter() {
                return this.delimiter;
        }
@@ -207,6 +225,9 @@ public class MetricRegistry {
                                        }
                                }
                        }
+                       if (queryService != null) {
+                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, 
(AbstractMetricGroup) group);
+                       }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
                }
@@ -228,6 +249,9 @@ public class MetricRegistry {
                                        }
                                }
                        }
+                       if (queryService != null) {
+                               
MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+                       }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
new file mode 100644
index 0000000..2239b50
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A container for a dumped metric that contains the scope, name and value(s) of the metric.
+ */
+public abstract class MetricDump {
+       /** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */
+       public static final byte METRIC_CATEGORY_COUNTER = 0;
+       public static final byte METRIC_CATEGORY_GAUGE = 1;
+       public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+       public static final byte METRIC_CATEGORY_METER = 3;
+
+       /** The scope information for the stored metric; never null. */
+       public final QueryScopeInfo scopeInfo;
+       /** The name of the stored metric; never null. */
+       public final String name;
+
+       private MetricDump(QueryScopeInfo scopeInfo, String name) {
+               this.scopeInfo = Preconditions.checkNotNull(scopeInfo);
+               this.name = Preconditions.checkNotNull(name);
+       }
+
+       /**
+        * Returns the category for this MetricDump.
+        * @return category; the subclasses defined here return a {@code METRIC_CATEGORY_*} constant
+        */
+       public abstract byte getCategory();
+
+       /**
+        * Container for the value of a {@link org.apache.flink.metrics.Counter}.
+        */
+       public static class CounterDump extends MetricDump {
+               public final long count;
+
+               public CounterDump(QueryScopeInfo scopeInfo, String name, long count) {
+                       super(scopeInfo, name);
+                       this.count = count;
+               }
+
+               @Override
+               public byte getCategory() {
+                       return METRIC_CATEGORY_COUNTER;
+               }
+       }
+
+       /**
+        * Container for the value of a {@link org.apache.flink.metrics.Gauge} as a non-null string.
+        */
+       public static class GaugeDump extends MetricDump {
+               public final String value;
+
+               public GaugeDump(QueryScopeInfo scopeInfo, String name, String value) {
+                       super(scopeInfo, name);
+                       this.value = Preconditions.checkNotNull(value);
+               }
+
+               @Override
+               public byte getCategory() {
+                       return METRIC_CATEGORY_GAUGE;
+               }
+       }
+
+       /**
+        * Immutable container for the values of a {@link org.apache.flink.metrics.Histogram}.
+        */
+       public static class HistogramDump extends MetricDump {
+               public final long min;
+               public final long max;
+               public final double mean;
+               public final double median;
+               public final double stddev;
+               public final double p75;
+               public final double p90;
+               public final double p95;
+               public final double p98;
+               public final double p99;
+               public final double p999;
+
+               public HistogramDump(QueryScopeInfo scopeInfo, String name,
+                       long min, long max, double mean, double median, double stddev,
+                       double p75, double p90, double p95, double p98, double p99, double p999) {
+
+                       super(scopeInfo, name);
+                       this.min = min;
+                       this.max = max;
+                       this.mean = mean;
+                       this.median = median;
+                       this.stddev = stddev;
+                       this.p75 = p75;
+                       this.p90 = p90;
+                       this.p95 = p95;
+                       this.p98 = p98;
+                       this.p99 = p99;
+                       this.p999 = p999;
+               }
+
+               @Override
+               public byte getCategory() {
+                       return METRIC_CATEGORY_HISTOGRAM;
+               }
+       }
+
+       /**
+        * Container for the rate of a {@link org.apache.flink.metrics.Meter}.
+        */
+       public static class MeterDump extends MetricDump {
+               public final double rate;
+
+               public MeterDump(QueryScopeInfo scopeInfo, String name, double rate) {
+                       super(scopeInfo, name);
+                       this.rate = rate;
+               }
+
+               @Override
+               public byte getCategory() {
+                       return METRIC_CATEGORY_METER;
+               }
+       }
+}

Reply via email to