[GitHub] flink pull request #4665: [Flink-7611]add metrics to measure the num of data...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4665 ---
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140645413

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

I see. Data sent to the side output isn't considered dropped. LGTM
---
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140635202

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

I think it should be written this way:
```
if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null){
        sideOutput(element);
    } else {
        this.lostDataCount.inc();
    }
}
```
---
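A minimal, self-contained sketch of the branching proposed in this comment, using stand-in types rather than Flink's real classes. `LateElementRouter`, its `sideOutput` list, and the `droppedCount` field are hypothetical names for illustration only; in the operator itself the counter would be a Flink `Counter` and the side output would go through the `OutputTag`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the late-element handling discussed in this thread; not Flink code. */
public class LateElementRouter {
    /** Elements redirected to the late-data side output (stand-in for an OutputTag). */
    final List<Long> sideOutput = new ArrayList<>();
    /** Stand-in for the dropped-records counter. */
    long droppedCount = 0;
    /** True when a late-data output tag was configured. */
    private final boolean hasLateDataOutputTag;

    LateElementRouter(boolean hasLateDataOutputTag) {
        this.hasLateDataOutputTag = hasLateDataOutputTag;
    }

    /** Routes a skipped, late element to the side output, or counts it as dropped. */
    void handle(long elementTimestamp, boolean isSkippedElement, boolean isElementLate) {
        if (isSkippedElement && isElementLate) {
            if (hasLateDataOutputTag) {
                sideOutput.add(elementTimestamp);
            } else {
                droppedCount++;
            }
        }
    }

    public static void main(String[] args) {
        LateElementRouter withTag = new LateElementRouter(true);
        withTag.handle(5L, true, true);
        LateElementRouter withoutTag = new LateElementRouter(false);
        withoutTag.handle(5L, true, true);
        System.out.println("with tag: side=" + withTag.sideOutput.size()
                + " dropped=" + withTag.droppedCount);
        System.out.println("without tag: side=" + withoutTag.sideOutput.size()
                + " dropped=" + withoutTag.droppedCount);
    }
}
```

With this shape the counter only moves when the element is really discarded, which matches the reading that side-outputted data is not "dropped".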
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140634868

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

It looks better this way; I will adjust it here :)
---
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564400

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---
@@ -231,6 +231,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

The if-else conditions are duplicated and inefficient, and can be further combined as

    if(isSkippedElement && isElementLate(element)) {
        if(lateDataOutputTag != null) {
            sideOutput(element);
        }
        this.lostDataCount.inc();
    }
---
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r140564281

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
--- End diff --

The if-else conditions are duplicated and inefficient, and can be further combined as
```java
if(isSkippedElement && isElementLate(element)) {
    if(lateDataOutputTag != null) {
        sideOutput(element);
    }
    this.lostDataCount.inc();
}
```
---
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139933981

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
       sideOutput(element);
+  } else if (isSkippedElement) {
--- End diff --

Haha, I see what you mean now; I think it's interesting ^_^. I have pushed the code again and changed the metric name as @zentol suggested. Please review the code again, @aljoscha.
---
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139911220

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
       sideOutput(element);
+  } else if (isSkippedElement) {
--- End diff --

Or we can just check if `isElementLate(element)`, that's what it is there for.
---
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139742368

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
       sideOutput(element);
+  } else if (isSkippedElement) {
--- End diff --

I think that when `isSkippedElement` is true, `isElementLate(element)` is always true. `isSkippedElement` is true when, for every assigned window, window.endtime + allowLateness < currentLowWatermark, and `isElementLate` is true when element.time + allowLateness < currentLowWatermark. Since element.time <= the biggest window.endtime, isn't `isElementLate` always true when `isSkippedElement` is true?

And if I want to rule out the situation where the element is skipped **because no windows were assigned to it**, I think I just need to check whether the variable `Collection elementWindows` is empty?
---
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r139702816

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -405,6 +411,8 @@ public void merge(W mergeResult,
   // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
   if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
       sideOutput(element);
+  } else if (isSkippedElement) {
--- End diff --

I think we also need to check whether it's late. An element could also be skipped because no windows were assigned to it.
---
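A small sketch of why "skipped" and "late" are distinct conditions, under simplified assumptions. The class `SkippedVersusLate`, its method names, and the `<=` comparisons are illustrative stand-ins, not the operator's actual code; windows are represented only by their maximum timestamps.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of the skipped/late distinction; not Flink code. */
public class SkippedVersusLate {
    /** Assumed check: the element is late once timestamp + allowed lateness is not after the watermark. */
    static boolean isElementLate(long timestamp, long allowedLateness, long watermark) {
        return timestamp + allowedLateness <= watermark;
    }

    /** Assumed check: a window is late once its max timestamp + allowed lateness is not after the watermark. */
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long watermark) {
        return windowMaxTimestamp + allowedLateness <= watermark;
    }

    /** An element is skipped when none of its assigned windows accepted it. */
    static boolean isSkipped(List<Long> windowMaxTimestamps, long allowedLateness, long watermark) {
        boolean skipped = true;
        for (long maxTimestamp : windowMaxTimestamps) {
            if (!isWindowLate(maxTimestamp, allowedLateness, watermark)) {
                skipped = false; // at least one window can still take the element
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        // No windows assigned: skipped, although the element is ahead of the watermark.
        System.out.println(isSkipped(Collections.<Long>emptyList(), 0L, 50L)
                + " / " + isElementLate(100L, 0L, 50L));
        // Every assigned window is already late: skipped and late.
        System.out.println(isSkipped(Arrays.asList(9L), 0L, 20L)
                + " / " + isElementLate(5L, 0L, 20L));
    }
}
```

In the first case the element is skipped without being late, which is the situation the extra `isElementLate(element)` check guards against.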
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138789377

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

@zentol If I use `protected final Counter lostDataCount = new SimpleCounter()`, I run into `Caused by: java.io.NotSerializableException: org.apache.flink.metrics.SimpleCounter`, because `SimpleCounter` is not Serializable. So I think I should go back to the old way, ` this.lostDataCount = metrics.counter(LATE_ELEMENTS_METRIC_NAME);` in the open method.
---
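The serialization problem described here can be reproduced without Flink. The sketch below uses plain Java serialization and hypothetical stand-in classes (`PlainCounter`, `EagerOperator`, `LazyOperator`, `canSerialize`); marking the field `transient` is an assumption of the sketch, not something stated in the thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical reproduction of the NotSerializableException discussed above; not Flink code. */
public class CounterSerializationSketch {
    /** Stand-in for a counter type that does not implement Serializable. */
    static class PlainCounter {
        private long count;
        void inc() { count++; }
        long getCount() { return count; }
    }

    /** Creates the counter eagerly as a field, so it becomes part of the serialized state. */
    static class EagerOperator implements Serializable {
        private static final long serialVersionUID = 1L;
        protected final PlainCounter lateCounter = new PlainCounter();
    }

    /** Defers counter creation to open(); the transient field is not serialized. */
    static class LazyOperator implements Serializable {
        private static final long serialVersionUID = 1L;
        protected transient PlainCounter lateCounter;
        void open() { lateCounter = new PlainCounter(); }
    }

    /** Returns false if Java serialization rejects the object as not serializable. */
    static boolean canSerialize(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager field serializable: " + canSerialize(new EagerOperator()));
        System.out.println("lazy field serializable: " + canSerialize(new LazyOperator()));
    }
}
```

Creating the counter in `open()` keeps it out of the object graph that is serialized when the operator is shipped, which is why the "old way" avoids the exception.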
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138778844

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

@zentol I have adjusted my code according to the comments and added documentation for this metric. Please review again, thanks.
---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138723827

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

The counter is already registered in the open method upon initialization; I think we're good on that front.

However, I would suggest changing this line to `protected final Counter lostDataCount = new SimpleCounter()`, and the line in open() to `metrics.counter(lostData, this.lostDataCount);`
---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138724464

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
--- End diff --

After addressing the comment below about the renaming, this comment becomes unnecessary and can be removed.
---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138724269

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
--- End diff --

We use camel case for metric names. "lost_data" in particular is also not really descriptive, and the naming scheme is inconsistent with existing metrics. I suggest "numLateRecords".

This string should also be static, and named `LATE_ELEMENTS_METRIC_NAME` for clarity.
---
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515752

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

But I think OperatorIOMetricGroup is all about an operator's IO metrics. Is it suitable here?
---
Github user Aitozi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138515541

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

Do you mean that I should add this to OperatorIOMetricGroup?
---
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4665#discussion_r138513684

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---
@@ -132,6 +133,13 @@
    */
   protected final OutputTag lateDataOutputTag;

+  /**
+   * Metrics about the lost data due to arrive late.
+   * */
+  protected final String loseData = "lost_data";
+
+  protected Counter lostDataCount;
--- End diff --

Should this be registered with the Flink metric system?
---
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/4665

[Flink-7611]add metrics to measure the num of data dropped due to the data arrived late

## What is the purpose of the change

1. Add metrics to measure the number of records dropped because they arrived late. This is meaningful for guiding the user to set a suitable allowLatency or MaxOutOfOrder time.

## Brief change log

- register counter metrics in windowOperator#open()
- invoke the inc() method when isWindowLate() judges the window late

## Verifying this change

This change is already covered by existing tests, run via `mvn clean verify`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Aitozi/flink FLINK-7611

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4665.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4665

commit 35b684ce1f3b72018ced26af07808390dff68547
Author: minwenjun
Date: 2017-09-12T04:26:08Z

    add metrics to measure the data dropped due to arrive late

commit aabdc224cb62b29975834e11c1374d182c4d4d01
Author: minwenjun
Date: 2017-09-12T05:58:07Z

    adjust format
---