Repository: flink Updated Branches: refs/heads/release-1.3 80c23de70 -> b896b4b65
[FLINK-7368][metrics] Backport synchronization fix for MetricStore This closes #4841. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b896b4b6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b896b4b6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b896b4b6 Branch: refs/heads/release-1.3 Commit: b896b4b65c58ebcdeb5b318960823b81015c3a0c Parents: 80c23de Author: Piotr Nowojski <piotr.nowoj...@gmail.com> Authored: Mon Oct 16 17:17:26 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Wed Oct 18 15:48:00 2017 +0200 ---------------------------------------------------------------------- .../handlers/TaskManagersHandler.java | 131 ++++++++++--------- .../webmonitor/metrics/MetricFetcher.java | 17 ++- .../webmonitor/utils/MutableIOMetrics.java | 21 +-- 3 files changed, 91 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index a23e983..3782ec9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.webmonitor.handlers; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; @@ -28,15 +27,18 @@ import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricStore; import org.apache.flink.util.StringUtils; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; + +import com.fasterxml.jackson.core.JsonGenerator; import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + import static java.util.Objects.requireNonNull; public class TaskManagersHandler extends AbstractJsonRequestHandler { @@ -109,67 +111,70 @@ public class TaskManagersHandler extends AbstractJsonRequestHandler { // only send metrics when only one task manager requests them. if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) { fetcher.update(); - MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString()); - if (metrics != null) { - gen.writeObjectFieldStart("metrics"); - long heapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); - long heapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); - long heapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); - - gen.writeNumberField("heapCommitted", heapCommitted); - gen.writeNumberField("heapUsed", heapUsed); - gen.writeNumberField("heapMax", heapTotal); - - long nonHeapUsed = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); - long nonHeapCommitted = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); - long nonHeapTotal = Long.valueOf( metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); - - gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); - gen.writeNumberField("nonHeapUsed", nonHeapUsed); - gen.writeNumberField("nonHeapMax", nonHeapTotal); - - gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); - gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); - gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); - - long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); - long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); - long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); - - gen.writeNumberField("directCount", directCount); - gen.writeNumberField("directUsed", directUsed); - gen.writeNumberField("directMax", directMax); - - long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); - long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); - long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); - - gen.writeNumberField("mappedCount", mappedCount); - gen.writeNumberField("mappedUsed", mappedUsed); - gen.writeNumberField("mappedMax", mappedMax); - - long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); - long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); - - gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); - gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); - - gen.writeArrayFieldStart("garbageCollectors"); - - for (String gcName : metrics.garbageCollectorNames) { - String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); - String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); - if (count != null && time != null) { - gen.writeStartObject(); - gen.writeStringField("name", gcName); - gen.writeNumberField("count", Long.valueOf(count)); - gen.writeNumberField("time", Long.valueOf(time)); - gen.writeEndObject(); + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + MetricStore.TaskManagerMetricStore metrics = metricStore.getTaskManagerMetricStore(instance.getId().toString()); + if (metrics != null) { + gen.writeObjectFieldStart("metrics"); + long heapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0")); + long heapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0")); + long heapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0")); + + gen.writeNumberField("heapCommitted", heapCommitted); + gen.writeNumberField("heapUsed", heapUsed); + gen.writeNumberField("heapMax", heapTotal); + + long nonHeapUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0")); + long nonHeapCommitted = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0")); + long nonHeapTotal = Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0")); + + gen.writeNumberField("nonHeapCommitted", nonHeapCommitted); + gen.writeNumberField("nonHeapUsed", nonHeapUsed); + gen.writeNumberField("nonHeapMax", nonHeapTotal); + + gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted); + gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed); + gen.writeNumberField("totalMax", heapTotal + nonHeapTotal); + + long directCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0")); + long directUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0")); + long directMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0")); + + gen.writeNumberField("directCount", directCount); + gen.writeNumberField("directUsed", directUsed); + gen.writeNumberField("directMax", directMax); + + long mappedCount = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0")); + long mappedUsed = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0")); + long mappedMax = Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0")); + + gen.writeNumberField("mappedCount", mappedCount); + gen.writeNumberField("mappedUsed", mappedUsed); + gen.writeNumberField("mappedMax", mappedMax); + + long memorySegmentsAvailable = Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0")); + long memorySegmentsTotal = Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0")); + + gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable); + gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal); + + gen.writeArrayFieldStart("garbageCollectors"); + + for (String gcName : metrics.garbageCollectorNames) { + String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null); + String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null); + if (count != null && time != null) { + gen.writeStartObject(); + gen.writeStringField("name", gcName); + gen.writeNumberField("count", Long.valueOf(count)); + gen.writeNumberField("time", Long.valueOf(time)); + gen.writeEndObject(); + } } - } - gen.writeEndArray(); - gen.writeEndObject(); + gen.writeEndArray(); + gen.writeEndObject(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java index 4f92148..667cc22 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java @@ -30,23 +30,24 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization; import org.apache.flink.runtime.metrics.dump.MetricQueryService; -import org.apache.flink.runtime.metrics.dump.MetricDump; import org.apache.flink.runtime.webmonitor.JobManagerRetriever; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + import scala.Option; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - import static org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer; /** @@ -194,8 +195,10 @@ public class MetricFetcher { private void addMetrics(Object result) { MetricDumpSerialization.MetricSerializationResult data = (MetricDumpSerialization.MetricSerializationResult) result; List<MetricDump> dumpedMetrics = deserializer.deserialize(data); - for (MetricDump metric : dumpedMetrics) { - metrics.add(metric); + synchronized (metrics) { + for (MetricDump metric : dumpedMetrics) { + metrics.add(metric); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/b896b4b6/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java index 32cda7f..e2d9bef 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java @@ -17,7 +17,6 @@ */ package org.apache.flink.runtime.webmonitor.utils; -import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.IOMetrics; @@ -26,7 +25,10 @@ import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.metrics.MetricStore; +import com.fasterxml.jackson.core.JsonGenerator; + import javax.annotation.Nullable; + import java.io.IOException; /** @@ -69,13 +71,16 @@ public class MutableIOMetrics extends IOMetrics { } else { // execAttempt is still running, use MetricQueryService instead if (fetcher != null) { fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); - if (metrics != null) { - this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")); - this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); + MetricStore metricStore = fetcher.getMetricStore(); + synchronized (metricStore) { + MetricStore.SubtaskMetricStore metrics = metricStore.getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); + if (metrics != null) { + this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")); + this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); + } } } }