[FLINK-9467][metrics][WebUI] Fix watermark display This closes #6152.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3f29002 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3f29002 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3f29002 Branch: refs/heads/master Commit: a3f290020c6d0d15d993dcdc24fa1ef98c63739b Parents: 3f07ecc Author: zentol <ches...@apache.org> Authored: Tue Jun 5 13:51:08 2018 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Jun 21 14:32:39 2018 +0200 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 8 ++++---- .../web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee | 4 ++-- .../flink/streaming/runtime/tasks/OneInputStreamTask.java | 1 + .../flink/streaming/runtime/tasks/TwoInputStreamTask.java | 1 + 4 files changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 49d7ba8..7fe5e7a 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1228,7 +1228,7 @@ Thus, in order to infer the metric identifier: <td>Meter</td> </tr> <tr> - <th rowspan="5"><strong>Task/Operator</strong></th> + <th rowspan="6"><strong>Task/Operator</strong></th> <td>numRecordsIn</td> <td>The total number of records this operator/task has received.</td> <td>Counter</td> @@ -1254,15 +1254,15 @@ Thus, in order to infer the metric identifier: <td>Counter</td> </tr> <tr> - <th rowspan="5"><strong>Operator</strong></th> <td>currentInputWatermark</td> <td> - The last watermark this operator has received (in milliseconds). 
- <p><strong>Note:</strong> For operators with 2 inputs this is the minimum of the last received watermarks.</p> + The last watermark this operator/task has received (in milliseconds). + <p><strong>Note:</strong> For operators/tasks with 2 inputs this is the minimum of the last received watermarks.</p> </td> <td>Gauge</td> </tr> <tr> + <th rowspan="4"><strong>Operator</strong></th> <td>currentInput1Watermark</td> <td> The last watermark this operator has received in its first input (in milliseconds). http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee index 517b00c..ed2b22d 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee @@ -107,13 +107,13 @@ angular.module('flinkApp') jid = $scope.job.jid # Request metrics for each subtask - metricIds = (i + ".currentLowWatermark" for i in [0..node.parallelism - 1]) + metricIds = (i + ".currentInputWatermark" for i in [0..node.parallelism - 1]) MetricsService.getMetrics(jid, node.id, metricIds).then (metrics) -> minValue = NaN watermarks = {} for key, value of metrics.values - subtaskIndex = key.replace('.currentLowWatermark', '') + subtaskIndex = key.replace('.currentInputWatermark', '') watermarks[subtaskIndex] = value if (isNaN(minValue) || value < minValue) http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 26088e4..43eab24 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -93,6 +93,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO inputWatermarkGauge); } headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); + getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a3f29002/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index bd878f6..93a5675 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -105,6 +105,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge); + getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); } @Override