Repository: flink
Updated Branches:
  refs/heads/master 4f8d01fba -> 742e4a0ff

[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

This closes #4811.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/742e4a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/742e4a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/742e4a0f

Branch: refs/heads/master
Commit: 742e4a0ffd2fb244654e97098d0b23100789d4e9
Parents: 4f8d01f
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 11 19:00:16 2017 +0200
Committer: Till <[email protected]>
Committed: Thu Oct 12 16:34:37 2017 +0200

----------------------------------------------------------------------
 .../handler/legacy/TaskManagersHandler.java | 122 ++++++++++---------
 1 file changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/742e4a0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 0880d0c..e608b99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -130,67 +130,71 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler {
 // only send metrics when only one task manager requests them.
if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { fetcher.update(); - MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); - if (metrics != null) { - gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); - - gen.writeNumberField("heapCommitted", heapCommitted); - gen.writeNumberField("heapUsed", heapUsed); - gen.writeNumberField("heapMax", heapTotal); - - long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); - - gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); - gen.writeNumberField("nonHeapUsed", nonHeapUsed); - gen.writeNumberField("nonHeapMax", nonHeapTotal); - - gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); - gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); - gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); - - long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); - long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); - long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); - - gen.writeNumberField("directCount", directCount); - gen.writeNumberField("directUsed", directUsed); - gen.writeNumberField("directMax", directMax); - - long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); - long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); - long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); - - gen.writeNumberField("mappedCount", mappedCount); - gen.writeNumberField("mappedUsed", mappedUsed); - gen.writeNumberField("mappedMax", mappedMax); - - long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); - long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); - - gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); - gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); - - gen.writeArrayFieldStart("garbageCollectors"); - - for (String gcName : metrics.garbageCollectorNames) { - String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); - String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); - if (count != null && time != null) { - gen.writeStartObject(); - gen.writeStringField("name", gcName); - gen.writeNumberField("count", Long.valueOf(count)); - gen.writeNumberField("time", Long.valueOf(time)); - gen.writeEndObject(); + final MetricStore metricStore = fetcher.getMetricStore(); + + synchronized (metricStore) { + MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeNumberField("directCount", directCount); + gen.writeNumberField("directUsed", directUsed); + gen.writeNumberField("directMax", directMax); + + long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeNumberField("mappedCount", mappedCount); + gen.writeNumberField("mappedUsed", mappedUsed); + gen.writeNumberField("mappedMax", mappedMax); + + long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); + gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." 
+ gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeNumberField("count", Long.valueOf(count)); + gen.writeNumberField("time", Long.valueOf(time)); + gen.writeEndObject(); + } } - } - gen.writeEndArray(); - gen.writeEndObject(); + gen.writeEndArray(); + gen.writeEndObject(); + } } }
