This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6da735c247fbe171ff89b275bc94e5565b67527b
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Dec 29 15:37:22 2020 +0100

    [FLINK-14814][webui] Display back pressured and busy times
---
 .../web-dashboard/src/app/interfaces/job-detail.ts |  2 ++
 .../pages/job/overview/job-overview.component.ts   | 30 +++++++++++++----
 .../src/app/services/metrics.service.ts            | 38 ++++++++++++++++++++++
 .../src/app/share/common/dagre/node.component.html | 33 ++++++++++++-------
 .../src/app/share/common/dagre/node.component.less |  5 +--
 .../src/app/share/common/dagre/node.component.ts   | 22 +++++++++++++
 6 files changed, 111 insertions(+), 19 deletions(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index 1429104..051ba12 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -130,6 +130,8 @@ export interface NodesItemInterface {
 export interface NodesItemCorrectInterface extends NodesItemInterface {
   detail: VerticesItemInterface | undefined;
   lowWatermark?: number;
+  backPressuredPercentage?: number;
+  busyPercentage?: number;
 }

 export interface NodesItemLinkInterface {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 974c0cd..e4629c9 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -62,6 +62,22 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
     }
   }

+  mergeWithBackPressure(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
+    return forkJoin(
+      nodes.map(node => {
+        return this.metricService.getAggregatedMetrics(this.jobId, node.id, ["backPressuredTimeMsPerSecond", "busyTimeMsPerSecond"]).pipe(
+          map(result => {
+            return {
+              ...node,
+              backPressuredPercentage: Math.min(Math.round(result.backPressuredTimeMsPerSecond / 10), 100),
+              busyPercentage: Math.min(Math.round(result.busyTimeMsPerSecond / 10), 100),
+            };
+          })
+        );
+      })
+    ).pipe(catchError(() => of(nodes)));
+  }
+
   mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
     return forkJoin(
       nodes.map(node => {
@@ -74,10 +90,12 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
     ).pipe(catchError(() => of(nodes)));
   }

-  refreshNodesWithWatermarks() {
-    this.mergeWithWatermarks(this.nodes).subscribe(nodes => {
-      nodes.forEach(node => {
-        this.dagreComponent.updateNode(node.id, node);
+  refreshNodesWithMetrics() {
+    this.mergeWithBackPressure(this.nodes).subscribe(nodes => {
+      this.mergeWithWatermarks(nodes).subscribe(nodes2 => {
+        nodes2.forEach(node => {
+          this.dagreComponent.updateNode(node.id, node);
+        });
       });
     });
   }
@@ -103,10 +121,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
           this.links = data.plan.links;
           this.jobId = data.plan.jid;
           this.dagreComponent.flush(this.nodes, this.links, true).then();
-          this.refreshNodesWithWatermarks();
+          this.refreshNodesWithMetrics();
         } else {
           this.nodes = data.plan.nodes;
-          this.refreshNodesWithWatermarks();
+          this.refreshNodesWithMetrics();
         }
         this.cdr.markForCheck();
       });
diff --git a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
index a43564d..816ae54 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
@@ -79,6 +79,44 @@ export class MetricsService {
   }

   /**
+   * Get aggregated metric data from all subtasks of the given vertexId
+   * @param jobId
+   * @param vertexId
+   * @param listOfMetricName
+   */
+  getAggregatedMetrics(jobId: string, vertexId: string, listOfMetricName: string[], aggregate: string = "max") {
+    const metricName = listOfMetricName.join(',');
+    return this.httpClient
+      .get<Array<{ id: string; min: number; max: number; avg: number; sum: number }>>(
+        `${BASE_URL}/jobs/${jobId}/vertices/${vertexId}/subtasks/metrics?get=${metricName}`
+      )
+      .pipe(
+        map(arr => {
+          const result: { [id: string]: number } = {};
+          arr.forEach(item => {
+            switch (aggregate) {
+              case "min":
+                result[item.id] = +item.min;
+                break;
+              case "max":
+                result[item.id] = +item.max;
+                break;
+              case "avg":
+                result[item.id] = +item.avg;
+                break;
+              case "sum":
+                result[item.id] = +item.sum;
+                break;
+              default:
+                throw new Error("Unsupported aggregate: " + aggregate);
+            }
+          });
+          return result;
+        })
+      );
+  }
+
+  /**
    * Gets the watermarks for a given vertex id.
    * @param jobId
    * @param vertexId
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
index 3b57b46..0886256 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
@@ -17,15 +17,26 @@
   -->

 <svg:g class="node node-rect">
-    <svg:foreignObject class="node-labels-container" [attr.y]="height ? -height / 2 : 0" [attr.width]="200" [attr.height]="height">
-        <xhtml:div class="node-label-wrapper">
-            <h4 class="content-wrap">
-                <xhtml:div class="detail">{{operator}}</xhtml:div>
-                <xhtml:div class="detail description">{{description}}</xhtml:div>
-                <xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
-                <xhtml:div class="node-label watermark" *ngIf="lowWatermark">Low Watermark <xhtml:br/> {{lowWatermark}}</xhtml:div>
-                <xhtml:div class="detail last" *ngIf="operatorStrategy">Operation: {{operatorStrategy}}</xhtml:div>
-            </h4>
-        </xhtml:div>
-    </svg:foreignObject>
+  <svg:foreignObject class="node-labels-container" [attr.y]="height ? -height / 2 : 0"
+                     [attr.width]="205" [attr.height]="height">
+    <xhtml:div class="node-label-wrapper">
+      <h4 class="content-wrap">
+        <xhtml:div class="detail">{{operator}}</xhtml:div>
+        <xhtml:div class="detail description">{{description}}</xhtml:div>
+        <xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
+        <xhtml:div class="node-label metric" title="Maximum back pressured percentage across all subtasks">
+          Backpressured (max): {{prettyPrint(backPressuredPercentage)}}
+        </xhtml:div>
+        <xhtml:div class="node-label metric" title="Maximum busy percentage across all subtasks">
+          Busy (max): {{prettyPrint(busyPercentage)}}
+        </xhtml:div>
+        <xhtml:div class="node-label metric" *ngIf="lowWatermark">
+          Low Watermark: {{lowWatermark}}
+        </xhtml:div>
+        <xhtml:div class="detail last" *ngIf="operatorStrategy">
+          Operation: {{operatorStrategy}}
+        </xhtml:div>
+      </h4>
+    </xhtml:div>
+  </svg:foreignObject>
 </svg:g>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
index 0533e05..f16c888 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
@@ -51,15 +51,16 @@
       margin-top: 24px;
     }

-    &.watermark {
+    &.metric {
       font-weight: normal;;
       font-weight: 12px;
+      margin-bottom: 2px;
       color: @text-color-secondary;
     }
   }

   .detail {
-    margin-bottom: 12px;
+    margin-bottom: 2px;
     color: @text-color;
     &.description {
       color: @heading-color;
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
index f24ef24..44b1dbf 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
@@ -31,6 +31,8 @@ export class NodeComponent {
   operatorStrategy: string | null;
   parallelism: number | null;
   lowWatermark: number | null | undefined;
+  backPressuredPercentage: number | undefined = NaN;
+  busyPercentage: number | undefined = NaN;
   height = 0;
   id: string;

@@ -47,6 +49,13 @@ export class NodeComponent {
     this.operatorStrategy = this.decodeHTML(value.operator_strategy);
     this.parallelism = value.parallelism;
     this.lowWatermark = value.lowWatermark;
+    if (this.isValid(value.backPressuredPercentage)) {
+      console.log(value.backPressuredPercentage)
+      this.backPressuredPercentage = value.backPressuredPercentage
+    }
+    if (this.isValid(value.busyPercentage)) {
+      this.busyPercentage = value.busyPercentage;
+    }
     this.height = value.height || 0;
     this.id = value.id;
     if (description && description.length > 300) {
@@ -56,6 +65,10 @@ export class NodeComponent {
     }
   }

+  isValid = (value?: number) => {
+    return value || value === 0 || value === NaN;
+  }
+
   constructor(protected cd: ChangeDetectorRef) {}

   /**
@@ -66,4 +79,13 @@ export class NodeComponent {
     this.node = node;
     this.cd.markForCheck();
   }
+
+  prettyPrint(value: number | undefined): string {
+    if (value === undefined || isNaN(value)) {
+      return "N/A"
+    }
+    else {
+      return value + "%";
+    }
+  }
 }
