[FLINK-4389] Expose metrics to WebFrontend This closes #2363
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70704de0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70704de0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70704de0 Branch: refs/heads/master Commit: 70704de0c82cbb7b143dd696221e11999feb3600 Parents: 545b72b Author: zentol <ches...@apache.org> Authored: Fri Aug 5 13:54:37 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Sep 15 19:17:52 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/MesosTaskManager.scala | 7 +- .../flink/metrics/jmx/JMXReporterTest.java | 56 +--- .../runtime/webmonitor/WebRuntimeMonitor.java | 14 + .../metrics/AbstractMetricsHandler.java | 124 ++++++++ .../metrics/JobManagerMetricsHandler.java | 47 +++ .../webmonitor/metrics/JobMetricsHandler.java | 49 +++ .../metrics/JobVertexMetricsHandler.java | 54 ++++ .../webmonitor/metrics/MetricFetcher.java | 224 ++++++++++++++ .../runtime/webmonitor/metrics/MetricStore.java | 190 ++++++++++++ .../metrics/TaskManagerMetricsHandler.java | 49 +++ .../metrics/AbstractMetricsHandlerTest.java | 154 ++++++++++ .../metrics/JobManagerMetricsHandlerTest.java | 61 ++++ .../metrics/JobMetricsHandlerTest.java | 63 ++++ .../metrics/JobVertexMetricsHandlerTest.java | 67 ++++ .../webmonitor/metrics/MetricFetcherTest.java | 216 +++++++++++++ .../webmonitor/metrics/MetricStoreTest.java | 83 +++++ .../metrics/TaskManagerMetricsHandlerTest.java | 63 ++++ .../flink/runtime/metrics/MetricRegistry.java | 24 ++ .../flink/runtime/metrics/dump/MetricDump.java | 138 +++++++++ .../metrics/dump/MetricDumpSerialization.java | 302 +++++++++++++++++++ .../metrics/dump/MetricQueryService.java | 217 +++++++++++++ .../runtime/metrics/dump/QueryScopeInfo.java | 189 ++++++++++++ .../metrics/groups/AbstractMetricGroup.java | 25 ++ .../metrics/groups/GenericMetricGroup.java | 10 + .../metrics/groups/JobManagerMetricGroup.java | 7 + 
.../runtime/metrics/groups/JobMetricGroup.java | 7 + .../metrics/groups/OperatorMetricGroup.java | 11 + .../metrics/groups/TaskManagerMetricGroup.java | 7 + .../runtime/metrics/groups/TaskMetricGroup.java | 14 +- .../flink/runtime/jobmanager/JobManager.scala | 6 + .../minicluster/LocalFlinkMiniCluster.scala | 14 +- .../flink/runtime/taskmanager/TaskManager.scala | 43 +-- .../metrics/dump/MetricDumpSerializerTest.java | 178 +++++++++++ .../runtime/metrics/dump/MetricDumpTest.java | 86 ++++++ .../metrics/dump/MetricQueryServiceTest.java | 133 ++++++++ .../metrics/dump/QueryScopeInfoTest.java | 73 +++++ .../metrics/groups/AbstractMetricGroupTest.java | 6 + .../metrics/groups/JobManagerGroupTest.java | 14 +- .../metrics/groups/JobManagerJobGroupTest.java | 17 +- .../runtime/metrics/groups/MetricGroupTest.java | 31 +- .../metrics/groups/OperatorGroupTest.java | 25 +- .../metrics/groups/TaskManagerGroupTest.java | 15 +- .../metrics/groups/TaskManagerJobGroupTest.java | 17 +- .../metrics/groups/TaskMetricGroupTest.java | 23 +- .../metrics/util/DummyCharacterFilter.java | 27 ++ .../runtime/metrics/util/TestingHistogram.java | 73 +++++ ...askManagerComponentsStartupShutdownTest.java | 4 +- .../testingUtils/TestingTaskManager.scala | 13 +- .../flink/yarn/TestingYarnTaskManager.scala | 9 +- .../org/apache/flink/yarn/YarnTaskManager.scala | 7 +- 50 files changed, 3186 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala index 19b0c62..3972a57 100644 --- 
a/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala +++ b/flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager import org.apache.flink.runtime.io.network.NetworkEnvironment import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.metrics.MetricRegistry import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration, TaskManagerLocation} /** An extension of the TaskManager that listens for additional Mesos-related @@ -36,7 +37,8 @@ class MesosTaskManager( ioManager: IOManager, network: NetworkEnvironment, numberOfSlots: Int, - leaderRetrievalService: LeaderRetrievalService) + leaderRetrievalService: LeaderRetrievalService, + metricRegistry : MetricRegistry) extends TaskManager( config, resourceID, @@ -45,7 +47,8 @@ class MesosTaskManager( ioManager, network, numberOfSlots, - leaderRetrievalService) { + leaderRetrievalService, + metricRegistry) { override def handleMessage: Receive = { super.handleMessage http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java ---------------------------------------------------------------------- diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 913999b..089efe3 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -21,13 +21,12 @@ package org.apache.flink.metrics.jmx; import org.apache.flink.configuration.ConfigConstants; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.util.TestReporter; +import org.apache.flink.runtime.metrics.util.TestingHistogram; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -286,57 +285,4 @@ public class JMXReporterTest extends TestLogger { } } } - - static class TestingHistogram implements Histogram { - - @Override - public void update(long value) { - - } - - @Override - public long getCount() { - return 1; - } - - @Override - public HistogramStatistics getStatistics() { - return new HistogramStatistics() { - @Override - public double getQuantile(double quantile) { - return quantile; - } - - @Override - public long[] getValues() { - return new long[0]; - } - - @Override - public int size() { - return 3; - } - - @Override - public double getMean() { - return 4; - } - - @Override - public double getStdDev() { - return 5; - } - - @Override - public long getMax() { - return 6; - } - - @Override - public long getMin() { - return 7; - } - }; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 2bfbb85..4dd36e7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -67,6 +67,11 @@ import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandl import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler; import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler; import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler; +import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler; +import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler; +import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.ExecutionContext$; @@ -133,6 +138,8 @@ public class WebRuntimeMonitor implements WebMonitor { private ExecutorService executorService; + private MetricFetcher metricFetcher; + public WebRuntimeMonitor( Configuration config, LeaderRetrievalService leaderRetrievalService, @@ -206,6 +213,8 @@ public class WebRuntimeMonitor implements WebMonitor { ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); + metricFetcher = new MetricFetcher(actorSystem, retriever, context); + router = new Router() // config how to interact with this web server .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) @@ -235,6 +244,7 @@ public class WebRuntimeMonitor implements WebMonitor { currentGraphs, backPressureStatsTracker, refreshInterval))) + .GET("/jobs/:jobid/vertices/:vertexid/metrics", handler(new JobVertexMetricsHandler(metricFetcher))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new 
SubtaskCurrentAttemptDetailsHandler(currentGraphs))) .GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs))) @@ -245,6 +255,7 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(currentGraphs))) .GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(currentGraphs))) .GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs))) + .GET("/jobs/:jobid/metrics", handler(new JobMetricsHandler(metricFetcher))) .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) @@ -252,6 +263,7 @@ public class WebRuntimeMonitor implements WebMonitor { new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, config)) .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.STDOUT, config)) + .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagerMetricsHandler(metricFetcher))) // log and stdout .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : @@ -260,6 +272,8 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? 
new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile)) + .GET("/jobmanager/metrics", handler(new JobManagerMetricsHandler(metricFetcher))) + // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java new file mode 100644 index 0000000..54e4b6f --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.webmonitor.metrics; + +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.io.StringWriter; +import java.util.Map; + +/** + * Abstract request handler that returns a list of all available metrics or the values for a set of metrics. If no metrics are stored for the entity addressed by the path parameters, or the "get" parameter is empty, an empty string is returned. + * + * If the query parameters do not contain a "get" parameter, the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as its value. + * {@code ?get=X,Y} + * The handler will then return a list containing the values of the requested metrics; requested names without a stored value are omitted. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public abstract class AbstractMetricsHandler implements RequestHandler { + private final MetricFetcher fetcher; + + public AbstractMetricsHandler(MetricFetcher fetcher) { + this.fetcher = Preconditions.checkNotNull(fetcher); + } + + @Override + public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + fetcher.update(); // may trigger an asynchronous refresh; the response is built from the metrics stored at this point + String requestedMetricsList = queryParams.get("get"); + return requestedMetricsList != null + ? getMetricsValues(pathParams, requestedMetricsList) + : getAvailableMetricsList(pathParams); + } + + /** + * Returns a Map containing the metrics belonging to the entity pointed to by the path parameters.
+ * + * @param pathParams REST path parameters + * @param metrics MetricStore containing all metrics; the caller holds the store's monitor during this call + * @return Map from metric name to metric value, or null if no metrics exist for the entity + */ + protected abstract Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics); + + private String getMetricsValues(Map<String, String> pathParams, String requestedMetricsList) throws IOException { + if (requestedMetricsList.isEmpty()) { + /** + * The WebInterface doesn't check whether the list of available metrics was empty. This can lead to a + * request for which the "get" parameter is an empty string. + */ + return ""; + } + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + Map<String, Object> metrics = getMapFor(pathParams, metricStore); + if (metrics == null) { + return ""; + } + String[] requestedMetrics = requestedMetricsList.split(","); + + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + + gen.writeStartArray(); + for (String requestedMetric : requestedMetrics) { + Object metricValue = metrics.get(requestedMetric); + if (metricValue != null) { + gen.writeStartObject(); + gen.writeStringField("id", requestedMetric); + gen.writeStringField("value", metricValue.toString()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } + } + + private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException { + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + Map<String, Object> metrics = getMapFor(pathParams, metricStore); + if (metrics == null) { + return ""; + } + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + + gen.writeStartArray(); + for (String m : metrics.keySet()) { + gen.writeStartObject(); + gen.writeStringField("id", m); + gen.writeEndObject(); + } + gen.writeEndArray(); +
+ gen.close(); + return writer.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java new file mode 100644 index 0000000..7435643 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import java.util.Map; + +/** + * Request handler that returns for the job manager a list of all available metrics or the values for a set of metrics. + * + * If the query parameters do not contain a "get" parameter the list of all metrics is returned.
+ * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as its value. + * {@code ?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobManagerMetricsHandler extends AbstractMetricsHandler { + public JobManagerMetricsHandler(MetricFetcher fetcher) { + super(fetcher); + } + + @Override + protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.JobManagerMetricStore jobManager = metrics.jobManager; + if (jobManager == null) { + return null; // no JobManager metrics stored + } else { + return jobManager.metrics; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java new file mode 100644 index 0000000..b54799d --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import java.util.Map; + +/** + * Request handler that returns for a given job a list of all available metrics or the values for a set of metrics. + * + * If the query parameters do not contain a "get" parameter, the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as its value. + * {@code ?get=X,Y} + * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobMetricsHandler extends AbstractMetricsHandler { + public static final String PARAMETER_JOB_ID = "jobid"; // name of the REST path parameter holding the job ID, as in /jobs/:jobid/metrics + + public JobMetricsHandler(MetricFetcher fetcher) { + super(fetcher); + } + + @Override + protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(PARAMETER_JOB_ID)); + if (job == null) { + return null; // no metrics stored for this job + } else { + return job.metrics; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java new file mode 100644 index 0000000..73b8bb0 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import java.util.Map; + +/** + * Request handler that returns for a given job vertex (task) a list of all available metrics or the values for a set of metrics. + * + * If the query parameters do not contain a "get" parameter, the list of all metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as its value. + * {@code ?get=X,Y} + * The handler will then return a list containing the values of the requested metrics. + * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class JobVertexMetricsHandler extends AbstractMetricsHandler { + public static final String PARAMETER_VERTEX_ID = "vertexid"; // name of the REST path parameter holding the job vertex ID, as in /jobs/:jobid/vertices/:vertexid/metrics + + public JobVertexMetricsHandler(MetricFetcher fetcher) { + super(fetcher); + } + + @Override + protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + MetricStore.JobMetricStore job = metrics.jobs.get(pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID)); + if (job == null) { + return null; // no metrics stored for this job + } else { + MetricStore.TaskMetricStore task = job.tasks.get(pathParams.get(PARAMETER_VERTEX_ID)); + if (task == null) { + return null; // no metrics stored for this vertex + } else { + return task.metrics; + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java new file mode 100644 index 0000000..7a39a53 --- /dev/null +++
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; +import akka.pattern.Patterns; +import akka.util.Timeout; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.metrics.dump.MetricQueryService; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; 
+import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; + +/** + * The MetricFetcher can be used to fetch metrics from the JobManager and all registered TaskManagers. + * + * Metrics will only be fetched when {@link MetricFetcher#update()} is called, provided that a sufficient time since + * the last call has passed. + */ +public class MetricFetcher { + private static final Logger LOG = LoggerFactory.getLogger(MetricFetcher.class); + + private final ActorSystem actorSystem; + private final JobManagerRetriever retriever; + private final ExecutionContext ctx; + private final FiniteDuration timeout = new FiniteDuration(Duration.create(ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT).toMillis(), TimeUnit.MILLISECONDS); + + private MetricStore metrics = new MetricStore(); + private MetricDumpDeserializer deserializer = new MetricDumpDeserializer(); + + private long lastUpdateTime; + + public MetricFetcher(ActorSystem actorSystem, JobManagerRetriever retriever, ExecutionContext ctx) { + this.actorSystem = Preconditions.checkNotNull(actorSystem); + this.retriever = Preconditions.checkNotNull(retriever); + this.ctx = Preconditions.checkNotNull(ctx); + } + + /** + * Returns the MetricStore containing all stored metrics. + * + * @return MetricStore containing all stored metrics; + */ + public MetricStore getMetricStore() { + return metrics; + } + + /** + * This method can be used to signal this MetricFetcher that the metrics are still in use and should be updated. 
+ */ + public void update() { + synchronized (this) { + long currentTime = System.currentTimeMillis(); + if (currentTime - lastUpdateTime > 10000) { // 10 seconds have passed since the last update + lastUpdateTime = currentTime; + fetchMetrics(); + } + } + } + + private void fetchMetrics() { + try { + Option<scala.Tuple2<ActorGateway, Integer>> jobManagerGatewayAndWebPort = retriever.getJobManagerGatewayAndWebPort(); + if (jobManagerGatewayAndWebPort.isDefined()) { + ActorGateway jobManager = jobManagerGatewayAndWebPort.get()._1(); + + /** + * Remove all metrics that belong to a job that is not running and no longer archived. + */ + Future<Object> jobDetailsFuture = jobManager.ask(new RequestJobDetails(true, true), timeout); + jobDetailsFuture + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + MultipleJobsDetails details = (MultipleJobsDetails) result; + ArrayList<String> toRetain = new ArrayList<>(); + for (JobDetails job : details.getRunningJobs()) { + toRetain.add(job.getJobId().toString()); + } + for (JobDetails job : details.getFinishedJobs()) { + toRetain.add(job.getJobId().toString()); + } + synchronized (metrics) { + metrics.jobs.keySet().retainAll(toRetain); + } + } + }, ctx); + logErrorOnFailure(jobDetailsFuture, "Fetching of JobDetails failed."); + + String jobManagerPath = jobManager.path(); + String queryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + ActorRef jobManagerQueryService = actorSystem.actorFor(queryServicePath); + + queryMetrics(jobManagerQueryService); + + /** + * We first request the list of all registered task managers from the job manager, and then + * request the respective metric dump from each task manager. + * + * All stored metrics that do not belong to a registered task manager will be removed. 
+ */ + Future<Object> registeredTaskManagersFuture = jobManager.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), timeout); + registeredTaskManagersFuture + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + Iterable<Instance> taskManagers = ((JobManagerMessages.RegisteredTaskManagers) result).asJavaIterable(); + List<String> activeTaskManagers = new ArrayList<>(); + for (Instance taskManager : taskManagers) { + activeTaskManagers.add(taskManager.getId().toString()); + + String taskManagerPath = taskManager.getActorGateway().path(); + String queryServicePath = taskManagerPath.substring(0, taskManagerPath.lastIndexOf('/') + 1) + "MetricQueryService"; + ActorRef taskManagerQueryService = actorSystem.actorFor(queryServicePath); + + queryMetrics(taskManagerQueryService); + } + synchronized (metrics) { // remove all metrics belonging to unregistered task managers + metrics.taskManagers.keySet().retainAll(activeTaskManagers); + } + } + }, ctx); + logErrorOnFailure(registeredTaskManagersFuture, "Fetchin list of registered TaskManagers failed."); + } + } catch (Exception e) { + LOG.warn("Exception while fetching metrics.", e); + } + } + + private void logErrorOnFailure(Future<Object> future, final String message) { + future.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + LOG.warn(message, failure); + } + }, ctx); + } + + /** + * Requests a metric dump from the given actor. 
+ * + * @param actor ActorRef to request the dump from + */ + private void queryMetrics(ActorRef actor) { + Future<Object> metricQueryFuture = new BasicGateway(actor).ask(MetricQueryService.getCreateDump(), timeout); + metricQueryFuture + .onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object result) throws Throwable { + addMetrics(result); + } + }, ctx); + logErrorOnFailure(metricQueryFuture, "Fetching metrics failed."); + } + + /** + * Deserializes a metric dump and adds every contained metric to the metric store. + * + * <p>This is invoked from future callbacks, while other callbacks prune the (not thread-safe) store + * under the {@code metrics} lock; the store is therefore only modified while holding that same lock. + * + * @param result the serialized dump, expected to be a {@code byte[]} + * @throws IOException if the dump cannot be deserialized + */ + private void addMetrics(Object result) throws IOException { + byte[] data = (byte[]) result; + List<MetricDump> dumpedMetrics = deserializer.deserialize(data); + synchronized (metrics) { + for (MetricDump metric : dumpedMetrics) { + metrics.add(metric); + } + } + } + + /** + * Helper class that allows mocking of the answer. + */ + static class BasicGateway { + private final ActorRef actor; + + private BasicGateway(ActorRef actor) { + this.actor = actor; + } + + /** + * Sends a message asynchronously and returns its response. The response to the message is + * returned as a future.
+ * + * @param message Message to be sent + * @param timeout Timeout until the Future is completed with an AskTimeoutException + * @return Future which contains the response to the sent message + */ + public Future<Object> ask(Object message, FiniteDuration timeout) { + return Patterns.ask(actor, message, new Timeout(timeout)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java new file mode 100644 index 0000000..41e68cc --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.runtime.webmonitor.metrics; + +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; +import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; + +/** + * Nested data-structure to store metrics. + * + * This structure is not thread-safe. + */ +public class MetricStore { + private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class); + + final JobManagerMetricStore jobManager = new JobManagerMetricStore(); + final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); + final Map<String, JobMetricStore> jobs = new HashMap<>(); +
+ /** + * Adds a single dumped metric to this store. + * + * <p>Depending on the category of the metric's {@link QueryScopeInfo}, the metric is stored for the JobManager, + * a TaskManager, a job or a task (job vertex); missing sub-structures are created on demand. The stored name is + * the metric name, prefixed with the scope if one is set. + * + * <p>Malformed dumps are ignored: a dump with an empty name is dropped silently, while an unknown category or any + * error during processing (e.g. a scope info that does not match its category) is logged at DEBUG level. + * + * @param metric the dumped metric to add + */ + public void add(MetricDump metric) { + try { + QueryScopeInfo info = metric.scopeInfo; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: + addMetric(jobManager.metrics, name, metric); + break; // without this, JM metrics fall through and are cast to TaskManagerQueryScopeInfo + case INFO_CATEGORY_TM: + String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; + addMetric(getOrCreateTaskManager(tmID).metrics, name, metric); + break; + case INFO_CATEGORY_JOB: + QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; + addMetric(getOrCreateJob(jobInfo.jobID).metrics, name, metric); + break; + case INFO_CATEGORY_TASK: + QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; + TaskMetricStore task = getOrCreateTask(getOrCreateJob(taskInfo.jobID), taskInfo.vertexID); + /** + * As the WebInterface task metric queries currently do not account for subtasks we don't + * divide by subtask and instead use the concatenation of subtask index and metric name as the name. + */ + addMetric(task.metrics, taskInfo.subtaskIndex + "." + name, metric); + break; + case INFO_CATEGORY_OPERATOR: + QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; + TaskMetricStore operatorTask = getOrCreateTask(getOrCreateJob(operatorInfo.jobID), operatorInfo.vertexID); + /** + * As the WebInterface does not account for operators (because it can't) we don't + * divide by operator and instead use the concatenation of subtask index, operator name and metric name + * as the name. + */ + addMetric(operatorTask.metrics, operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, metric); + break; + default: + LOG.debug("Invalid metric dump category: {}", info.getCategory()); + } + } catch (Exception e) { + LOG.debug("Malformed metric dump.", e); + } + }
+ + /** + * Returns the store of the given TaskManager, creating and registering an empty one if none exists yet. + */ + private TaskManagerMetricStore getOrCreateTaskManager(String tmID) { + TaskManagerMetricStore tm = taskManagers.get(tmID); + if (tm == null) { + tm = new TaskManagerMetricStore(); + taskManagers.put(tmID, tm); + } + return tm; + } + + /** + * Returns the store of the given job, creating and registering an empty one if none exists yet. + */ + private JobMetricStore getOrCreateJob(String jobID) { + JobMetricStore job = jobs.get(jobID); + if (job == null) { + job = new JobMetricStore(); + jobs.put(jobID, job); + } + return job; + } + + /** + * Returns the store of the given vertex within the given job, creating and registering an empty one if none + * exists yet. + */ + private static TaskMetricStore getOrCreateTask(JobMetricStore job, String vertexID) { + TaskMetricStore task = job.tasks.get(vertexID); + if (task == null) { + task = new TaskMetricStore(); + job.tasks.put(vertexID, task); + } + return task; + } + + /** + * Stores the value(s) of the given metric in the target map. + * + * <p>Counters, gauges and meters are stored as a single entry under {@code name} (count, value and rate + * respectively). Histograms are flattened into one entry per statistic, using the suffixes {@code _min}, + * {@code _max}, {@code _mean}, {@code _median}, {@code _stddev}, {@code _p75}, {@code _p90}, {@code _p95}, + * {@code _p98}, {@code _p99} and {@code _p999}. Unknown metric categories are logged at DEBUG level and skipped. + * + * @param target map to store the value(s) in + * @param name name under which the value(s) are stored + * @param metric the dumped metric + */ + private void addMetric(Map<String, Object> target, String name, MetricDump metric) { + switch (metric.getCategory()) { + case METRIC_CATEGORY_COUNTER: + MetricDump.CounterDump counter = (MetricDump.CounterDump) metric; + target.put(name, counter.count); + break; + case METRIC_CATEGORY_GAUGE: + MetricDump.GaugeDump gauge = (MetricDump.GaugeDump) metric; + target.put(name, gauge.value); + break; + case METRIC_CATEGORY_HISTOGRAM: + MetricDump.HistogramDump histogram = (MetricDump.HistogramDump) metric; + target.put(name + "_min", histogram.min); + target.put(name + "_max", histogram.max); + target.put(name + "_mean", histogram.mean); + target.put(name + "_median", histogram.median); + target.put(name + "_stddev", histogram.stddev); + target.put(name + "_p75", histogram.p75); + target.put(name + "_p90", histogram.p90); + target.put(name + "_p95", histogram.p95); + target.put(name + "_p98", histogram.p98); + target.put(name + "_p99", histogram.p99); + target.put(name + "_p999", histogram.p999); + break; + case METRIC_CATEGORY_METER: + MetricDump.MeterDump meter = (MetricDump.MeterDump) metric; + target.put(name, meter.rate); + break; + default: + LOG.debug("Invalid metric category: {}", metric.getCategory()); + } + }
+ + /** + * Sub-structure containing metrics of the JobManager. + */ + static class JobManagerMetricStore { + public final Map<String, Object> metrics = new HashMap<>(); + } + + /** + * Sub-structure containing metrics of a single TaskManager. + */ + static class TaskManagerMetricStore { + public final Map<String, Object> metrics = new HashMap<>(); + } + + /** + * Sub-structure containing metrics of a single Job. + */ + static class JobMetricStore { + public final Map<String, Object> metrics = new HashMap<>(); + public final Map<String, TaskMetricStore> tasks = new HashMap<>(); + } + + /** + * Sub-structure containing metrics of a single Task. + */ + static class TaskMetricStore { + public final Map<String, Object> metrics = new HashMap<>(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java new file mode 100644 index 0000000..fea3d07 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import java.util.Map; + +/** + * Request handler that returns, for a given task manager, either a list of all available metrics or the values of a set of metrics. + * + * If the query parameters do not contain a "get" parameter, the list of all available metrics is returned. + * {@code [ { "id" : "X" } ] } + * + * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as its value. + * {@code ?get=X,Y} + * The handler will then return a list containing the values of the requested metrics.
+ * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } + */ +public class TaskManagerMetricsHandler extends AbstractMetricsHandler { + /** Name of the path parameter that identifies the TaskManager. */ + public static final String PARAMETER_TM_ID = "tmid"; + + /** + * Creates a new handler. + * + * @param fetcher the metric fetcher, passed on to {@link AbstractMetricsHandler} + */ + public TaskManagerMetricsHandler(MetricFetcher fetcher) { + super(fetcher); + } + + /** + * Looks up the metrics of the TaskManager named by the {@link #PARAMETER_TM_ID} path parameter. + * + * @return the TaskManager's metrics, or {@code null} if none are stored for it + */ + @Override + protected Map<String, Object> getMapFor(Map<String, String> pathParams, MetricStore metrics) { + String taskManagerID = pathParams.get(PARAMETER_TM_ID); + MetricStore.TaskManagerMetricStore tmStore = metrics.taskManagers.get(taskManagerID); + return tmStore != null ? tmStore.metrics : null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java new file mode 100644 index 0000000..483dbf6 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.actor.ActorSystem; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.ExecutionContext; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.powermock.api.mockito.PowerMockito.mock; + +public class AbstractMetricsHandlerTest extends TestLogger { + /** + * Verifies that the handlers correctly handle expected REST calls. + */ + @Test + public void testHandleRequest() throws Exception { + JobVertexMetricsHandler handler = createHandler(); + + Map<String, String> pathParams = createPathParams(); + Map<String, String> queryParams = new HashMap<>(); + + // get list of available metrics + String availableList = handler.handleRequest(pathParams, queryParams, null); + + assertEquals("[" + + "{\"id\":\"8.opname.abc.metric5\"}," + + "{\"id\":\"8.abc.metric4\"}" + + "]", + availableList); + + // get value for a single metric + queryParams.put("get", "8.opname.abc.metric5"); + + String metricValue = handler.handleRequest(pathParams, queryParams, null); + + assertEquals("[" + + "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" + + "]" + , metricValue + ); + + // get values for multiple metrics + queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4"); + + String metricValues = handler.handleRequest(pathParams, queryParams, null); + + assertEquals("[" + + "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," + + "{\"id\":\"8.abc.metric4\",\"value\":\"3\"}" + + "]", + metricValues + ); + }
+ + /** + * Verifies that a malformed request for available metrics does not throw an exception. + * + * <p>Exceptions are deliberately not caught, so that an unexpected failure is reported with its full stack trace. + */ + @Test + public void testInvalidListDoesNotFail() throws Exception { + JobVertexMetricsHandler handler = createHandler(); + + Map<String, String> pathParams = createPathParams(); + Map<String, String> queryParams = new HashMap<>(); + + //-----invalid variable + pathParams.put("jobid", "nonexistent"); + + assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + } + + /** + * Verifies that a malformed request for a metric value does not throw an exception. + * + * <p>Exceptions are deliberately not caught, so that an unexpected failure is reported with its full stack trace. + */ + @Test + public void testInvalidGetDoesNotFail() throws Exception { + JobVertexMetricsHandler handler = createHandler(); + + Map<String, String> pathParams = createPathParams(); + Map<String, String> queryParams = new HashMap<>(); + + //-----empty string + queryParams.put("get", ""); + + assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + + //-----invalid variable + pathParams.put("jobid", "nonexistent"); + queryParams.put("get", "subindex.opname.abc.metric5"); + + assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + + //-----invalid metric + // NOTE(review): the job ID is unknown here as well, so this request presumably fails on the job lookup + // before the metric name is checked; consider using an existing job ID to really cover this case. + pathParams.put("jobid", "nonexistant"); + queryParams.put("get", "subindex.opname.abc.nonexistant"); + + assertEquals("", handler.handleRequest(pathParams, queryParams, null)); + }
+ + /** + * Creates a {@link JobVertexMetricsHandler} whose fetcher uses mocked dependencies and whose metric store + * was populated by {@link MetricStoreTest#setupStore}. + */ + private static JobVertexMetricsHandler createHandler() { + MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + MetricStoreTest.setupStore(fetcher.getMetricStore()); + return new JobVertexMetricsHandler(fetcher); + } + + /** + * Returns mutable path parameters that address vertex "taskid" of job "jobid". + */ + private static Map<String, String> createPathParams() { + Map<String, String> pathParams = new HashMap<>(); + pathParams.put("jobid", "jobid"); + pathParams.put("vertexid", "taskid"); + return pathParams; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java new file mode 100644 index 0000000..9757574 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.actor.ActorSystem; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.ExecutionContext; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.powermock.api.mockito.PowerMockito.mock; + +public class JobManagerMetricsHandlerTest extends TestLogger { + /** + * Verifies that the handler returns the JobManager metrics of a populated store. + */ + @Test + public void getMapFor() { + MetricFetcher fetcher = createFetcher(); + MetricStore populatedStore = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + Map<String, Object> jmMetrics = new JobManagerMetricsHandler(fetcher).getMapFor(new HashMap<String, String>(), populatedStore); + + assertEquals(0L, jmMetrics.get("abc.metric1")); + } + + /** + * Verifies that the handler returns a non-null map even for an unpopulated store (presumably because every + * store always holds a JobManager sub-structure). + */ + @Test + public void getMapForNull() { + MetricFetcher fetcher = createFetcher(); + MetricStore emptyStore = fetcher.getMetricStore(); + + Map<String, Object> jmMetrics = new JobManagerMetricsHandler(fetcher).getMapFor(new HashMap<String, String>(), emptyStore); + + assertNotNull(jmMetrics); + } + + /** + * Creates a metric fetcher whose actor system, JobManager retriever and execution context are mocks. + */ + private static MetricFetcher createFetcher() { + return new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java new file mode 100644 index
0000000..c0cc345 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.actor.ActorSystem; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.ExecutionContext; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.powermock.api.mockito.PowerMockito.mock; +
+public class JobMetricsHandlerTest extends TestLogger { + /** + * Verifies that the handler returns the metrics of the job identified by the job ID path parameter. + */ + @Test + public void getMapFor() throws Exception { + MetricFetcher fetcher = createFetcher(); + MetricStore populatedStore = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + Map<String, String> pathParams = new HashMap<>(); + pathParams.put(PARAMETER_JOB_ID, "jobid"); + + Map<String, Object> jobMetrics = new JobMetricsHandler(fetcher).getMapFor(pathParams, populatedStore); + + assertEquals(2L, jobMetrics.get("abc.metric3")); + } + + /** + * Verifies that the handler returns {@code null} when the store holds no metrics for the (here: missing) job ID. + */ + @Test + public void getMapForNull() { + MetricFetcher fetcher = createFetcher(); + MetricStore emptyStore = fetcher.getMetricStore(); + + Map<String, Object> jobMetrics = new JobMetricsHandler(fetcher).getMapFor(new HashMap<String, String>(), emptyStore); + + assertNull(jobMetrics); + } + + /** + * Creates a metric fetcher whose actor system, JobManager retriever and execution context are mocks. + */ + private static MetricFetcher createFetcher() { + return new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java new file mode 100644 index 0000000..d6e5ca7 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.metrics; + +import akka.actor.ActorSystem; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import scala.concurrent.ExecutionContext; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler.PARAMETER_JOB_ID; +import static org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler.PARAMETER_VERTEX_ID; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.powermock.api.mockito.PowerMockito.mock; + +public class JobVertexMetricsHandlerTest extends TestLogger { + /** + * Verifies that the handler returns the metrics of the vertex identified by the job and vertex ID path + * parameters, including the operator metrics stored for that vertex. + */ + @Test + public void getMapFor() throws Exception { + MetricFetcher fetcher = createFetcher(); + MetricStore populatedStore = MetricStoreTest.setupStore(fetcher.getMetricStore()); + + Map<String, String> pathParams = new HashMap<>(); + pathParams.put(PARAMETER_JOB_ID, "jobid"); + pathParams.put(PARAMETER_VERTEX_ID, "taskid"); + + Map<String, Object> vertexMetrics = new JobVertexMetricsHandler(fetcher).getMapFor(pathParams, populatedStore); + + assertEquals(3L, vertexMetrics.get("8.abc.metric4")); + assertEquals(4L, vertexMetrics.get("8.opname.abc.metric5")); + } + + /** + * Verifies that the handler returns {@code null} when the store holds no metrics for the (here: missing) vertex. + */ + @Test + public void getMapForNull() { + MetricFetcher fetcher = createFetcher(); + MetricStore emptyStore = fetcher.getMetricStore(); + + Map<String, Object> vertexMetrics = new JobVertexMetricsHandler(fetcher).getMapFor(new HashMap<String, String>(), emptyStore); + + assertNull(vertexMetrics); + } + + /** + * Creates a metric fetcher whose actor system, JobManager retriever and execution context are mocks. + */ + private static MetricFetcher createFetcher() { + return new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class)); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java new file mode 100644 index 0000000..e0cfe26 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcherTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.util.TestingHistogram;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future$;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.metrics.dump.MetricQueryService.METRIC_QUERY_SERVICE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.same;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+/**
+ * Tests for the {@link MetricFetcher}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(MetricFetcher.class)
+public class MetricFetcherTest extends TestLogger {
+
+	/**
+	 * Verifies that {@link MetricFetcher#update()} asks the (mocked) JobManager for its registered TaskManagers,
+	 * requests metric dumps from the query services and writes the deserialized values into the {@link MetricStore}.
+	 */
+	@Test
+	public void testUpdate() throws Exception {
+		// ========= setup TaskManager =================================================================================
+		JobID jobID = new JobID();
+		InstanceID tmID = new InstanceID();
+		ActorGateway taskManagerGateway = mock(ActorGateway.class);
+		when(taskManagerGateway.path()).thenReturn("/tm/address");
+
+		Instance taskManager = mock(Instance.class);
+		when(taskManager.getActorGateway()).thenReturn(taskManagerGateway);
+		when(taskManager.getId()).thenReturn(tmID);
+
+		// ========= setup JobManager ==================================================================================
+		ActorGateway jobManagerGateway = mock(ActorGateway.class);
+		Object registeredTaskManagersAnswer = new JobManagerMessages.RegisteredTaskManagers(
+			JavaConverters.collectionAsScalaIterableConverter(Collections.singletonList(taskManager)).asScala());
+
+		when(jobManagerGateway.ask(isA(RequestJobDetails.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new MultipleJobsDetails(new JobDetails[0], new JobDetails[0])));
+		when(jobManagerGateway.ask(isA(JobManagerMessages.RequestRegisteredTaskManagers$.class), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(registeredTaskManagersAnswer));
+		when(jobManagerGateway.path()).thenReturn("/jm/address");
+
+		JobManagerRetriever retriever = mock(JobManagerRetriever.class);
+		when(retriever.getJobManagerGatewayAndWebPort())
+			.thenReturn(Option.apply(new scala.Tuple2<ActorGateway, Integer>(jobManagerGateway, 0)));
+
+		// ========= setup QueryServices ================================================================================
+		Object requestMetricsAnswer = createRequestDumpAnswer(tmID, jobID);
+
+		final ActorRef jmQueryService = mock(ActorRef.class);
+		final ActorRef tmQueryService = mock(ActorRef.class);
+
+		ActorSystem actorSystem = mock(ActorSystem.class);
+		when(actorSystem.actorFor(eq("/jm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(jmQueryService);
+		when(actorSystem.actorFor(eq("/tm/" + METRIC_QUERY_SERVICE_NAME))).thenReturn(tmQueryService);
+
+		// the JobManager's query service answers with a zero-filled dump; every metric asserted below
+		// is contained in the dump returned by the TaskManager's query service
+		MetricFetcher.BasicGateway jmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+		when(jmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful((Object) new byte[16]));
+
+		MetricFetcher.BasicGateway tmQueryServiceGateway = mock(MetricFetcher.BasicGateway.class);
+		when(tmQueryServiceGateway.ask(any(MetricQueryService.getCreateDump().getClass()), any(FiniteDuration.class)))
+			.thenReturn(Future$.MODULE$.successful(requestMetricsAnswer));
+
+		// hand out the matching gateway mock depending on which ActorRef (by identity) the fetcher wraps
+		whenNew(MetricFetcher.BasicGateway.class)
+			.withArguments(same(jmQueryService))
+			.thenReturn(jmQueryServiceGateway);
+		whenNew(MetricFetcher.BasicGateway.class)
+			.withArguments(same(tmQueryService))
+			.thenReturn(tmQueryServiceGateway);
+
+		// ========= start MetricFetcher testing =======================================================================
+		ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(new CurrentThreadExecutor());
+		MetricFetcher fetcher = new MetricFetcher(actorSystem, retriever, context);
+
+		// verify that update fetches metrics and updates the store
+		fetcher.update();
+		MetricStore store = fetcher.getMetricStore();
+		synchronized (store) {
+			assertEquals(7L, store.jobManager.metrics.get("abc.hist_min"));
+			assertEquals(6L, store.jobManager.metrics.get("abc.hist_max"));
+			assertEquals(4.0, store.jobManager.metrics.get("abc.hist_mean"));
+			assertEquals(0.5, store.jobManager.metrics.get("abc.hist_median"));
+			assertEquals(5.0, store.jobManager.metrics.get("abc.hist_stddev"));
+			assertEquals(0.75, store.jobManager.metrics.get("abc.hist_p75"));
+			assertEquals(0.9, store.jobManager.metrics.get("abc.hist_p90"));
+			assertEquals(0.95, store.jobManager.metrics.get("abc.hist_p95"));
+			assertEquals(0.98, store.jobManager.metrics.get("abc.hist_p98"));
+			assertEquals(0.99, store.jobManager.metrics.get("abc.hist_p99"));
+			assertEquals(0.999, store.jobManager.metrics.get("abc.hist_p999"));
+
+			assertEquals("x", store.taskManagers.get(tmID.toString()).metrics.get("abc.gauge"));
+			assertEquals(5.0, store.jobs.get(jobID.toString()).metrics.get("abc.jc"));
+			assertEquals(2L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.abc.tc"));
+			assertEquals(1L, store.jobs.get(jobID.toString()).tasks.get("taskid").metrics.get("2.opname.abc.oc"));
+		}
+	}
+
+	/**
+	 * Executor that runs each task directly in the calling thread, so that callbacks on the
+	 * already-completed mock futures execute synchronously within the test thread.
+	 */
+	private static class CurrentThreadExecutor implements Executor {
+		@Override
+		public void execute(Runnable r) {
+			r.run();
+		}
+	}
+
+	/**
+	 * Creates a serialized metric dump containing two counters (operator and task scope), a meter (job scope),
+	 * a gauge (TaskManager scope) and a histogram (JobManager scope), all using the scope component "abc".
+	 *
+	 * @param tmID id of the TaskManager the gauge is registered for
+	 * @param jobID id of the job the counters and the meter are registered for
+	 * @return the serialized dump
+	 * @throws IOException if serialization fails
+	 */
+	private static byte[] createRequestDumpAnswer(InstanceID tmID, JobID jobID) throws IOException {
+		Map<Counter, Tuple2<QueryScopeInfo, String>> counters = new HashMap<>();
+		Map<Gauge<?>, Tuple2<QueryScopeInfo, String>> gauges = new HashMap<>();
+		Map<Histogram, Tuple2<QueryScopeInfo, String>> histograms = new HashMap<>();
+		Map<Meter, Tuple2<QueryScopeInfo, String>> meters = new HashMap<>();
+
+		SimpleCounter c1 = new SimpleCounter();
+		SimpleCounter c2 = new SimpleCounter();
+
+		c1.inc(1);
+		c2.inc(2);
+
+		counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc"));
+		counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc"));
+		meters.put(new Meter() {
+			@Override
+			public void markEvent() {
+			}
+
+			@Override
+			public void markEvent(long n) {
+			}
+
+			@Override
+			public double getRate() {
+				return 5;
+			}
+
+			@Override
+			public long getCount() {
+				return 10;
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc"));
+		gauges.put(new Gauge<String>() {
+			@Override
+			public String getValue() {
+				return "x";
+			}
+		}, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge"));
+		histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist"));
+
+		MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer();
+		try {
+			return serializer.serialize(counters, gauges, histograms, meters);
+		} finally {
+			// release the serializer even if serialization fails
+			serializer.close();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
new file mode 100644
index 0000000..9dc2929
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/MetricStoreTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link MetricStore}.
+ */
+public class MetricStoreTest extends TestLogger {
+
+	@Test
+	public void testAdd() throws IOException {
+		MetricStore populated = setupStore(new MetricStore());
+
+		// JobManager, TaskManager and job metrics are keyed by "<scope component>.<metric name>"
+		assertEquals(0L, populated.jobManager.metrics.get("abc.metric1"));
+		assertEquals(1L, populated.taskManagers.get("tmid").metrics.get("abc.metric2"));
+		assertEquals(2L, populated.jobs.get("jobid").metrics.get("abc.metric3"));
+
+		// task and operator metrics live under their task, prefixed with the number (8) from their scope info
+		assertEquals(3L, populated.jobs.get("jobid").tasks.get("taskid").metrics.get("8.abc.metric4"));
+		assertEquals(4L, populated.jobs.get("jobid").tasks.get("taskid").metrics.get("8.opname.abc.metric5"));
+	}
+
+	@Test
+	public void testMalformedNameHandling() {
+		MetricStore emptyStore = new MetricStore();
+
+		// adding a null dump, or a dump with an empty scope and name, must not throw ...
+		emptyStore.add(null);
+		emptyStore.add(new MetricDump.CounterDump(new QueryScopeInfo.JobManagerQueryScopeInfo(""), "", 0));
+
+		// ... and must not leave anything behind in the store
+		assertEquals(0, emptyStore.jobManager.metrics.size());
+		assertEquals(0, emptyStore.taskManagers.size());
+		assertEquals(0, emptyStore.jobs.size());
+	}
+
+	/**
+	 * Adds one counter per scope (JobManager, TaskManager, job, task, operator) to the given store.
+	 * The counters are named "metric1" to "metric5", hold the values 0 to 4 and all use the scope component "abc".
+	 *
+	 * @param store store to populate
+	 * @return the given store
+	 */
+	public static MetricStore setupStore(MetricStore store) {
+		store.add(new MetricDump.CounterDump(
+			new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "metric1", 0));
+		store.add(new MetricDump.CounterDump(
+			new QueryScopeInfo.TaskManagerQueryScopeInfo("tmid", "abc"), "metric2", 1));
+		store.add(new MetricDump.CounterDump(
+			new QueryScopeInfo.JobQueryScopeInfo("jobid", "abc"), "metric3", 2));
+		store.add(new MetricDump.CounterDump(
+			new QueryScopeInfo.TaskQueryScopeInfo("jobid", "taskid", 8, "abc"), "metric4", 3));
+		store.add(new MetricDump.CounterDump(
+			new QueryScopeInfo.OperatorQueryScopeInfo("jobid", "taskid", 8, "opname", "abc"), "metric5", 4));
+		return store;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
new file mode 100644
index 0000000..6299a56
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.metrics;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler.PARAMETER_TM_ID;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Tests for the {@link TaskManagerMetricsHandler}.
+ */
+public class TaskManagerMetricsHandlerTest extends TestLogger {
+
+	/**
+	 * Verifies that the handler returns the metrics stored for the TaskManager whose id is passed
+	 * as the {@code PARAMETER_TM_ID} path parameter.
+	 */
+	@Test
+	public void getMapFor() throws Exception {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+		pathParams.put(PARAMETER_TM_ID, "tmid");
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertEquals(1L, metrics.get("abc.metric2"));
+	}
+
+	/**
+	 * Verifies that the handler returns null when the path parameters contain no TaskManager id.
+	 */
+	@Test
+	public void getMapForNull() {
+		MetricFetcher fetcher = new MetricFetcher(mock(ActorSystem.class), mock(JobManagerRetriever.class), mock(ExecutionContext.class));
+		MetricStore store = fetcher.getMetricStore();
+
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+
+		Map<String, String> pathParams = new HashMap<>();
+
+		Map<String, Object> metrics = handler.getMapFor(pathParams, store);
+
+		assertNull(metrics);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index 763ea66..9c96858 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.metrics;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
@@ -26,6 +28,8 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.metrics.dump.MetricQueryService;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormat;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
@@ -47,6 +51,7 @@ public class MetricRegistry {
 
 	private List<MetricReporter> reporters;
 	private ScheduledExecutorService executor;
+	private ActorRef queryService;
 
 	private final ScopeFormats scopeFormats;
 
@@ -144,6 +149,19 @@ public class MetricRegistry {
 		}
 	}
 
+	/**
+	 * Starts the MetricQueryService; on failure the error is logged and no metrics are forwarded to it.
+	 *
+	 * @param actorSystem ActorSystem to create the MetricQueryService on
+	 */
+	public void startQueryService(ActorSystem actorSystem) {
+		try {
+			queryService = MetricQueryService.startMetricQueryService(actorSystem);
+		} catch (Exception e) {
+			LOG.warn("Could not start MetricQueryService. No metrics will be submitted to the WebInterface.", e);
+		}
+	}
+
 	public char getDelimiter() {
 		return this.delimiter;
 	}
@@ -207,6 +225,9 @@ public class MetricRegistry {
 					}
 				}
 			}
+			if (queryService != null) {
+				MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, (AbstractMetricGroup) group);
+			}
 		} catch (Exception e) {
 			LOG.error("Error while registering metric.", e);
 		}
@@ -228,6 +249,9 @@ public class MetricRegistry {
 					}
 				}
 			}
+			if (queryService != null) {
+				MetricQueryService.notifyOfRemovedMetric(queryService, metric);
+			}
 		} catch (Exception e) {
-			LOG.error("Error while registering metric.", e);
+			LOG.error("Error while unregistering metric.", e);
 		}
http://git-wip-us.apache.org/repos/asf/flink/blob/70704de0/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
new file mode 100644
index 0000000..2239b50
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.metrics.dump;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A container for a dumped metric that contains the scope, name and value(s) of the metric.
+ *
+ * <p>The constructor is private, so the nested {@link CounterDump}, {@link GaugeDump}, {@link HistogramDump}
+ * and {@link MeterDump} classes are the only concrete dump types.
+ */
+public abstract class MetricDump {
+
+	// ------------------------------------------------------------------------
+	//  Categories returned by getCategory(), which let callers dispatch on
+	//  the kind of dump without instanceof checks.
+	// ------------------------------------------------------------------------
+
+	/** Category of a {@link CounterDump}. */
+	public static final byte METRIC_CATEGORY_COUNTER = 0;
+
+	/** Category of a {@link GaugeDump}. */
+	public static final byte METRIC_CATEGORY_GAUGE = 1;
+
+	/** Category of a {@link HistogramDump}. */
+	public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+
+	/** Category of a {@link MeterDump}. */
+	public static final byte METRIC_CATEGORY_METER = 3;
+
+	// ------------------------------------------------------------------------
+
+	/** Scope information of the dumped metric; never null. */
+	public final QueryScopeInfo scopeInfo;
+
+	/** Name of the dumped metric; never null. */
+	public final String name;
+
+	private MetricDump(QueryScopeInfo scopeInfo, String name) {
+		this.scopeInfo = Preconditions.checkNotNull(scopeInfo);
+		this.name = Preconditions.checkNotNull(name);
+	}
+
+	/**
+	 * Identifies which kind of metric this dump holds.
+	 *
+	 * @return one of the {@code METRIC_CATEGORY_*} constants
+	 */
+	public abstract byte getCategory();
+
+	/**
+	 * Dump of a {@link org.apache.flink.metrics.Counter}, holding its count.
+	 */
+	public static class CounterDump extends MetricDump {
+		/** The dumped count. */
+		public final long count;
+
+		public CounterDump(QueryScopeInfo scopeInfo, String name, long count) {
+			super(scopeInfo, name);
+			this.count = count;
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_COUNTER;
+		}
+	}
+
+	/**
+	 * Dump of a {@link org.apache.flink.metrics.Gauge}, holding its value rendered as a string.
+	 */
+	public static class GaugeDump extends MetricDump {
+		/** The dumped gauge value; never null. */
+		public final String value;
+
+		public GaugeDump(QueryScopeInfo scopeInfo, String name, String value) {
+			super(scopeInfo, name);
+			this.value = Preconditions.checkNotNull(value);
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_GAUGE;
+		}
+	}
+
+	/**
+	 * Dump of a {@link org.apache.flink.metrics.Histogram}, holding its summary statistics.
+	 */
+	public static class HistogramDump extends MetricDump {
+		// extreme values
+		public long min;
+		public long max;
+		// location and spread
+		public double mean;
+		public double median;
+		public double stddev;
+		// percentiles, named after their quantile (presumably p999 = 99.9th percentile)
+		public double p75;
+		public double p90;
+		public double p95;
+		public double p98;
+		public double p99;
+		public double p999;
+
+		public HistogramDump(QueryScopeInfo scopeInfo, String name,
+				long min, long max, double mean, double median, double stddev,
+				double p75, double p90, double p95, double p98, double p99, double p999) {
+			super(scopeInfo, name);
+
+			this.min = min;
+			this.max = max;
+
+			this.mean = mean;
+			this.median = median;
+			this.stddev = stddev;
+
+			this.p75 = p75;
+			this.p90 = p90;
+			this.p95 = p95;
+			this.p98 = p98;
+			this.p99 = p99;
+			this.p999 = p999;
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_HISTOGRAM;
+		}
+	}
+
+	/**
+	 * Dump of a {@link org.apache.flink.metrics.Meter}, holding only its rate.
+	 */
+	public static class MeterDump extends MetricDump {
+		/** The dumped rate. */
+		public final double rate;
+
+		public MeterDump(QueryScopeInfo scopeInfo, String name, double rate) {
+			super(scopeInfo, name);
+			this.rate = rate;
+		}
+
+		@Override
+		public byte getCategory() {
+			return METRIC_CATEGORY_METER;
+		}
+	}
+}