Repository: flink
Updated Branches:
  refs/heads/release-1.3 80c23de70 -> b896b4b65


[FLINK-7368][metrics] Backport synchronization fix for MetricStore

This closes #4841.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b896b4b6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b896b4b6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b896b4b6

Branch: refs/heads/release-1.3
Commit: b896b4b65c58ebcdeb5b318960823b81015c3a0c
Parents: 80c23de
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Mon Oct 16 17:17:26 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Wed Oct 18 15:48:00 2017 +0200

----------------------------------------------------------------------
 .../handlers/TaskManagersHandler.java           | 131 ++++++++++---------
 .../webmonitor/metrics/MetricFetcher.java       |  17 ++-
 .../webmonitor/utils/MutableIOMetrics.java      |  21 +--
 3 files changed, 91 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index a23e983..3782ec9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -28,15 +27,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 import org.apache.flink.util.StringUtils;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
+
+import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
 import static java.util.Objects.requireNonNull;
 
 public class TaskManagersHandler extends AbstractJsonRequestHandler  {
@@ -109,67 +111,70 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler  {
                                        // only send metrics when only one task 
manager requests them.
                                        if 
(pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
                                                fetcher.update();
-                                               
MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
-                                               if (metrics != null) {
-                                                       
gen.writeObjectFieldStart("metrics");
-                                                       long heapUsed = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
-                                                       long heapCommitted = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
-                                                       long heapTotal = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
-
-                                                       
gen.writeNumberField("heapCommitted", heapCommitted);
-                                                       
gen.writeNumberField("heapUsed", heapUsed);
-                                                       
gen.writeNumberField("heapMax", heapTotal);
-
-                                                       long nonHeapUsed = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
-                                                       long nonHeapCommitted = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
-                                                       long nonHeapTotal = 
Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
-
-                                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-                                                       
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-                                                       
gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-                                                       
gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-                                                       
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-                                                       
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-                                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
-                                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
-                                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
-
-                                                       
gen.writeNumberField("directCount", directCount);
-                                                       
gen.writeNumberField("directUsed", directUsed);
-                                                       
gen.writeNumberField("directMax", directMax);
-
-                                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
-                                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
-                                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
-
-                                                       
gen.writeNumberField("mappedCount", mappedCount);
-                                                       
gen.writeNumberField("mappedUsed", mappedUsed);
-                                                       
gen.writeNumberField("mappedMax", mappedMax);
-
-                                                       long 
memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
-                                                       long 
memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
-
-                                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-                                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-                                                       
gen.writeArrayFieldStart("garbageCollectors");
-
-                                                       for (String gcName : 
metrics.garbageCollectorNames) {
-                                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-                                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-                                                               if (count != 
null  && time != null) {
-                                                                       
gen.writeStartObject();
-                                                                       
gen.writeStringField("name", gcName);
-                                                                       
gen.writeNumberField("count", Long.valueOf(count));
-                                                                       
gen.writeNumberField("time", Long.valueOf(time));
-                                                                       
gen.writeEndObject();
+                                               MetricStore metricStore = 
fetcher.getMetricStore();
+                                               synchronized (metricStore) {
+                                                       
MetricStore.TaskManagerMetricStore metrics = 
metricStore.getTaskManagerMetricStore(instance.getId().toString());
+                                                       if (metrics != null) {
+                                                               
gen.writeObjectFieldStart("metrics");
+                                                               long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                                               long 
heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                                               long heapTotal 
= Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+                                                               
gen.writeNumberField("heapCommitted", heapCommitted);
+                                                               
gen.writeNumberField("heapUsed", heapUsed);
+                                                               
gen.writeNumberField("heapMax", heapTotal);
+
+                                                               long 
nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", 
"0"));
+                                                               long 
nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                                               long 
nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", 
"0"));
+
+                                                               
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+                                                               
gen.writeNumberField("nonHeapUsed", nonHeapUsed);
+                                                               
gen.writeNumberField("nonHeapMax", nonHeapTotal);
+
+                                                               
gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
+                                                               
gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
+                                                               
gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
+
+                                                               long 
directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", 
"0"));
+                                                               long directUsed 
= Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+                                                               long directMax 
= Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", 
"0"));
+
+                                                               
gen.writeNumberField("directCount", directCount);
+                                                               
gen.writeNumberField("directUsed", directUsed);
+                                                               
gen.writeNumberField("directMax", directMax);
+
+                                                               long 
mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", 
"0"));
+                                                               long mappedUsed 
= Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+                                                               long mappedMax 
= Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", 
"0"));
+
+                                                               
gen.writeNumberField("mappedCount", mappedCount);
+                                                               
gen.writeNumberField("mappedUsed", mappedUsed);
+                                                               
gen.writeNumberField("mappedMax", mappedMax);
+
+                                                               long 
memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+                                                               long 
memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+                                                               
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+                                                               
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+                                                               
gen.writeArrayFieldStart("garbageCollectors");
+
+                                                               for (String 
gcName : metrics.garbageCollectorNames) {
+                                                                       String 
count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", 
null);
+                                                                       String 
time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", 
null);
+                                                                       if 
(count != null && time != null) {
+                                                                               
gen.writeStartObject();
+                                                                               
gen.writeStringField("name", gcName);
+                                                                               
gen.writeNumberField("count", Long.valueOf(count));
+                                                                               
gen.writeNumberField("time", Long.valueOf(time));
+                                                                               
gen.writeEndObject();
+                                                                       }
                                                                }
-                                                       }
 
-                                                       gen.writeEndArray();
-                                                       gen.writeEndObject();
+                                                               
gen.writeEndArray();
+                                                               
gen.writeEndObject();
+                                                       }
                                                }
                                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
index 4f92148..667cc22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
@@ -30,23 +30,24 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import scala.Option;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
 
 /**
@@ -194,8 +195,10 @@ public class MetricFetcher {
        private void addMetrics(Object result) {
                MetricDumpSerialization.MetricSerializationResult data = 
(MetricDumpSerialization.MetricSerializationResult) result;
                List<MetricDump> dumpedMetrics = deserializer.deserialize(data);
-               for (MetricDump metric : dumpedMetrics) {
-                       metrics.add(metric);
+               synchronized (metrics) {
+                       for (MetricDump metric : dumpedMetrics) {
+                               metrics.add(metric);
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
index 32cda7f..e2d9bef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.runtime.webmonitor.utils;
 
-import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 /**
@@ -69,13 +71,16 @@ public class MutableIOMetrics extends IOMetrics {
                } else { // execAttempt is still running, use 
MetricQueryService instead
                        if (fetcher != null) {
                                fetcher.update();
-                               MetricStore.SubtaskMetricStore metrics = 
fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
-                               if (metrics != null) {
-                                       this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
-                                       this.numBytesInRemote += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                                       this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                                       this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                                       this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+                               MetricStore metricStore = 
fetcher.getMetricStore();
+                               synchronized (metricStore) {
+                                       MetricStore.SubtaskMetricStore metrics 
= metricStore.getSubtaskMetricStore(jobID, taskID, 
attempt.getParallelSubtaskIndex());
+                                       if (metrics != null) {
+                                               this.numBytesInLocal += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+                                               this.numBytesInRemote += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                               this.numBytesOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                               this.numRecordsIn += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                               this.numRecordsOut += 
Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+                                       }
                                }
                        }
                }

Reply via email to