[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16652317#comment-16652317
 ] 

ASF GitHub Bot commented on FLINK-4840:
---

zentol closed pull request #2753: [FLINK-4840] [metrics] Measure latency of 
record processing and expose it as a metric
URL: https://github.com/apache/flink/pull/2753
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 0a294fa65a1..8d86158065f 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -134,6 +134,12 @@ under the License.
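<version>${jackson.version}</version>
</dependency>
 
+   <dependency>
+   <groupId>io.dropwizard.metrics</groupId>
+   <artifactId>metrics-core</artifactId>
+   <version>${metrics.version}</version>
+   </dependency>
+
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>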
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
index c5296fbd063..5d06e2c2251 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -18,16 +18,21 @@
 
 package org.apache.flink.runtime.metrics.groups;
 
-import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.SlidingWindowReservoir;
+import com.codahale.metrics.Snapshot;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +55,8 @@
private final Meter numRecordsInRate;
private final Meter numRecordsOutRate;
 
+   private final LatencyHistogram recordProcessLatency;
+
public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent);
 
@@ -63,6 +70,10 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {
this.numRecordsOut = counter("numRecordsOut", new SumCounter());
this.numRecordsInRate = meter("numRecordsInPerSecond", new MeterView(numRecordsIn, 60));
this.numRecordsOutRate = meter("numRecordsOutPerSecond", new MeterView(numRecordsOut, 60));
+   this.recordProcessLatency = histogram("recordProcessLatency", new LatencyHistogram(true));
+   if (recordProcessLatency.getLatencyAccumulateCounter() != null) {
+   meter("recordProcTimeProportion", new MeterView(recordProcessLatency.getLatencyAccumulateCounter(), 60));
+   }
}
 
public IOMetrics createSnapshot() {
@@ -104,6 +115,10 @@ public Meter getNumBytesOutRateMeter() {
return numBytesOutRate;
}
 
+   public Histogram getRecordProcessLatency() {
+   return recordProcessLatency;
+   }
+
// 

// Buffer metrics
// 

@@ -257,4 +272,134 @@ public long getCount() {
return sum;
}
}
+
+   // 

+   // Latency metrics
+   // 

+
+   /**
+* Histogram measuring the record processing latency of a task.
+* It's element processing time of a task. But an element emitting time for Source Task.
+* It could be given a history size or a Reservoir when construct.
+* A latency accumulate will be activated if accumulate enabled
+*/
+   private static class LatencyHistogram implements Histogram {
+
+   private static final int DEFAULT_HISTORY_SIZE = 128;
+
+   // conversion of millisecond and nanoseco

[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric

2017-04-04 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954938#comment-15954938
 ] 

Chesnay Schepler commented on FLINK-4840:
-

I may have found a suitable implementation alternative:

The key problem in the existing approach is that it calculates the time taken 
for every invocation of the method, which is just too expensive: it requires 
two time measurements (which should also use nanoTime, which is even more 
expensive) as well as a histogram update.

My idea would be to
* no longer create a histogram, since this can be done easily outside of Flink, 
and only provide raw time measurements
* not measure the time for every call, but instead only a fixed number of times 
over a period of time. We already have the tool that we require for this: the 
View interface.

We can generalize the details in a new Timer interface:
{code}
import org.apache.flink.metrics.Metric;

public interface Timer extends Metric {
    void start();
    void stop();
    long getTime(); // last measured time
}
{code}

The following TimerView implementation relies on the View interface to enable 
it regularly (every 5 seconds) via the update() method.
If the TimerView is not enabled, start() and stop() are no-ops. If it is 
enabled, it takes a single measurement.

The implementation could look like this:
{code}
import org.apache.flink.metrics.View;

public class TimerView implements Timer, View {
    private boolean enabled = false;
    private long startTime = 0;
    private long lastMeasurement = -1;

    // called periodically through the View interface; arms one measurement
    public void update() {
        enabled = true;
    }

    public void start() {
        if (enabled) {
            startTime = System.nanoTime();
        }
    }

    public void stop() {
        if (enabled) {
            // nanoseconds; convert to millis or smth
            lastMeasurement = System.nanoTime() - startTime;
            enabled = false;
        }
    }

    public long getTime() {
        return lastMeasurement;
    }
}
{code}

I threw this together quickly, so of course some details are missing, such as 
what happens when stop() is never called.

But the general approach seems reasonable to me; tell me what you think.
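
For illustration, the open details mentioned above (a stop() that is never 
called, and update() running on a different thread than start()/stop()) could 
be handled roughly as follows. This is only a minimal, self-contained sketch 
and not Flink code: the class name SampledTimer, the injectable clock and the 
choice of volatile fields are assumptions made for the example.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical variant of the TimerView idea (not part of Flink).
 * update() is assumed to be called by a background thread;
 * start()/stop() are assumed to be called by a single task thread.
 */
final class SampledTimer {
    private final LongSupplier clock;            // injectable so the sketch can be tested
    private volatile boolean armed;              // written by the background thread
    private volatile long lastMeasurement = -1L; // read by the reporter
    private boolean measuring;                   // task thread only
    private long startTime;                      // task thread only

    SampledTimer(LongSupplier clock) {
        this.clock = clock;
    }

    SampledTimer() {
        this(System::nanoTime);
    }

    /** Requests a single measurement for the next start()/stop() pair. */
    void update() {
        armed = true;
    }

    void start() {
        // Resetting 'measuring' here discards a measurement whose stop() never came.
        measuring = armed;
        if (measuring) {
            armed = false;
            startTime = clock.getAsLong();
        }
    }

    void stop() {
        if (measuring) {
            lastMeasurement = clock.getAsLong() - startTime;
            measuring = false;
        }
    }

    /** Last measured duration in clock units, or -1 if nothing was measured yet. */
    long getTime() {
        return lastMeasurement;
    }
}
```

In this sketch the cost per record when no sample is requested is one volatile 
read in start() and one plain field read in stop(); whether that is cheap 
enough for the data path would still have to be benchmarked.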

> Measure latency of record processing and expose it as a metric
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
>
> We should expose the following Metrics on the TaskIOMetricGroup:
> 1. recordProcessLatency(ms): Histogram measuring the processing time per 
> record of a task. For a chained task, it is the processing time of the whole chain.  
> 2. recordProcTimeProportion(ms): Meter measuring the proportion of record 
> processing time for infor whether the main cost



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric

2017-03-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947759#comment-15947759
 ] 

ASF GitHub Bot commented on FLINK-4840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2753
  
A gentle ping about how to proceed here...
Are you interested in pursuing another implementation approach, or should 
we close this as "fix later"?






[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636212#comment-15636212
 ] 

ASF GitHub Bot commented on FLINK-4840:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2753
  
I think we need a different way to solve this.

This pull request adds a very high overhead to the processing of each 
record:
  - two calls to `System.nanoTime()`
  - Maintaining a Dropwizard Histogram

Without having benchmarked this, I would expect this to drop the 
performance for typical operations like filters or lightweight map functions by 
a large degree.

Flink is building a streaming runtime that is performance competitive with 
a batch runtime, so the base runtime overhead per record needs to be minimal.

All metrics so far have been designed with that paradigm in mind: Metrics 
must not add any cost to the processing.
  - Metrics are gathered by asynchronous threads
  - The core uses only non-synchronized counters and gauges because they 
come virtually for free
  - We consciously decided not to use in the data paths any metric type 
that has the overhead of creating objects or maintaining a data structure.

I would suggest first having a design discussion about whether we want to 
measure this and how we can do it for free.
For example, have a look at the "end to end" latency measurements #2386 via 
latency markers, for an idea of how to measure with minimal impact on the data 
processing.
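
To make the design rule above concrete, here is a minimal sketch of the 
pattern: the data path only increments a plain, non-synchronized counter, and 
a separate view, assumed to be driven by an asynchronous thread, derives a 
rate from it. The names PlainCounter and RateView are hypothetical and the 
code is a simplification for illustration, not Flink's actual SimpleCounter 
or MeterView.

```java
/** Hypothetical single-writer counter: no locks, no atomics, no allocation. */
final class PlainCounter {
    private long count;

    /** Called on the data path, once per record. */
    void inc() {
        count++;
    }

    /** May return a slightly stale value when read from another thread. */
    long getCount() {
        return count;
    }
}

/** Hypothetical view that turns counter samples into a per-second rate. */
final class RateView {
    private final PlainCounter counter;
    private final int intervalSeconds;
    private long lastCount;
    private double rate;

    RateView(PlainCounter counter, int intervalSeconds) {
        this.counter = counter;
        this.intervalSeconds = intervalSeconds;
    }

    /** Assumed to be called every intervalSeconds by a background thread. */
    void update() {
        long current = counter.getCount();
        rate = (current - lastCount) / (double) intervalSeconds;
        lastCount = current;
    }

    double getRate() {
        return rate;
    }
}
```

The trade-off in this sketch is precision for speed: a reader on another 
thread is not guaranteed to see the latest count, which is usually acceptable 
for monitoring but would not be for exact accounting.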


> Measure latency of record processing and expose it as a metric
> --
>
> Key: FLINK-4840
> URL: https://issues.apache.org/jira/browse/FLINK-4840
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> We should expose the following Metrics on the TaskIOMetricGroup:
> 1. recordProcessLatency(ms): Histogram measuring the processing time per 
> record of a task. For a chained task, it is the processing time of the whole chain.  
> 2. recordProcTimeProportion(ms): Meter measuring the proportion of record 
> processing time for infor whether the main cost



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric

2016-11-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635541#comment-15635541
 ] 

ASF GitHub Bot commented on FLINK-4840:
---

GitHub user zhuhaifengleon opened a pull request:

https://github.com/apache/flink/pull/2753

[FLINK-4840] [metrics] Measure latency of record processing and expose it 
as a metric

This PR introduces a record processing time metric for measuring the running 
performance of a task. The latency is the processing time of all chained 
operators of a task.
It adds the following metrics to the TaskIOMetricGroup:
1. recordProcessLatency(ms): Histogram measuring the processing time per 
record of a task. For a chained task, it is the processing time of the whole chain. 
2. recordProcTimeProportion(ms): Meter measuring the proportion of record 
processing time for infor whether the main cost

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhuhaifengleon/flink FLINK-4840

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2753.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2753


commit 780d5590ff90ff6cda703b5747db01fc3bf968b9
Author: zhuhaifengleon 
Date:   2016-10-31T09:41:39Z

[FLINK-4840] [metrics] Measure latency/delay of record processing and 
expose it as a metric

commit 0fbb87be7ec9b86edb0888203d00aec595e77b31
Author: zhuhaifengleon 
Date:   2016-11-03T08:26:29Z

[FLINK-4840] [metrics] Measure latency/delay of record processing and 
expose it as a metric



