Repository: flink
Updated Branches:
  refs/heads/master 4f8d01fba -> 742e4a0ff


[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

This closes #4811.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/742e4a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/742e4a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/742e4a0f

Branch: refs/heads/master
Commit: 742e4a0ffd2fb244654e97098d0b23100789d4e9
Parents: 4f8d01f
Author: Till Rohrmann <[email protected]>
Authored: Wed Oct 11 19:00:16 2017 +0200
Committer: Till <[email protected]>
Committed: Thu Oct 12 16:34:37 2017 +0200

----------------------------------------------------------------------
 .../handler/legacy/TaskManagersHandler.java     | 122 ++++++++++---------
 1 file changed, 63 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/742e4a0f/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
index 0880d0c..e608b99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -130,67 +130,71 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
                        // only send metrics when only one task manager 
requests them.
                        if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
                                fetcher.update();
-                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-                               if (metrics != null) {
-                                       gen.writeObjectFieldStart("metrics");
-                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-                                       gen.writeNumberField("heapCommitted", 
heapCommitted);
-                                       gen.writeNumberField("heapUsed", 
heapUsed);
-                                       gen.writeNumberField("heapMax", 
heapTotal);
-
-                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-                                       gen.writeNumberField("nonHeapUsed", 
nonHeapUsed);
-                                       gen.writeNumberField("nonHeapMax", 
nonHeapTotal);
-
-                                       gen.writeNumberField("totalCommitted", 
heapCommitted + nonHeapCommitted);
-                                       gen.writeNumberField("totalUsed", 
heapUsed + nonHeapUsed);
-                                       gen.writeNumberField("totalMax", 
heapTotal + nonHeapTotal);
-
-                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-                                       gen.writeNumberField("directCount", 
directCount);
-                                       gen.writeNumberField("directUsed", 
directUsed);
-                                       gen.writeNumberField("directMax", 
directMax);
-
-                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-                                       gen.writeNumberField("mappedCount", 
mappedCount);
-                                       gen.writeNumberField("mappedUsed", 
mappedUsed);
-                                       gen.writeNumberField("mappedMax", 
mappedMax);
-
-                                       long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-                                       long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-                                       
gen.writeArrayFieldStart("garbageCollectors");
-
-                                       for (String gcName : 
metrics.garbageCollectorNames) {
-                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-                                               if (count != null  && time != 
null) {
-                                                       gen.writeStartObject();
-                                                       
gen.writeStringField("name", gcName);
-                                                       
gen.writeNumberField("count", Long.valueOf(count));
-                                                       
gen.writeNumberField("time", Long.valueOf(time));
-                                                       gen.writeEndObject();
+                               final MetricStore metricStore = 
fetcher.getMetricStore();
+
+                               synchronized (metricStore) {
+                                       MetricStore.TaskManagerMetricStore 
metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString());
+                                       if (metrics != null) {
+                                               
gen.writeObjectFieldStart("metrics");
+                                               long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                               long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                               long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+                                               
gen.writeNumberField("heapCommitted", heapCommitted);
+                                               
gen.writeNumberField("heapUsed", heapUsed);
+                                               gen.writeNumberField("heapMax", 
heapTotal);
+
+                                               long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+                                               long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                               long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+                                               
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+                                               
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+                                               
gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+                                               
gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+                                               
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+                                               
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+                                               long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+                                               long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+                                               long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+                                               
gen.writeNumberField("directCount", directCount);
+                                               
gen.writeNumberField("directUsed", directUsed);
+                                               
gen.writeNumberField("directMax", directMax);
+
+                                               long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+                                               long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+                                               long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+                                               
gen.writeNumberField("mappedCount", mappedCount);
+                                               
gen.writeNumberField("mappedUsed", mappedUsed);
+                                               
gen.writeNumberField("mappedMax", mappedMax);
+
+                                               long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+                                               long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+                                               
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+                                               
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+                                               
gen.writeArrayFieldStart("garbageCollectors");
+
+                                               for (String gcName : 
metrics.garbageCollectorNames) {
+                                                       String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+                                                       String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+                                                       if (count != null && 
time != null) {
+                                                               
gen.writeStartObject();
+                                                               
gen.writeStringField("name", gcName);
+                                                               
gen.writeNumberField("count", Long.valueOf(count));
+                                                               
gen.writeNumberField("time", Long.valueOf(time));
+                                                               
gen.writeEndObject();
+                                                       }
                                                }
-                                       }
 
-                                       gen.writeEndArray();
-                                       gen.writeEndObject();
+                                               gen.writeEndArray();
+                                               gen.writeEndObject();
+                                       }
                                }
                        }
 

Reply via email to