[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-10-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4665


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-23 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140645413
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

I see. Data that is sent to the side output isn't considered dropped.

LGTM


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-23 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140635202
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

I think it should be written this way:

```
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);
    } else {
        this.lostDataCount.inc();
    }
}
```
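
The two proposals in this thread differ only in whether an element sent to the side output also increments the counter. Below is a minimal standalone sketch of the variant above, in which only elements with no side output are counted. It is not Flink code: `LateElementSketch`, `Outcome`, and the boolean parameters are hypothetical stand-ins for the operator's state.

```java
// Standalone model of the branch discussed above; this is not the Flink operator code.
final class LateElementSketch {
    enum Outcome { NOT_LATE, SIDE_OUTPUT, DROPPED }

    // Hypothetical stand-in for the operator's lostDataCount metric.
    long lostDataCount = 0;

    // Count an element only when it is late and there is no side output to receive it.
    Outcome handle(boolean isSkippedElement, boolean isElementLate, boolean hasLateDataOutputTag) {
        if (isSkippedElement && isElementLate) {
            if (hasLateDataOutputTag) {
                return Outcome.SIDE_OUTPUT;
            }
            lostDataCount++;
            return Outcome.DROPPED;
        }
        return Outcome.NOT_LATE;
    }

    public static void main(String[] args) {
        LateElementSketch sketch = new LateElementSketch();
        System.out.println(sketch.handle(true, true, true));
        System.out.println(sketch.handle(true, true, false));
        System.out.println(sketch.lostDataCount);
    }
}
```

With this shape, the counter reflects only records that are actually lost, which is the distinction the review converges on.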


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-23 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140634868
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

It looks better this way; I will adjust it here :)


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564400
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

The if-else conditions are duplicated and inefficient, and can be combined
as:

    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        }
        this.lostDataCount.inc();
    }


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-22 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564281
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
--- End diff --

The if-else conditions are duplicated and inefficient, and can be combined
as:

```java
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);
    }
    this.lostDataCount.inc();
}
```


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-20 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139933981
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

Haha, I see what you mean now; I think it's interesting ^_^. I have pushed the
code again and renamed the metric as @zentol suggested. Please review the
code again, @aljoscha.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139911220
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

Or we can just check if `isElementLate(element)`, that's what it is there 
for. 😉 


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-19 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139742368
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

I think that when `isSkippedElement` is true, `isElementLate(element)` is
always true as well. `isSkippedElement` is true when, for every assigned
window, window.endtime + allowLateness < currentLowWatermark, and
`isElementLate` is true when element.time + allowLateness <
currentLowWatermark. Since element.time <= the largest window.endtime, doesn't
it follow that `isElementLate` is always true when `isSkippedElement` is true?
And if I want to rule out the case where the element is skipped **because no
windows were assigned to it**, don't I just need to check whether the variable
`Collection elementWindows` is empty?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139702816
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
// windowAssigner is event time and current timestamp + allowed 
lateness no less than element timestamp
if (isSkippedElement && lateDataOutputTag != null && 
isElementLate(element)) {
sideOutput(element);
+   } else if (isSkippedElement) {
--- End diff --

I think we also need to check whether it's late. An element could also be 
skipped because no windows were assigned to it.
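
A minimal standalone sketch of this point, under simplifying assumptions: each window is reduced to its end timestamp, and the comparisons follow the informal description used elsewhere in this thread, which may not match Flink's exact operators. `SkippedVersusLate` and its method signatures are hypothetical and only model the check.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone model; each window is represented only by its end timestamp (an assumption).
final class SkippedVersusLate {
    static boolean isWindowLate(long windowEnd, long allowedLateness, long watermark) {
        return windowEnd + allowedLateness < watermark;
    }

    static boolean isElementLate(long elementTime, long allowedLateness, long watermark) {
        return elementTime + allowedLateness < watermark;
    }

    // An element is skipped when no assigned window accepts it,
    // which is trivially the case when no windows were assigned at all.
    static boolean isSkippedElement(List<Long> windowEnds, long allowedLateness, long watermark) {
        boolean skipped = true;
        for (long windowEnd : windowEnds) {
            if (!isWindowLate(windowEnd, allowedLateness, watermark)) {
                skipped = false;
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        long watermark = 100L;
        long allowedLateness = 0L;

        // No windows assigned: skipped, although the element itself is not late.
        List<Long> none = Collections.emptyList();
        System.out.println(isSkippedElement(none, allowedLateness, watermark));
        System.out.println(isElementLate(150L, allowedLateness, watermark));

        // One expired window: skipped and late.
        System.out.println(isSkippedElement(Arrays.asList(50L), allowedLateness, watermark));
        System.out.println(isElementLate(40L, allowedLateness, watermark));
    }
}
```

In this model, an empty window collection is the only case where "skipped" and "late" disagree, which is why the lateness check is still needed before counting.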


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138789377
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

@zentol If I use `protected final Counter lostDataCount = new
SimpleCounter()`, I run into `Caused by: java.io.NotSerializableException:
org.apache.flink.metrics.SimpleCounter`, because `SimpleCounter` is not
Serializable. So I think I should go back to the previous approach,
`this.lostDataCount = metrics.counter(LATE_ELEMENTS_METRIC_NAME);`, in the
open method.
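
The failure described above can be reproduced with plain Java serialization. This is a minimal sketch, not Flink code: `PlainCounter`, `EagerOperator`, and `LazyOperator` are hypothetical stand-ins for a non-serializable counter, an operator that creates it in a field initializer, and an operator that creates it in `open()`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class CounterSerializationSketch {
    // Hypothetical stand-in for a counter type that does not implement Serializable.
    static final class PlainCounter {
        long count;
        void inc() { count++; }
    }

    // Creates the counter eagerly, so it becomes part of the serialized state.
    static final class EagerOperator implements Serializable {
        private static final long serialVersionUID = 1L;
        final PlainCounter counter = new PlainCounter();
    }

    // Keeps the counter out of the serialized state and creates it in open().
    static final class LazyOperator implements Serializable {
        private static final long serialVersionUID = 1L;
        transient PlainCounter counter;
        void open() { counter = new PlainCounter(); }
    }

    // Returns false when Java serialization rejects the object graph.
    static boolean serializes(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new EagerOperator()));
        System.out.println(serializes(new LazyOperator()));
    }
}
```

Assigning the field in `open()` sidesteps the problem because the counter does not exist yet when the operator object is serialized.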


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138778844
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

@zentol I have adjusted my code according to the comments and added
documentation for this metric. Please review again, thanks.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138723827
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

The counter is already registered in the open method upon initialization, so
I think we're good on that front.

However, I would suggest changing this line to `protected final Counter
lostDataCount = new SimpleCounter()`, and the line in open() to
`metrics.counter(lostData, this.lostDataCount);`


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138724464
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
--- End diff --

After addressing the comment below about the renaming, this comment becomes
unnecessary and can be removed.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-13 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138724269
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
--- End diff --

We use camel case for metric names. "lost_data" in particular is also not
really descriptive, and the naming scheme is inconsistent with existing
metrics.

I suggest "numLateRecords".

This string should also be static and named `LATE_ELEMENTS_METRIC_NAME`
for clarity.


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515752
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

But I think OperatorIOMetricGroup is all about the operator's IO metrics.
Is it suitable here?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515541
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

Do you mean that I should add this to OperatorIOMetricGroup?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138513684
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 ---
@@ -132,6 +133,13 @@
 */
protected final OutputTag lateDataOutputTag;
 
+   /**
+   * Metrics about the lost data due to arrive late.
+   * */
+   protected final String loseData = "lost_data";
+
+   protected Counter lostDataCount;
--- End diff --

Should this be registered with the Flink metric system?


---


[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...

2017-09-12 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4665

[Flink-7611]add metrics to measure the num of data dropped due to the data 
arrived late

## What is the purpose of the change

1. Add a metric to measure the number of records dropped because they arrived
late. This is meaningful for guiding users to set a suitable
allowLatency or MaxOutOfOrder time.


## Brief change log

  - register a counter metric in windowOperator#open()
  - invoke the inc() method when isWindowLate() determines the window is late

## Verifying this change

This change is already covered by existing tests by `mvn clean verify`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-7611

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4665.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4665


commit 35b684ce1f3b72018ced26af07808390dff68547
Author: minwenjun 
Date:   2017-09-12T04:26:08Z

add metrics to measure the data dropped due to arrive late

commit aabdc224cb62b29975834e11c1374d182c4d4d01
Author: minwenjun 
Date:   2017-09-12T05:58:07Z

adjust format




---