[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16652317#comment-16652317 ]

ASF GitHub Bot commented on FLINK-4840:
---

zentol closed pull request #2753: [FLINK-4840] [metrics] Measure latency of record processing and expose it as a metric
URL: https://github.com/apache/flink/pull/2753

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0a294fa65a1..8d86158065f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -134,6 +134,12 @@ under the License.
 			<version>${jackson.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-core</artifactId>
+			<version>${metrics.version}</version>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.zookeeper</groupId>
 			<artifactId>zookeeper</artifactId>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fbd063..5d06e2c2251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -18,16 +18,21 @@
 package org.apache.flink.runtime.metrics.groups;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +55,8 @@
 	private final Meter numRecordsInRate;
 	private final Meter numRecordsOutRate;
 
+	private final LatencyHistogram recordProcessLatency;
+
 	public TaskIOMetricGroup(TaskMetricGroup parent) {
 		super(parent);
 
@@ -63,6 +70,10 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
 		this.numRecordsOut = counter("numRecordsOut", new SumCounter());
 		this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
 		this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+		this.recordProcessLatency = histogram("recordProcessLatency", new LatencyHistogram(true));
+		if (recordProcessLatency.getLatencyAccumulateCounter() != null) {
+			meter("recordProcTimeProportion", new MeterView(recordProcessLatency.getLatencyAccumulateCounter(), 60));
+		}
 	}
 
 	public IOMetrics createSnapshot() {
@@ -104,6 +115,10 @@ public Meter getNumBytesOutRateMeter() {
 		return numBytesOutRate;
 	}
 
+	public Histogram getRecordProcessLatency() {
+		return recordProcessLatency;
+	}
+
 	//
 	// Buffer metrics
 	//
@@ -257,4 +272,134 @@ public long getCount() {
 			return sum;
 		}
 	}
+
+	//
+	// Latency metrics
+	//
+
+	/**
+	 * Histogram measuring the record processing latency of a task.
+	 * It's element processing time of a task. But an element emitting time for Source Task.
+	 * It could be given a history size or a Reservoir when construct.
+	 * A latency accumulate will be activated if accumulate enabled
+	 */
+	private static class LatencyHistogram implements Histogram {
+
+		private static final int DEFAULT_HISTORY_SIZE = 128;
+
+		// conversion of millisecond and nanoseco
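The imports in this diff pull in Dropwizard's `SlidingWindowReservoir`, which retains only the most recent N measurements. The rest of `LatencyHistogram` is cut off here, so what follows is only a minimal standalone sketch of that retention idea. It assumes a window of the last N latency samples (the diff's `DEFAULT_HISTORY_SIZE` is 128) and uses nearest-rank quantiles. The class `SlidingLatencyWindow` is hypothetical and is neither the PR's code nor Dropwizard's API. The sketch also makes the per-record bookkeeping visible: every update performs a synchronized array write, and that cost is what a histogram adds to the data path.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: keeps the most recent {@code size} latency samples
 * (in the spirit of a sliding-window reservoir) and computes statistics on demand.
 */
public class SlidingLatencyWindow {
    private final long[] samples;
    private long count; // total number of samples ever recorded

    public SlidingLatencyWindow(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.samples = new long[size];
    }

    /** Records one latency value, overwriting the oldest one once the window is full. */
    public synchronized void update(long latency) {
        samples[(int) (count % samples.length)] = latency;
        count++;
    }

    /** Returns a sorted copy of the samples currently held in the window. */
    public synchronized long[] snapshot() {
        int n = (int) Math.min(count, samples.length);
        long[] copy = Arrays.copyOf(samples, n);
        Arrays.sort(copy);
        return copy;
    }

    /** Returns the sample at quantile q (0.0 to 1.0) using the nearest-rank method. */
    public long quantile(double q) {
        long[] sorted = snapshot();
        if (sorted.length == 0) {
            return 0L;
        }
        int rank = (int) Math.ceil(q * sorted.length);
        int index = Math.max(0, Math.min(sorted.length - 1, rank - 1));
        return sorted[index];
    }

    public synchronized long getCount() {
        return count;
    }

    public static void main(String[] args) {
        SlidingLatencyWindow window = new SlidingLatencyWindow(4);
        for (long latency : new long[] {10, 20, 30, 40, 50}) {
            window.update(latency);
        }
        // The first sample (10) has been overwritten; the window holds 20, 30, 40, 50.
        System.out.println(Arrays.toString(window.snapshot())); // [20, 30, 40, 50]
        System.out.println(window.quantile(0.5));               // 30
    }
}
```

Running `main` prints `[20, 30, 40, 50]` and `30`. Once the four-slot window filled up, the oldest sample was overwritten.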
[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954938#comment-15954938 ]

Chesnay Schepler commented on FLINK-4840:
-

I may have found a suitable implementation alternative:

The key problem with the existing approach is that it measures the time taken by every invocation of the method. That is just too expensive, since it requires 2 time measurements (which should also use nanoTime, which is even more expensive) as well as updating a histogram.

My idea would be to
* no longer create a histogram, since this can easily be done outside of Flink, and only provide raw time measurements
* not measure the time for every call, but only a fixed number of times over a period of time.

We already have all the tools that we require for this: the View interface. We can generalize the details in a new Timer interface:

{code}
import org.apache.flink.metrics.Metric;

public interface Timer extends Metric {
	void start();
	void stop();
	long getTime(); // last measured time
}
{code}

The following TimerView implementation relies on the View interface: update() is called regularly (every 5 seconds) and enables the timer. While the TimerView is not enabled, start() and stop() are no-ops. Once it is enabled, it takes a single measurement and then disables itself again.

The implementation could look like this:

{code}
import org.apache.flink.metrics.View;

public class TimerView implements Timer, View {
	// update() runs on a different thread than start()/stop(), hence volatile
	private volatile boolean enabled = false;
	private long startTime = 0;
	private volatile long lastMeasurement = -1;

	@Override
	public void update() {
		enabled = true;
	}

	@Override
	public void start() {
		if (enabled) {
			startTime = System.nanoTime();
		}
	}

	@Override
	public void stop() {
		if (enabled) {
			lastMeasurement = System.nanoTime() - startTime; // nanoseconds; convert to millis if needed
			enabled = false;
		}
	}

	@Override
	public long getTime() {
		return lastMeasurement;
	}
}
{code}

I quickly threw this together, so there are of course some details missing, like what happens when stop() is never called and such. But the general approach seems reasonable to me; tell me what you think.
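The following standalone sketch makes the sampling idea from the TimerView proposal concrete without depending on Flink classes. It makes three assumptions. A `ScheduledExecutorService` stands in for Flink's view-updating thread. The interval is shortened to 100 ms instead of 5 seconds. The names `SampledTimer` and `processRecord` are hypothetical. The sketch also closes one gap the proposal leaves open: a separate `measuring` flag prevents an `update()` that arrives between `start()` and `stop()` from producing a measurement against a stale start time. Only the first record after each `update()` pays for two `System.nanoTime()` calls. All other records only read a flag.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical standalone sketch of a sampling timer: at most one invocation
 * is timed per update period; all other invocations skip the clock calls.
 */
public class SampledTimer {
    // Set by the background (reporter) thread, cleared by the processing thread.
    private volatile boolean enabled = false;
    // Only accessed by the processing thread: true between a timed start() and stop().
    private boolean measuring = false;
    private long startTime = 0L;
    // Last measured duration in nanoseconds; -1 until the first measurement.
    private volatile long lastMeasurement = -1L;

    /** Called periodically by a background thread; plays the role of View#update(). */
    public void update() {
        enabled = true;
    }

    public void start() {
        if (enabled) {
            measuring = true;
            startTime = System.nanoTime();
        }
    }

    public void stop() {
        // Checking 'measuring' instead of 'enabled' ignores an update() that
        // arrives between start() and stop(), so a stale startTime is never used.
        if (measuring) {
            lastMeasurement = System.nanoTime() - startTime;
            measuring = false;
            enabled = false;
        }
    }

    public long getTime() {
        return lastMeasurement;
    }

    public static void main(String[] args) {
        SampledTimer timer = new SampledTimer();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Flink refreshes views every 5 seconds; 100 ms keeps this demo short.
        scheduler.scheduleAtFixedRate(timer::update, 0, 100, TimeUnit.MILLISECONDS);

        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(300);
        while (System.nanoTime() < deadline) {
            timer.start();
            processRecord();
            timer.stop();
        }
        scheduler.shutdownNow();
        System.out.println("last sampled processing time (ns): " + timer.getTime());
    }

    /** Stand-in for the per-record user function. */
    private static void processRecord() {
        Math.sqrt(Math.random());
    }
}
```

Because the timer disables itself after one measurement, the number of clock reads is bounded by the update rate rather than by the record rate.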
> Measure latency of record processing and expose it as a metric
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Reporter: zhuhaifeng
> Assignee: zhuhaifeng
> Priority: Minor
>
> We should expose the following Metrics on the TaskIOMetricGroup:
> 1. recordProcessLatency(ms): Histogram measuring the processing time per
> record of a task. It is the processing time of chain if a chained task.
> 2. recordProcTimeProportion(ms): Meter measuring the proportion of record
> processing time for infor whether the main cost

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947759#comment-15947759 ]

ASF GitHub Bot commented on FLINK-4840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2753

A gentle ping about how to proceed here...
Are you interested in pursuing another implementation approach, or should we close this as "fix later"?
[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636212#comment-15636212 ]

ASF GitHub Bot commented on FLINK-4840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2753

I think we need a different way to solve this.

This pull request adds a very high overhead to the processing of each record:
- two calls to `System.nanoTime()`
- maintaining a Dropwizard Histogram

Without having benchmarked this, I would expect it to reduce the performance of typical operations, like filters or lightweight map functions, by a large degree.

Flink is building a streaming runtime that is performance-competitive with a batch runtime, so the base runtime overhead per record needs to be minimal. All metrics so far have been designed with that paradigm in mind: metrics may not add any cost to the processing.
- Metrics are gathered by asynchronous threads.
- The core uses only non-synchronized counters and gauges, because they come quasi for free.
- We consciously decided not to use, in the data paths, any metric type that has the overhead of creating objects or maintaining a data structure.

I would suggest first having a design discussion about whether we want to measure this and how we can do it for free.

For example, have a look at the "end to end" latency measurements via latency markers (#2386) for an idea of how to measure with minimal impact on the data processing.

> Measure latency of record processing and expose it as a metric
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
> Issue Type: Improvement
> Components: Metrics
> Reporter: zhuhaifeng
> Assignee: zhuhaifeng
> Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
> 1. recordProcessLatency(ms): Histogram measuring the processing time per
> record of a task. It is the processing time of chain if a chained task.
> 2. recordProcTimeProportion(ms): Meter measuring the proportion of record
> processing time for infor whether the main cost

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
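The latency-marker approach referenced in #2386 avoids timing individual records. A source periodically injects a marker carrying its emission timestamp, markers travel through the pipeline alongside regular records, and a downstream task computes the elapsed time when a marker arrives. The sketch below illustrates only that idea with hypothetical classes (`MarkerLatencyTracker`, `Marker`, `DataRecord`). It is not Flink's actual latency-marker implementation. It uses a simulated clock passed in explicitly and assumes that source and sink timestamps are comparable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of latency measurement via periodically injected markers. */
public class MarkerLatencyTracker {

    /** Either a data record or a latency marker. */
    interface Element {}

    static final class DataRecord implements Element {
        final String value;

        DataRecord(String value) {
            this.value = value;
        }
    }

    static final class Marker implements Element {
        final long markedTimeMillis; // time at which the source emitted this marker

        Marker(long markedTimeMillis) {
            this.markedTimeMillis = markedTimeMillis;
        }
    }

    private final List<Long> observedLatencies = new ArrayList<>();

    /**
     * Called by the downstream task for every element. Regular records only pay
     * for an instanceof check; the clock is consulted only for markers.
     */
    void onElement(Element element, long nowMillis) {
        if (element instanceof Marker) {
            observedLatencies.add(nowMillis - ((Marker) element).markedTimeMillis);
        }
        // Regular records would be processed here without any timing calls.
    }

    List<Long> getObservedLatencies() {
        return observedLatencies;
    }

    public static void main(String[] args) {
        MarkerLatencyTracker sink = new MarkerLatencyTracker();
        // Simulated clock: markers emitted at t=1000 ms and t=2000 ms
        // reach the sink at t=1012 ms and t=2030 ms.
        sink.onElement(new Marker(1000L), 1012L);
        sink.onElement(new DataRecord("a"), 1013L);
        sink.onElement(new DataRecord("b"), 1015L);
        sink.onElement(new Marker(2000L), 2030L);
        System.out.println(sink.getObservedLatencies()); // [12, 30]
    }
}
```

The measurement cost is proportional to the marker rate, not the record rate. That matches the principle stated in the comment above: the data path itself should stay almost free.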
[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635541#comment-15635541 ]

ASF GitHub Bot commented on FLINK-4840:
---

GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2753

[FLINK-4840] [metrics] Measure latency of record processing and expose it as a metric

This PR introduces a record processing time metric for measuring a task's runtime performance. The latency is the processing time of all chained operators of a task. The PR exposes the following metrics on the TaskIOMetricGroup:
1. recordProcessLatency(ms): Histogram measuring the processing time per record of a task. For a chained task, it is the processing time of the whole chain.
2. recordProcTimeProportion(ms): Meter measuring the proportion of time spent processing records, to show whether record processing is the main cost.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-4840

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2753.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2753

commit 780d5590ff90ff6cda703b5747db01fc3bf968b9
Author: zhuhaifengleon
Date: 2016-10-31T09:41:39Z

[FLINK-4840] [metrics] Measure latency/delay of record processing and expose it as a metric

commit 0fbb87be7ec9b86edb0888203d00aec595e77b31
Author: zhuhaifengleon
Date: 2016-11-03T08:26:29Z

[FLINK-4840] [metrics] Measure latency/delay of record processing and expose it as a metric
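When the pull request was closed, its diff registered `recordProcTimeProportion` as a `MeterView` over an accumulated-latency counter with a 60-second span. That counter's code is truncated, so the following is an interpretation. Assume the counter accumulates processing time in milliseconds. Then the meter's per-second rate is milliseconds of processing per second of wall-clock time, and dividing by 1000 yields the fraction of time spent processing records. The hypothetical helper `ProcTimeProportion.proportion` works through that arithmetic. It roughly mirrors how a meter turns the difference between two counter readings into a per-second rate.

```java
/** Hypothetical sketch: derive a processing-time proportion from an accumulated-time counter. */
public class ProcTimeProportion {

    /**
     * @param accumulatedMillisStart counter value (total processing ms) at the start of the window
     * @param accumulatedMillisEnd   counter value at the end of the window
     * @param windowSeconds          length of the window in seconds
     * @return fraction of wall-clock time spent processing records
     */
    static double proportion(long accumulatedMillisStart, long accumulatedMillisEnd, int windowSeconds) {
        if (windowSeconds <= 0) {
            throw new IllegalArgumentException("windowSeconds must be positive");
        }
        // Counter rate: milliseconds of processing per second of wall-clock time.
        double millisPerSecond = (double) (accumulatedMillisEnd - accumulatedMillisStart) / windowSeconds;
        // 1000 ms of processing per second would mean the task thread is always busy.
        return millisPerSecond / 1000.0;
    }

    public static void main(String[] args) {
        // 45,000 ms of processing accumulated over a 60 s window: 750 ms/s, i.e. 0.75.
        System.out.println(proportion(10_000L, 55_000L, 60)); // 0.75
    }
}
```

A value close to 1.0 would suggest that the task is dominated by record processing. A low value would point to time spent elsewhere, for example waiting on input or output.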