Repository: flink Updated Branches: refs/heads/master 68f446c9f -> f622de3ec
[FLINK-7368][metrics] Make MetricStore ThreadSafe class Remove external synchronization on MetricStore This closes #4472. This closes #4840. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f622de3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f622de3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f622de3e Branch: refs/heads/master Commit: f622de3ecbc2ae17f2d15fe46828c48747c2b6ae Parents: 68f446c Author: Piotr Nowojski <[email protected]> Authored: Mon Oct 16 16:53:14 2017 +0200 Committer: zentol <[email protected]> Committed: Thu Oct 26 16:41:57 2017 +0200 ---------------------------------------------------------------------- .../handler/legacy/TaskManagersHandler.java | 123 ++++---- .../legacy/metrics/AbstractMetricsHandler.java | 76 +++-- .../metrics/JobManagerMetricsHandler.java | 2 +- .../legacy/metrics/JobMetricsHandler.java | 2 +- .../legacy/metrics/JobVertexMetricsHandler.java | 2 +- .../handler/legacy/metrics/MetricFetcher.java | 22 +- .../handler/legacy/metrics/MetricStore.java | 297 +++++++++++-------- .../metrics/TaskManagerMetricsHandler.java | 2 +- .../rest/handler/util/MutableIOMetrics.java | 88 +++--- .../legacy/metrics/MetricFetcherTest.java | 32 +- .../handler/legacy/metrics/MetricStoreTest.java | 6 +- 11 files changed, 340 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java index e608b99..93c5b44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java @@ -130,71 +130,68 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { // only send metrics when only one task manager requests them. if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { fetcher.update(); - final MetricStore metricStore = fetcher.getMetricStore(); - - synchronized (metricStore) { - MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString()); - if (metrics != null) { - gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); - - gen.writeNumberField("heapCommitted", heapCommitted); - gen.writeNumberField("heapUsed", heapUsed); - gen.writeNumberField("heapMax", heapTotal); - - long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); - - gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); - gen.writeNumberField("nonHeapUsed", nonHeapUsed); - gen.writeNumberField("nonHeapMax", nonHeapTotal); - - gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); - gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); - gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); - - long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); - long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); - long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); - - gen.writeNumberField("directCount", directCount); - gen.writeNumberField("directUsed", directUsed); - gen.writeNumberField("directMax", directMax); - - long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); - long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); - long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); - - gen.writeNumberField("mappedCount", mappedCount); - gen.writeNumberField("mappedUsed", mappedUsed); - gen.writeNumberField("mappedMax", mappedMax); - - long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); - long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); - - gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); - gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); - - gen.writeArrayFieldStart("garbageCollectors"); - - for (String gcName : metrics.garbageCollectorNames) { - String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); - String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); - if (count != null && time != null) { - gen.writeStartObject(); - gen.writeStringField("name", gcName); - gen.writeNumberField("count", Long.valueOf(count)); - gen.writeNumberField("time", Long.valueOf(time)); - gen.writeEndObject(); - } - } - gen.writeEndArray(); - gen.writeEndObject(); + MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeNumberField("directCount", directCount); + gen.writeNumberField("directUsed", directUsed); + gen.writeNumberField("directMax", directMax); + + long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeNumberField("mappedCount", mappedCount); + gen.writeNumberField("mappedUsed", mappedUsed); + gen.writeNumberField("mappedMax", mappedMax); + + long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); + gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeNumberField("count", Long.valueOf(count)); + gen.writeNumberField("time", Long.valueOf(time)); + gen.writeEndObject(); + } } + + gen.writeEndArray(); + gen.writeEndObject(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java index 6cf83c4..186397b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/AbstractMetricsHandler.java @@ -87,54 +87,48 @@ public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler */ return ""; } - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map<String, String> metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - String[] requestedMetrics = requestedMetricsList.split(","); - - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - - gen.writeStartArray(); - for (String requestedMetric : requestedMetrics) { - Object metricValue = metrics.get(requestedMetric); - if (metricValue != null) { - gen.writeStartObject(); - gen.writeStringField("id", requestedMetric); - gen.writeStringField("value", metricValue.toString()); - gen.writeEndObject(); - } - } - gen.writeEndArray(); - - gen.close(); - return writer.toString(); + Map<String, String> metrics = getMapFor(pathParams, fetcher.getMetricStore()); + if (metrics == null) { + return ""; } - } + String[] requestedMetrics = requestedMetricsList.split(","); - private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException { - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - Map<String, String> metrics = getMapFor(pathParams, metricStore); - if (metrics == null) { - return ""; - } - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); - gen.writeStartArray(); - for (String m : metrics.keySet()) { + gen.writeStartArray(); + for (String requestedMetric : requestedMetrics) { + Object metricValue = metrics.get(requestedMetric); + if (metricValue != null) { gen.writeStartObject(); - gen.writeStringField("id", m); + gen.writeStringField("id", requestedMetric); + gen.writeStringField("value", metricValue.toString()); gen.writeEndObject(); } - gen.writeEndArray(); + } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); + } - gen.close(); - return writer.toString(); + private String getAvailableMetricsList(Map<String, String> pathParams) throws IOException { + Map<String, String> metrics = getMapFor(pathParams, fetcher.getMetricStore()); + if (metrics == null) { + return ""; + } + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer); + + gen.writeStartArray(); + for (String m : metrics.keySet()) { + gen.writeStartObject(); + gen.writeStringField("id", m); + gen.writeEndObject(); } + gen.writeEndArray(); + + gen.close(); + return writer.toString(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java index c568ee0..35a4efd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobManagerMetricsHandler.java @@ -47,7 +47,7 @@ public class JobManagerMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobManagerMetricStore jobManager = metrics.getJobManagerMetricStore(); + MetricStore.ComponentMetricStore jobManager = metrics.getJobManagerMetricStore(); if (jobManager == null) { return null; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java index 7341eb8..34e7b87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java @@ -47,7 +47,7 @@ public class JobMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.JobMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); + MetricStore.ComponentMetricStore job = metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID)); return job != null ? job.metrics : null; http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java index 3a701ab..5035645 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java @@ -47,7 +47,7 @@ public class JobVertexMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.TaskMetricStore task = metrics.getTaskMetricStore( + MetricStore.ComponentMetricStore task = metrics.getTaskMetricStore( pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID), pathParams.get(PARAMETER_VERTEX_ID)); return task != null http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java index c114ee6..fa71c68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcher.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -112,16 +111,14 @@ public class MetricFetcher { if (throwable != null) { LOG.debug("Fetching of JobDetails failed.", throwable); } else { - ArrayList<String> toRetain = new ArrayList<>(); + ArrayList<String> activeJobs = new ArrayList<>(); for (JobDetails job : jobDetails.getRunning()) { - toRetain.add(job.getJobId().toString()); + activeJobs.add(job.getJobId().toString()); } for (JobDetails job : jobDetails.getFinished()) { - toRetain.add(job.getJobId().toString()); - } - synchronized (metrics) { - metrics.jobs.keySet().retainAll(toRetain); + activeJobs.add(job.getJobId().toString()); } + metrics.retainJobs(activeJobs); } }, executor); @@ -154,9 +151,7 @@ public class MetricFetcher { return taskManagerInstance.getId().toString(); }).collect(Collectors.toList()); - synchronized (metrics) { - metrics.taskManagers.keySet().retainAll(activeTaskManagers); - } + metrics.retainTaskManagers(activeTaskManagers); } }, executor); @@ -198,12 +193,7 @@ public class MetricFetcher { if (t != null) { LOG.debug("Fetching metrics failed.", t); } else { - List<MetricDump> dumpedMetrics = deserializer.deserialize(result); - synchronized (metrics) { - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); - } - } + metrics.addAll(deserializer.deserialize(result)); } }, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java index 9c13ab8..f9e79d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java @@ -18,17 +18,22 @@ package org.apache.flink.runtime.rest.handler.legacy.metrics; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.HashSet; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE; import static org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM; @@ -38,29 +43,136 @@ import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Nested data-structure to store metrics. - * - * <p>This structure is not thread-safe. */ +@ThreadSafe public class MetricStore { private static final Logger LOG = LoggerFactory.getLogger(MetricStore.class); - final JobManagerMetricStore jobManager = new JobManagerMetricStore(); - final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); - final Map<String, JobMetricStore> jobs = new HashMap<>(); + private final ComponentMetricStore jobManager = new ComponentMetricStore(); + private final Map<String, TaskManagerMetricStore> taskManagers = new ConcurrentHashMap<>(); + private final Map<String, JobMetricStore> jobs = new ConcurrentHashMap<>(); + + /** + * Remove inactive task managers. + * + * @param activeTaskManagers to retain. + */ + synchronized void retainTaskManagers(List<String> activeTaskManagers) { + taskManagers.keySet().retainAll(activeTaskManagers); + } + + /** + * Remove inactive jobs.. + * + * @param activeJobs to retain. + */ + synchronized void retainJobs(List<String> activeJobs) { + jobs.keySet().retainAll(activeJobs); + } + + /** + * Add metric dumps to the store. + * + * @param metricDumps to add. + */ + synchronized void addAll(List<MetricDump> metricDumps) { + for (MetricDump metric : metricDumps) { + add(metric); + } + } // ----------------------------------------------------------------------------------------------------------------- - // Adding metrics + // Accessors for sub MetricStores // ----------------------------------------------------------------------------------------------------------------- - public void add(MetricDump metric) { + + /** + * Returns the {@link ComponentMetricStore} for the JobManager. + * + * @return ComponentMetricStore for the JobManager + */ + public synchronized ComponentMetricStore getJobManagerMetricStore() { + return ComponentMetricStore.unmodifiable(jobManager); + } + + /** + * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. + * + * @param tmID taskmanager ID + * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists + */ + public synchronized TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { + return tmID == null ? null : TaskManagerMetricStore.unmodifiable(taskManagers.get(tmID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job ID. + * + * @param jobID job ID + * @return ComponentMetricStore for the given ID, or null if no store for the given argument exists + */ + public synchronized ComponentMetricStore getJobMetricStore(String jobID) { + return jobID == null ? null : ComponentMetricStore.unmodifiable(jobs.get(jobID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job/task ID. + * + * @param jobID job ID + * @param taskID task ID + * @return ComponentMetricStore for given IDs, or null if no store for the given arguments exists + */ + public synchronized ComponentMetricStore getTaskMetricStore(String jobID, String taskID) { + JobMetricStore job = jobID == null ? null : jobs.get(jobID); + if (job == null || taskID == null) { + return null; + } + return ComponentMetricStore.unmodifiable(job.getTaskMetricStore(taskID)); + } + + /** + * Returns the {@link ComponentMetricStore} for the given job/task ID and subtask index. + * + * @param jobID job ID + * @param taskID task ID + * @param subtaskIndex subtask index + * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists + */ + public synchronized ComponentMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { + JobMetricStore job = jobID == null ? null : jobs.get(jobID); + if (job == null) { + return null; + } + TaskMetricStore task = job.getTaskMetricStore(taskID); + if (task == null) { + return null; + } + return ComponentMetricStore.unmodifiable(task.getSubtaskMetricStore(subtaskIndex)); + } + + public synchronized Map<String, JobMetricStore> getJobs() { + return unmodifiableMap(jobs); + } + + public synchronized Map<String, TaskManagerMetricStore> getTaskManagers() { + return unmodifiableMap(taskManagers); + } + + public synchronized ComponentMetricStore getJobManager() { + return ComponentMetricStore.unmodifiable(jobManager); + } + + @VisibleForTesting + void add(MetricDump metric) { try { QueryScopeInfo info = metric.scopeInfo; TaskManagerMetricStore tm; JobMetricStore job; TaskMetricStore task; - SubtaskMetricStore subtask; + ComponentMetricStore subtask; String name = info.scope.isEmpty() ? metric.name @@ -76,11 +188,7 @@ public class MetricStore { break; case INFO_CATEGORY_TM: String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; - tm = taskManagers.get(tmID); - if (tm == null) { - tm = new TaskManagerMetricStore(); - taskManagers.put(tmID, tm); - } + tm = taskManagers.computeIfAbsent(tmID, k -> new TaskManagerMetricStore()); if (name.contains("GarbageCollector")) { String gcName = name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.')); tm.addGarbageCollectorName(gcName); @@ -89,30 +197,14 @@ public class MetricStore { break; case INFO_CATEGORY_JOB: QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; - job = jobs.get(jobInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(jobInfo.jobID, job); - } + job = jobs.computeIfAbsent(jobInfo.jobID, k -> new JobMetricStore()); addMetric(job.metrics, name, metric); break; case INFO_CATEGORY_TASK: QueryScopeInfo.TaskQueryScopeInfo taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info; - job = jobs.get(taskInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(taskInfo.jobID, job); - } - task = job.tasks.get(taskInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(taskInfo.vertexID, task); - } - subtask = task.subtasks.get(taskInfo.subtaskIndex); - if (subtask == null) { - subtask = new SubtaskMetricStore(); - task.subtasks.put(taskInfo.subtaskIndex, subtask); - } + job = jobs.computeIfAbsent(taskInfo.jobID, k -> new JobMetricStore()); + task = job.tasks.computeIfAbsent(taskInfo.vertexID, k -> new TaskMetricStore()); + subtask = task.subtasks.computeIfAbsent(taskInfo.subtaskIndex, k -> new ComponentMetricStore()); /** * The duplication is intended. Metrics scoped by subtask are useful for several job/task handlers, * while the WebInterface task metric queries currently do not account for subtasks, so we don't @@ -124,16 +216,8 @@ public class MetricStore { break; case INFO_CATEGORY_OPERATOR: QueryScopeInfo.OperatorQueryScopeInfo operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info; - job = jobs.get(operatorInfo.jobID); - if (job == null) { - job = new JobMetricStore(); - jobs.put(operatorInfo.jobID, job); - } - task = job.tasks.get(operatorInfo.vertexID); - if (task == null) { - task = new TaskMetricStore(); - job.tasks.put(operatorInfo.vertexID, task); - } + job = jobs.computeIfAbsent(operatorInfo.jobID, k -> new JobMetricStore()); + task = job.tasks.computeIfAbsent(operatorInfo.vertexID, k -> new TaskMetricStore()); /** * As the WebInterface does not account for operators (because it can't) we don't * divide by operator and instead use the concatenation of subtask index, operator name and metric name @@ -181,74 +265,23 @@ public class MetricStore { } // ----------------------------------------------------------------------------------------------------------------- - // Accessors for sub MetricStores + // sub MetricStore classes // ----------------------------------------------------------------------------------------------------------------- /** - * Returns the {@link JobManagerMetricStore}. - * - * @return JobManagerMetricStore + * Structure containing metrics of a single component. */ - public JobManagerMetricStore getJobManagerMetricStore() { - return jobManager; - } + @ThreadSafe + public static class ComponentMetricStore { + public final Map<String, String> metrics; - /** - * Returns the {@link TaskManagerMetricStore} for the given taskmanager ID. - * - * @param tmID taskmanager ID - * @return TaskManagerMetricStore for the given ID, or null if no store for the given argument exists - */ - public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) { - return taskManagers.get(tmID); - } - - /** - * Returns the {@link JobMetricStore} for the given job ID. - * - * @param jobID job ID - * @return JobMetricStore for the given ID, or null if no store for the given argument exists - */ - public JobMetricStore getJobMetricStore(String jobID) { - return jobs.get(jobID); - } - - /** - * Returns the {@link TaskMetricStore} for the given job/task ID. - * - * @param jobID job ID - * @param taskID task ID - * @return TaskMetricStore for given IDs, or null if no store for the given arguments exists - */ - public TaskMetricStore getTaskMetricStore(String jobID, String taskID) { - JobMetricStore job = getJobMetricStore(jobID); - if (job == null) { - return null; + private ComponentMetricStore() { + this(new ConcurrentHashMap<>()); } - return job.getTaskMetricStore(taskID); - } - /** - * Returns the {@link SubtaskMetricStore} for the given job/task ID and subtask index. - * - * @param jobID job ID - * @param taskID task ID - * @param subtaskIndex subtask index - * @return SubtaskMetricStore for the given IDs and index, or null if no store for the given arguments exists - */ - public SubtaskMetricStore getSubtaskMetricStore(String jobID, String taskID, int subtaskIndex) { - TaskMetricStore task = getTaskMetricStore(jobID, taskID); - if (task == null) { - return null; + private ComponentMetricStore(Map<String, String> metrics) { + this.metrics = checkNotNull(metrics); } - return task.getSubtaskMetricStore(subtaskIndex); - } - - // ----------------------------------------------------------------------------------------------------------------- - // sub MetricStore classes - // ----------------------------------------------------------------------------------------------------------------- - private abstract static class ComponentMetricStore { - public final Map<String, String> metrics = new HashMap<>(); public String getMetric(String name) { return this.metrics.get(name); @@ -260,50 +293,66 @@ public class MetricStore { ? value : defaultValue; } - } - /** - * Sub-structure containing metrics of the JobManager. - */ - public static class JobManagerMetricStore extends ComponentMetricStore { + private static ComponentMetricStore unmodifiable(ComponentMetricStore source) { + if (source == null) { + return null; + } + return new ComponentMetricStore(unmodifiableMap(source.metrics)); + } } /** * Sub-structure containing metrics of a single TaskManager. */ + @ThreadSafe public static class TaskManagerMetricStore extends ComponentMetricStore { - public final Set<String> garbageCollectorNames = new HashSet<>(); + public final Set<String> garbageCollectorNames; + + private TaskManagerMetricStore() { + this(new ConcurrentHashMap<>(), ConcurrentHashMap.newKeySet()); + } + + private TaskManagerMetricStore(Map<String, String> metrics, Set<String> garbageCollectorNames) { + super(metrics); + this.garbageCollectorNames = checkNotNull(garbageCollectorNames); + } - public void addGarbageCollectorName(String name) { + private void addGarbageCollectorName(String name) { garbageCollectorNames.add(name); } + + private static TaskManagerMetricStore unmodifiable(TaskManagerMetricStore source) { + if (source == null) { + return null; + } + return new TaskManagerMetricStore( + unmodifiableMap(source.metrics), + unmodifiableSet(source.garbageCollectorNames)); + } } /** * Sub-structure containing metrics of a single Job. */ - public static class JobMetricStore extends ComponentMetricStore { - private final Map<String, TaskMetricStore> tasks = new HashMap<>(); + @ThreadSafe + private static class JobMetricStore extends ComponentMetricStore { + private final Map<String, TaskMetricStore> tasks = new ConcurrentHashMap<>(); public TaskMetricStore getTaskMetricStore(String taskID) { - return tasks.get(taskID); + return taskID == null ? null : tasks.get(taskID); } } /** * Sub-structure containing metrics of a single Task. */ - public static class TaskMetricStore extends ComponentMetricStore { - private final Map<Integer, SubtaskMetricStore> subtasks = new HashMap<>(); + @ThreadSafe + private static class TaskMetricStore extends ComponentMetricStore { + private final Map<Integer, ComponentMetricStore> subtasks = new ConcurrentHashMap<>(); - public SubtaskMetricStore getSubtaskMetricStore(int subtaskIndex) { + public ComponentMetricStore getSubtaskMetricStore(int subtaskIndex) { return subtasks.get(subtaskIndex); } } - - /** - * Sub-structure containing metrics of a single Subtask. - */ - public static class SubtaskMetricStore extends ComponentMetricStore { - } } http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java index 90bafb7..f0a83b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/TaskManagerMetricsHandler.java @@ -49,7 +49,7 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler { @Override protected Map<String, String> getMapFor(Map<String, String> pathParams, MetricStore metrics) { - MetricStore.TaskManagerMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); + MetricStore.ComponentMetricStore taskManager = metrics.getTaskManagerMetricStore(pathParams.get(TaskManagersHandler.TASK_MANAGER_ID_KEY)); if (taskManager == null) { return null; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java index 2f5a7c8..ee40b99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/MutableIOMetrics.java @@ -77,58 +77,56 @@ public class MutableIOMetrics extends IOMetrics { } else { // execAttempt is still running, use MetricQueryService instead if (fetcher != null) { fetcher.update(); - MetricStore metricStore = fetcher.getMetricStore(); - synchronized (metricStore) { - MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); - if (metrics != null) { - /** - * We want to keep track of missing metrics to be able to make a difference between 0 as a value - * and a missing value. - * In case a metric is missing for a parallel instance of a task, we set the complete flag as - * false. - */ - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){ - this.numBytesInLocalComplete = false; - } - else { - this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){ - this.numBytesInRemoteComplete = false; - } - else { - this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){ - this.numBytesOutComplete = false; - } - else { - this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){ - this.numRecordsInComplete = false; - } - else { - this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN)); - } - - if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){ - this.numRecordsOutComplete = false; - } - else { - this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); - } + MetricStore.ComponentMetricStore metrics = fetcher.getMetricStore() + .getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); + if (metrics != null) { + /** + * We want to keep track of missing metrics to be able to make a difference between 0 as a value + * and a missing value. + * In case a metric is missing for a parallel instance of a task, we set the complete flag as + * false. + */ + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL) == null){ + this.numBytesInLocalComplete = false; } else { - this.numBytesInLocalComplete = false; + this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE) == null){ this.numBytesInRemoteComplete = false; + } + else { + this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT) == null){ this.numBytesOutComplete = false; + } + else { + this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN) == null){ this.numRecordsInComplete = false; + } + else { + this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN)); + } + + if (metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT) == null){ this.numRecordsOutComplete = false; } + else { + this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT)); + } + } + else { + this.numBytesInLocalComplete = false; + this.numBytesInRemoteComplete = false; + this.numBytesOutComplete = false; + this.numRecordsInComplete = false; + this.numRecordsOutComplete = false; } } } http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java index e513dd9..a6eaf2f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricFetcherTest.java @@ -126,17 +126,17 @@ public class MetricFetcherTest extends TestLogger { fetcher.update(); MetricStore store = fetcher.getMetricStore(); synchronized (store) { - assertEquals("7", store.jobManager.metrics.get("abc.hist_min")); - assertEquals("6", store.jobManager.metrics.get("abc.hist_max")); - assertEquals("4.0", store.jobManager.metrics.get("abc.hist_mean")); - assertEquals("0.5", store.jobManager.metrics.get("abc.hist_median")); - assertEquals("5.0", store.jobManager.metrics.get("abc.hist_stddev")); - assertEquals("0.75", store.jobManager.metrics.get("abc.hist_p75")); - assertEquals("0.9", store.jobManager.metrics.get("abc.hist_p90")); - assertEquals("0.95", store.jobManager.metrics.get("abc.hist_p95")); - assertEquals("0.98", store.jobManager.metrics.get("abc.hist_p98")); - assertEquals("0.99", store.jobManager.metrics.get("abc.hist_p99")); - assertEquals("0.999", store.jobManager.metrics.get("abc.hist_p999")); + assertEquals("7", store.getJobManagerMetricStore().getMetric("abc.hist_min")); + assertEquals("6", store.getJobManagerMetricStore().getMetric("abc.hist_max")); + assertEquals("4.0", store.getJobManagerMetricStore().getMetric("abc.hist_mean")); + assertEquals("0.5", store.getJobManagerMetricStore().getMetric("abc.hist_median")); + assertEquals("5.0", store.getJobManagerMetricStore().getMetric("abc.hist_stddev")); + assertEquals("0.75", store.getJobManagerMetricStore().getMetric("abc.hist_p75")); + assertEquals("0.9", store.getJobManagerMetricStore().getMetric("abc.hist_p90")); + assertEquals("0.95", store.getJobManagerMetricStore().getMetric("abc.hist_p95")); + assertEquals("0.98", store.getJobManagerMetricStore().getMetric("abc.hist_p98")); + assertEquals("0.99", store.getJobManagerMetricStore().getMetric("abc.hist_p99")); + assertEquals("0.999", store.getJobManagerMetricStore().getMetric("abc.hist_p999")); assertEquals("x", store.getTaskManagerMetricStore(tmID.toString()).metrics.get("abc.gauge")); assertEquals("5.0", store.getJobMetricStore(jobID.toString()).metrics.get("abc.jc")); @@ -157,8 +157,8 @@ public class MetricFetcherTest extends TestLogger { c1.inc(1); c2.inc(2); - counters.put(c1, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); - counters.put(c2, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); + counters.put(c1, new Tuple2<>(new QueryScopeInfo.OperatorQueryScopeInfo(jobID.toString(), "taskid", 2, "opname", "abc"), "oc")); + counters.put(c2, new Tuple2<>(new QueryScopeInfo.TaskQueryScopeInfo(jobID.toString(), "taskid", 2, "abc"), "tc")); meters.put(new Meter() { @Override public void markEvent() { @@ -177,14 +177,14 @@ public class MetricFetcherTest extends TestLogger { public long getCount() { return 10; } - }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc")); + }, new Tuple2<>(new QueryScopeInfo.JobQueryScopeInfo(jobID.toString(), "abc"), "jc")); gauges.put(new Gauge<String>() { @Override public String getValue() { return "x"; } - }, new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); - histograms.put(new TestingHistogram(), new Tuple2<QueryScopeInfo, String>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); + }, new Tuple2<>(new QueryScopeInfo.TaskManagerQueryScopeInfo(tmID.toString(), "abc"), "gauge")); + histograms.put(new TestingHistogram(), new Tuple2<>(new QueryScopeInfo.JobManagerQueryScopeInfo("abc"), "hist")); MetricDumpSerialization.MetricDumpSerializer serializer = new MetricDumpSerialization.MetricDumpSerializer(); MetricDumpSerialization.MetricSerializationResult dump = serializer.serialize(counters, gauges, histograms, meters); http://git-wip-us.apache.org/repos/asf/flink/blob/f622de3e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java index 31225ad..82c6894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStoreTest.java @@ -58,9 +58,9 @@ public class MetricStoreTest extends TestLogger { store.add(cd); //-----verify that no side effects occur - assertEquals(0, store.jobManager.metrics.size()); - assertEquals(0, store.taskManagers.size()); - assertEquals(0, store.jobs.size()); + assertEquals(0, store.getJobManager().metrics.size()); + assertEquals(0, store.getTaskManagers().size()); + assertEquals(0, store.getJobs().size()); } public static MetricStore setupStore(MetricStore store) {
