[FLINK-9467][metrics][WebUI] Fix watermark display

This closes #6152.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3f29002
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3f29002
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3f29002

Branch: refs/heads/master
Commit: a3f290020c6d0d15d993dcdc24fa1ef98c63739b
Parents: 3f07ecc
Author: zentol <ches...@apache.org>
Authored: Tue Jun 5 13:51:08 2018 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Jun 21 14:32:39 2018 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                                   | 8 ++++----
 .../web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee  | 4 ++--
 .../flink/streaming/runtime/tasks/OneInputStreamTask.java    | 1 +
 .../flink/streaming/runtime/tasks/TwoInputStreamTask.java    | 1 +
 4 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 49d7ba8..7fe5e7a 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1228,7 +1228,7 @@ Thus, in order to infer the metric identifier:
       <td>Meter</td>
     </tr>
     <tr>
-      <th rowspan="5"><strong>Task/Operator</strong></th>
+      <th rowspan="6"><strong>Task/Operator</strong></th>
       <td>numRecordsIn</td>
       <td>The total number of records this operator/task has received.</td>
       <td>Counter</td>
@@ -1254,15 +1254,15 @@ Thus, in order to infer the metric identifier:
       <td>Counter</td>
     </tr>
     <tr>
-      <th rowspan="5"><strong>Operator</strong></th>
       <td>currentInputWatermark</td>
       <td>
-        The last watermark this operator has received (in milliseconds).
-        <p><strong>Note:</strong> For operators with 2 inputs this is the 
minimum of the last received watermarks.</p>
+        The last watermark this operator/tasks has received (in milliseconds).
+        <p><strong>Note:</strong> For operators/tasks with 2 inputs this is 
the minimum of the last received watermarks.</p>
       </td>
       <td>Gauge</td>
     </tr>
     <tr>
+      <th rowspan="4"><strong>Operator</strong></th>
       <td>currentInput1Watermark</td>
       <td>
         The last watermark this operator has received in its first input (in 
milliseconds).

http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee 
b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
index 517b00c..ed2b22d 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee
@@ -107,13 +107,13 @@ angular.module('flinkApp')
       jid = $scope.job.jid
 
       # Request metrics for each subtask
-      metricIds = (i + ".currentLowWatermark" for i in [0..node.parallelism - 
1])
+      metricIds = (i + ".currentInputWatermark" for i in [0..node.parallelism 
- 1])
       MetricsService.getMetrics(jid, node.id, metricIds).then (metrics) ->
         minValue = NaN
         watermarks = {}
 
         for key, value of metrics.values
-          subtaskIndex = key.replace('.currentLowWatermark', '')
+          subtaskIndex = key.replace('.currentInputWatermark', '')
           watermarks[subtaskIndex] = value
 
           if (isNaN(minValue) || value < minValue)

http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 26088e4..43eab24 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -93,6 +93,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                                        inputWatermarkGauge);
                }
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
this.inputWatermarkGauge);
+               
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
this.inputWatermarkGauge);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index bd878f6..93a5675 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -105,6 +105,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, 
input2WatermarkGauge);
+               
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
        }
 
        @Override

Reply via email to