[FLINK-3427] [webui] Refactorings to watermark tracking
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ef18f65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ef18f65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ef18f65 Branch: refs/heads/master Commit: 4ef18f6597bfde99733f8d4f4a54b90fc943c663 Parents: d84b65f Author: Ufuk Celebi <u...@apache.org> Authored: Tue Mar 7 11:36:50 2017 +0100 Committer: Ufuk Celebi <u...@apache.org> Committed: Wed Mar 8 15:28:41 2017 +0100 ---------------------------------------------------------------------- .../app/partials/jobs/job.plan.jade | 2 +- .../jobs/job.plan.node-list.watermarks.jade | 14 +-- .../partials/jobs/job.plan.node.watermarks.jade | 10 +- .../app/scripts/common/filters.coffee | 16 +--- .../web-dashboard/app/scripts/index.coffee | 9 +- .../app/scripts/modules/jobs/jobs.ctrl.coffee | 98 ++++++++++++-------- .../app/scripts/modules/jobs/jobs.dir.coffee | 17 ++-- 7 files changed, 93 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade index c33b9a3..6c4cf0b 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.jade @@ -18,7 +18,7 @@ split .split#canvas .canvas-wrapper - div.main-canvas(job-plan, plan="plan", low-watermarks="lowWatermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)") + div.main-canvas(job-plan, plan="plan", watermarks="watermarks" jobid="{{jobid}}", set-node="changeNode(nodeid)") .split#job-panel .panel.panel-default.panel-multi(ng-if="plan") http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade index 6b4c6a2..4605b61 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node-list.watermarks.jade @@ -23,14 +23,14 @@ table.table.table-body-hover.table-clickable.table-activable th Parallelism th Status - tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid && hasWatermarks(nodeid) }" ng-click="changeNode(v.id)") + tbody(ng-repeat="v in job.vertices" ng-class="{ active: v.id == nodeid }" ng-click="changeNode(v.id)") tr(ng-if="v.type == 'regular'") - td.td-long {{ v.name | humanizeText }} - td {{ watermarks | lowWatermark:v.id }} + td {{ watermarks[v.id]["lowWatermark"] | humanizeWatermark }} td {{ v.parallelism }} - td + td bs-label(status="{{v.status}}") {{v.status}} - tr(ng-if="nodeid && v.id == nodeid && hasWatermarks(nodeid)") - td(colspan="11") - div(ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ") + tr(ng-if="nodeid && v.id == nodeid") + td(colspan="4") + div(ng-show="hasWatermark(v.id)" ng-include=" 'partials/jobs/job.plan.node.watermarks.html' ") + div(ng-show="!hasWatermark(v.id)") No Watermarks http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade index b406a1c..451ccaa 100644 --- a/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade +++ b/flink-runtime-web/web-dashboard/app/partials/jobs/job.plan.node.watermarks.jade @@ -15,13 +15,13 @@ See the License for the specific language governing permissions and limitations under the License. -table.table.table-hover.table-clickable.table-activable.table-inner(ng-if="hasWatermarks(nodeid)") +table.table.table-hover.table-clickable.table-activable.table-inner thead tr - th id + th Subtask th Watermark tbody - tr(ng-repeat="watermark in watermarksByNode(nodeid)") - td {{ watermark.id }} - td {{ watermark.value | parseWatermark }} + tr(ng-repeat="(subtaskIndex, watermark) in watermarks[nodeid]['watermarks']") + td {{ subtaskIndex | increment }} + td {{ watermark | humanizeWatermark }} http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee index 99e12a8..a3ce508 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/common/filters.coffee @@ -88,19 +88,13 @@ angular.module('flinkApp') .filter "percentage", -> (number) -> (number * 100).toFixed(0) + '%' -.filter "parseWatermark", (watermarksConfig)-> +.filter "humanizeWatermark", (watermarksConfig) -> (value) -> - if value <= watermarksConfig.minValue + if isNaN(value) || value <= watermarksConfig.noWatermark return 'No Watermark' else return value -.filter "lowWatermark", (watermarksConfig)-> - (watermarks, nodeid) -> - lowWatermark = "No Watermark" - if watermarks != null && watermarks[nodeid] && watermarks[nodeid].length - values = (watermark.value for watermark in watermarks[nodeid]) - lowWatermark = Math.min.apply(null, values) - if lowWatermark <= watermarksConfig.minValue - lowWatermark = "No Watermark" - return lowWatermark +.filter "increment", -> + (number) -> + parseInt(number) + 1 http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/index.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee index cbbefab..52bb075 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee @@ -30,14 +30,17 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) .value 'flinkConfig', { jobServer: '' - # jobServer: 'http://localhost:8081/' +# jobServer: 'http://localhost:8081/' "refresh-interval": 10000 } # -------------------------------------- .value 'watermarksConfig', { - minValue: -9223372036854776000 + # A value of (Java) Long.MIN_VALUE indicates that there is no watermark + # available. This is parsed by Javascript as this number. We have it as + # a constant here to compare available watermarks against. + noWatermark: -9223372036854776000 } # -------------------------------------- @@ -52,7 +55,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) JobsService.listJobs() , flinkConfig["refresh-interval"] - # -------------------------------------- .config ($uiViewScrollProvider) -> @@ -125,7 +127,6 @@ angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) views: 'node-details': templateUrl: "partials/jobs/job.plan.node-list.watermarks.html" - controller: 'JobPlanWatermarksController' .state "single-job.plan.taskmanagers", url: "/taskmanagers" http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee index d18d7e3..f25c94d 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.ctrl.coffee @@ -46,8 +46,7 @@ angular.module('flinkApp') $scope.jobid = $stateParams.jobid $scope.job = null $scope.plan = null - $scope.watermarks = null - $scope.lowWatermarks = null + $scope.watermarks = {} $scope.vertices = null $scope.backPressureOperatorStats = {} @@ -61,8 +60,7 @@ angular.module('flinkApp') $scope.$on '$destroy', -> $scope.job = null $scope.plan = null - $scope.watermarks = null - $scope.lowWatermarks = null + $scope.watermarks = {} $scope.vertices = null $scope.backPressureOperatorStats = null @@ -84,43 +82,80 @@ angular.module('flinkApp') $scope.plan = data.plan MetricsService.setupMetrics($stateParams.jobid, data.vertices) - getWatermarks = (nodes)-> - # This function uses a promise to resolve watermarks once fetched via the metrics service, since watermarks have to be fetched individually for each node, we have to wait until all API calls have been made before we can resolve the promise. In the end we will have an array of low watermarks for each node: e.g. {somenodeid: [{id: 0, value: -9223372036854776000}], anothernodeid: [{id: 0, value: -9223372036854776000}, {id: 1, value: -9223372036854776000}]}. + # Asynchronously requests the watermark metrics for the given nodes. The + # returned object has the following structure: + # + # { + # "<nodeId>": { + # "lowWatermark": <lowWatermark> + # "watermarks": { + # 0: <watermark for subtask 0> + # ... + # n: <watermark for subtask n> + # } + # } + # } + # + # If no watermark is available, lowWatermark will be NaN and + # the watermarks will be empty. + getWatermarks = (nodes) -> + # Requests the watermarks for a single vertex. Triggers a request + # to the Metrics service. + requestWatermarkForNode = (node) => + deferred = $q.defer() + + jid = $scope.job.jid + + # Request metrics for each subtask + metricIds = (i + ".currentLowWatermark" for i in [0..node.parallelism - 1]) + MetricsService.getMetrics(jid, node.id, metricIds).then (metrics) -> + minValue = NaN + watermarks = {} + + for key, value of metrics.values + subtaskIndex = key.replace('.currentLowWatermark', '') + watermarks[subtaskIndex] = value + + if (isNaN(minValue) || value < minValue) + minValue = value + + if (!isNaN(minValue) && minValue > watermarksConfig.noWatermark) + lowWatermark = minValue + else + # NaN indicates no watermark available + lowWatermark = NaN + + deferred.resolve({"lowWatermark": lowWatermark, "watermarks": watermarks}) + + deferred.promise + deferred = $q.defer() watermarks = {} - jid = $scope.job.jid + + # Request watermarks for each node and update watermarks + len = nodes.length angular.forEach nodes, (node, index) => - metricIds = [] - # for each node, we need to specify which metrics we want to collect, for each subtask, we need to fetch the currentLowWatermark, and each param is formed by concatenating subtask index to '.currentLowWatermark'. - for num in [0..node.parallelism - 1] - metricIds.push(num + ".currentLowWatermark") - MetricsService.getMetrics(jid, node.id, metricIds).then (data) -> - values = [] - for key, value of data.values - values.push(id: key.replace('.currentLowWatermark', ''), value: value) - watermarks[node.id] = values - if index >= $scope.plan.nodes.length - 1 + nodeId = node.id + requestWatermarkForNode(node).then (data) -> + watermarks[nodeId] = data + if (index >= len - 1) deferred.resolve(watermarks) + deferred.promise - getLowWatermarks = (watermarks)-> - lowWatermarks = [] - for k,v of watermarks - minValue = Math.min.apply(null,(watermark.value for watermark in v)) - lowWatermarks[k] = if minValue <= watermarksConfig.minValue || v.length == 0 then 'No Watermark' else minValue - return lowWatermarks + # Returns true if the lowWatermark is != NaN + $scope.hasWatermark = (nodeid) -> + $scope.watermarks[nodeid] && !isNaN($scope.watermarks[nodeid]["lowWatermark"]) $scope.$watch 'plan', (newPlan) -> if newPlan getWatermarks(newPlan.nodes).then (data) -> $scope.watermarks = data - $scope.lowWatermarks = getLowWatermarks(data) - $scope.$on 'reload', (event) -> + $scope.$on 'reload', () -> if $scope.plan getWatermarks($scope.plan.nodes).then (data) -> $scope.watermarks = data - $scope.lowWatermarks = getLowWatermarks(data) # -------------------------------------- @@ -359,14 +394,3 @@ angular.module('flinkApp') loadMetrics() if $scope.nodeid # -------------------------------------- - -.controller 'JobPlanWatermarksController', ($scope, $filter) -> - $scope.hasWatermarks = (nodeid) -> - return true if $scope.watermarksByNode(nodeid).length - - $scope.watermarksByNode = (nodeid) -> - if $scope.watermarks != null && $scope.watermarks[nodeid] && $scope.watermarks[nodeid].length - return $scope.watermarks[nodeid] - return [] - -# -------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4ef18f65/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee index 950cf06..36b0c43 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.dir.coffee @@ -169,7 +169,7 @@ angular.module('flinkApp') # ---------------------------------------------- -.directive 'split', () -> +.directive 'split', () -> return compile: (tElem, tAttrs) -> Split(tElem.children(), ( sizes: [50, 50] @@ -189,7 +189,7 @@ angular.module('flinkApp') scope: plan: '=' - lowWatermarks: '=' + watermarks: '=' setNode: '&' link: (scope, elem, attrs) -> @@ -436,10 +436,11 @@ angular.module('flinkApp') return el.step_function[j] if el.step_function[j].id is nodeID mergeWatermarks = (data, watermarks) -> - for k,v of watermarks + if (!_.isEmpty(watermarks)) for node in data.nodes - if node.id == k - node.lowWatermark = v + if (watermarks[node.id] && !isNaN(watermarks[node.id]["lowWatermark"])) + node.lowWatermark = watermarks[node.id]["lowWatermark"] + return data lastPosition = 0 @@ -456,7 +457,7 @@ angular.module('flinkApp') marginy: 40 }) - loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.lowWatermarks)) + loadJsonToDagre(g, mergeWatermarks(scope.plan, scope.watermarks)) d3mainSvgG.selectAll("*").remove() @@ -494,7 +495,7 @@ angular.module('flinkApp') scope.$watch attrs.plan, (newPlan) -> drawGraph() if newPlan - scope.$watch attrs.lowWatermarks, (newLowWatermarks) -> - drawGraph() if newLowWatermarks && scope.plan + scope.$watch attrs.watermarks, (newWatermarks) -> + drawGraph() if newWatermarks && scope.plan return