[FLINK-3427] [webui] Add watermark tracking
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d84b65ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d84b65ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d84b65ff Branch: refs/heads/master Commit: d84b65ff15876feb3e26dd20beb2e743968502bc Parents: 7a629fc Author: Ufuk Celebi <u...@apache.org> Authored: Wed Mar 8 11:34:29 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Wed Mar 8 15:28:41 2017 +0100 ---------------------------------------------------------------------- .../app/partials/jobs/job.plan.jade | 5 +- .../jobs/job.plan.node-list.watermarks.jade | 36 ++++++++ .../partials/jobs/job.plan.node.watermarks.jade | 27 ++++++ .../app/scripts/common/filters.coffee | 17 ++++ .../web-dashboard/app/scripts/index.coffee | 15 +++- .../app/scripts/modules/jobs/jobs.ctrl.coffee | 68 +++++++++++++-- .../app/scripts/modules/jobs/jobs.dir.coffee | 90 ++++++++++++++------ 7 files changed, 221 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade index e84dd04..c33b9a3 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade @@ -18,7 +18,7 @@ split .split#canvas .canvas-wrapper - div.main-canvas(job-plan, plan="plan", jobid="{{jobid}}", set-node="changeNode(nodeid)") + div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)") .split#job-panel .panel.panel-default.panel-multi(ng-if="plan") @@ -34,6 +34,9 @@ split a(ui-sref=".metrics({nodeid: nodeid})") Metrics li(ui-sref-active='active') + a(ui-sref=".watermarks({nodeid: nodeid})") Watermarks + + li(ui-sref-active='active') a(ui-sref=".accumulators({nodeid: nodeid})") Accumulators li(ui-sref-active='active') http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade new file mode 100644 index 0000000..6b4c6a2 --- /dev/null +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade @@ -0,0 +1,36 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-body-hover.table-clickable.table-activable + thead + tr + th Name + th Low Watermark + th Parallelism + th Status + + tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)") + tr(ng-if="v.type == 'regular'") + + td.td-long {{ v.name | humanizeText }} + td {{ watermarks | lowWatermark:v.id }} + td {{ v.parallelism }} + td + bs-label(status="{{v.status}}") {{v.status}} + tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)") + td(colspan="11") + div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ") http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade new file mode 100644 index 0000000..b406a1c --- /dev/null +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade @@ -0,0 +1,27 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)") + thead + tr + th id + th Watermark + + tbody + tr(ng-repeat="watermark in watermarksByNode(nodeid)") + td {{ watermark.id }} + td {{ watermark.value | parseWatermark }} http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee index 67b02e3..99e12a8 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee @@ -87,3 +87,20 @@ angular.module('flinkApp') .filter "percentage", -> (number) -> (number * 100).toFixed(0) + '%' + +.filter "parseWatermark", (watermarksConfig)-> + (value) -> + if value <= watermarksConfig.minValue + return 'No Watermark' + else + return value + +.filter "lowWatermark", (watermarksConfig)-> + (watermarks, nodeid) -> + lowWatermark = "No Watermark" + if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length + values = (watermark.value for watermark in watermarks[nodeid]) + lowWatermark = Math.min.apply(null, values) + if lowWatermark <= watermarksConfig.minValue + lowWatermark = "No Watermark" + return lowWatermark http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/index.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee index 95bb356..cbbefab 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee @@ -30,12 +30,18 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) .value 'flinkConfig', { jobServer: '' -# jobServer: 'http://localhost:8081/' + # jobServer: 'http://localhost:8081/' "refresh-interval": 10000 } # -------------------------------------- +.value 'watermarksConfig', { + minValue: -9223372036854776000 +} + +# -------------------------------------- + .run (JobsService, MainService, flinkConfig, $interval) -> MainService.loadConfig().then (config) -> angular.extend flinkConfig, config @@ -114,6 +120,13 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) templateUrl: "partials/jobs/job.plan.node-list.metrics.html" controller: 'JobPlanMetricsController' + .state "single-job.plan.watermarks", + url: "/watermarks" + views: + 'node-details': + templateUrl: "partials/jobs/job.plan.node-list.watermarks.html" + controller: 'JobPlanWatermarksController' + .state "single-job.plan.taskmanagers", url: "/taskmanagers" views: http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee index bbb57c5..d18d7e3 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee @@ -42,23 +42,18 @@ angular.module('flinkApp') # -------------------------------------- -.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, MetricsService, $rootScope, flinkConfig, $interval) -> +.controller 'SingleJobController', ($scope, $state, $stateParams, JobsService, MetricsService, $rootScope, flinkConfig, $interval, $q, watermarksConfig) -> $scope.jobid = $stateParams.jobid $scope.job = null $scope.plan = null + $scope.watermarks = null + $scope.lowWatermarks = null $scope.vertices = null $scope.backPressureOperatorStats = {} - JobsService.loadJob($stateParams.jobid).then (data) -> - $scope.job = data - $scope.plan = data.plan - $scope.vertices = data.vertices - MetricsService.setupMetrics($stateParams.jobid, data.vertices) - refresher = $interval -> JobsService.loadJob($stateParams.jobid).then (data) -> $scope.job = data - $scope.$broadcast 'reload' , flinkConfig["refresh-interval"] @@ -66,6 +61,8 @@ angular.module('flinkApp') $scope.$on '$destroy', -> $scope.job = null $scope.plan = null + $scope.watermarks = null + $scope.lowWatermarks = null $scope.vertices = null $scope.backPressureOperatorStats = null @@ -81,6 +78,50 @@ angular.module('flinkApp') JobsService.stopJob($stateParams.jobid).then (data) -> {} + JobsService.loadJob($stateParams.jobid).then (data) -> + $scope.job = data + $scope.vertices = data.vertices + $scope.plan = data.plan + MetricsService.setupMetrics($stateParams.jobid, data.vertices) + + getWatermarks = (nodes)-> + # This function uses a promise to resolve watermarks once fetched via the metrics service, since watermarks have to be fetched individually for each node, we have to wait until all API calls have been made before we can resolve the promise. In the end we will have an array of low watermarks for each node: e.g. {somenodeid: [{id: 0, value: -9223372036854776000}], anothernodeid: [{id: 0, value: -9223372036854776000}, {id: 1, value: -9223372036854776000}]}. + deferred = $q.defer() + watermarks = {} + jid = $scope.job.jid + angular.forEach nodes, (node, index) => + metricIds = [] + # for each node, we need to specify which metrics we want to collect, for each subtask, we need to fetch the currentLowWatermark, and each param is formed by concatenating subtask index to '.currentLowWatermark'. + for num in [0..node.parallelism - 1] + metricIds.push(num + ".currentLowWatermark") + MetricsService.getMetrics(jid, node.id, metricIds).then (data) -> + values = [] + for key, value of data.values + values.push(id: key.replace('.currentLowWatermark', ''), value: value) + watermarks[node.id] = values + if index >= $scope.plan.nodes.length - 1 + deferred.resolve(watermarks) + deferred.promise + + getLowWatermarks = (watermarks)-> + lowWatermarks = [] + for k,v of watermarks + minValue = Math.min.apply(null,(watermark.value for watermark in v)) + lowWatermarks[k] = if minValue <= watermarksConfig.minValue || v.length == 0 then 'No Watermark' else minValue + return lowWatermarks + + $scope.$watch 'plan', (newPlan) -> + if newPlan + getWatermarks(newPlan.nodes).then (data) -> + $scope.watermarks = data + $scope.lowWatermarks = getLowWatermarks(data) + + $scope.$on 'reload', (event) -> + if $scope.plan + getWatermarks($scope.plan.nodes).then (data) -> + $scope.watermarks = data + $scope.lowWatermarks = getLowWatermarks(data) + # -------------------------------------- .controller 'JobPlanController', ($scope, $state, $stateParams, $window, JobsService) -> @@ -318,3 +359,14 @@ angular.module('flinkApp') loadMetrics() if $scope.nodeid # -------------------------------------- + +.controller 'JobPlanWatermarksController', ($scope, $filter) -> + $scope.hasWatermarks = (nodeid) -> + return true if $scope.watermarksByNode(nodeid).length + + $scope.watermarksByNode = (nodeid) -> + if $scope.watermarks != null && $scope.watermarks[nodeid] && $scope.watermarks[nodeid].length + return $scope.watermarks[nodeid] + return [] + +# -------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d84b65ff/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee index b67d0bf..950cf06 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee @@ -168,12 +168,14 @@ angular.module('flinkApp') return # ---------------------------------------------- + .directive 'split', () -> return compile: (tElem, tAttrs) -> Split(tElem.children(), ( sizes: [50, 50] direction: 'vertical' )) + # ---------------------------------------------- .directive 'jobPlan', ($timeout) -> @@ -187,6 +189,7 @@ angular.module('flinkApp') scope: plan: '=' + lowWatermarks: '=' setNode: '&' link: (scope, elem, attrs) -> @@ -209,6 +212,9 @@ angular.module('flinkApp') containerW = elem.width() angular.element(elem.children()[0]).width(containerW) + lastZoomScale = 0 + lastPosition = 0 + scope.zoomIn = -> if mainZoom.scale() < 2.99 @@ -222,6 +228,9 @@ angular.module('flinkApp') # Transform svg d3mainSvgG.attr "transform", "translate(" + v1 + "," + v2 + ") scale(" + mainZoom.scale() + ")" + lastZoomScale = mainZoom.scale() + lastPosition = mainZoom.translate() + scope.zoomOut = -> if mainZoom.scale() > 0.31 @@ -235,6 +244,9 @@ angular.module('flinkApp') # Transform svg d3mainSvgG.attr "transform", "translate(" + v1 + "," + v2 + ") scale(" + mainZoom.scale() + ")" + lastZoomScale = mainZoom.scale() + lastPosition = mainZoom.translate() + #create a label of an edge createLabelEdge = (el) -> labelValue = "" @@ -288,6 +300,7 @@ angular.module('flinkApp') # Otherwise add infos labelValue += "<h5>" + info + " Node</h5>" if isSpecialIterationNode(info) labelValue += "<h5>Parallelism: " + el.parallelism + "</h5>" unless el.parallelism is "" + labelValue += "<h5>Low Watermark: " + el.lowWatermark + "</h5>" unless el.lowWatermark is `undefined` labelValue += "<h5>Operation: " + shortenString(el.operator_strategy) + "</h5>" unless el.operator is `undefined` or not el.operator_strategy # labelValue += "</a>" labelValue += "</div>" @@ -422,43 +435,66 @@ angular.module('flinkApp') for j of el.step_function return el.step_function[j] if el.step_function[j].id is nodeID - drawGraph = (data) -> - g = new dagreD3.graphlib.Graph({ multigraph: true, compound: true }).setGraph({ - nodesep: 70 - edgesep: 0 - ranksep: 50 - rankdir: "LR" - marginx: 40 - marginy: 40 - }) + mergeWatermarks = (data, watermarks) -> + for k,v of watermarks + for node in data.nodes + if node.id == k + node.lowWatermark = v + return data - loadJsonToDagre(g, data) + lastPosition = 0 + lastZoomScale = 0 - renderer = new dagreD3.render() - d3mainSvgG.call(renderer, g) + drawGraph = () -> + if scope.plan + g = new dagreD3.graphlib.Graph({ multigraph: true, compound: true }).setGraph({ + nodesep: 70 + edgesep: 0 + ranksep: 50 + rankdir: "LR" + marginx: 40 + marginy: 40 + }) - for i, sg of subgraphs - d3mainSvg.select('svg.svg-' + i + ' g').call(renderer, sg) + loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.lowWatermarks)) - newScale = 0.5 + d3mainSvgG.selectAll("*").remove() - xCenterOffset = Math.floor((angular.element(mainSvgElement).width() - g.graph().width * newScale) / 2) - yCenterOffset = Math.floor((angular.element(mainSvgElement).height() - g.graph().height * newScale) / 2) + d3mainSvgG.attr("transform", "scale(" + 1 + ")") - mainZoom.scale(newScale).translate([xCenterOffset, yCenterOffset]) + renderer = new dagreD3.render() + d3mainSvgG.call(renderer, g) - d3mainSvgG.attr("transform", "translate(" + xCenterOffset + ", " + yCenterOffset + ") scale(" + mainZoom.scale() + ")") + for i, sg of subgraphs + d3mainSvg.select('svg.svg-' + i + ' g').call(renderer, sg) - mainZoom.on("zoom", -> - ev = d3.event - d3mainSvgG.attr "transform", "translate(" + ev.translate + ") scale(" + ev.scale + ")" - ) - mainZoom(d3mainSvg) + newScale = 0.5 + + xCenterOffset = Math.floor((angular.element(mainSvgElement).width() - g.graph().width * newScale) / 2) + yCenterOffset = Math.floor((angular.element(mainSvgElement).height() - g.graph().height * newScale) / 2) + + if lastZoomScale != 0 && lastPosition != 0 + mainZoom.scale(lastZoomScale).translate(lastPosition) + d3mainSvgG.attr("transform", "translate(" + lastPosition + ") scale(" + lastZoomScale + ")") + else + mainZoom.scale(newScale).translate([xCenterOffset, yCenterOffset]) + d3mainSvgG.attr("transform", "translate(" + xCenterOffset + ", " + yCenterOffset + ") scale(" + mainZoom.scale() + ")") - d3mainSvgG.selectAll('.node').on 'click', (d) -> - scope.setNode({ nodeid: d }) + mainZoom.on("zoom", -> + ev = d3.event + lastZoomScale = ev.scale + lastPosition = ev.translate + d3mainSvgG.attr "transform", "translate(" + lastPosition + ") scale(" + lastZoomScale + ")" + ) + mainZoom(d3mainSvg) + + d3mainSvgG.selectAll('.node').on 'click', (d) -> + scope.setNode({ nodeid: d }) scope.$watch attrs.plan, (newPlan) -> - drawGraph(newPlan) if newPlan + drawGraph() if newPlan + + scope.$watch attrs.lowWatermarks, (newLowWatermarks) -> + drawGraph() if newLowWatermarks && scope.plan return