This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6da735c247fbe171ff89b275bc94e5565b67527b
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Dec 29 15:37:22 2020 +0100

    [FLINK-14814][webui] Display back pressured and busy times
---
 .../web-dashboard/src/app/interfaces/job-detail.ts |  2 ++
 .../pages/job/overview/job-overview.component.ts   | 30 +++++++++++++----
 .../src/app/services/metrics.service.ts            | 38 ++++++++++++++++++++++
 .../src/app/share/common/dagre/node.component.html | 33 ++++++++++++-------
 .../src/app/share/common/dagre/node.component.less |  5 +--
 .../src/app/share/common/dagre/node.component.ts   | 22 +++++++++++++
 6 files changed, 111 insertions(+), 19 deletions(-)

diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
index 1429104..051ba12 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts
@@ -130,6 +130,8 @@ export interface NodesItemInterface {
 export interface NodesItemCorrectInterface extends NodesItemInterface {
   detail: VerticesItemInterface | undefined;
   lowWatermark?: number;
+  backPressuredPercentage?: number;
+  busyPercentage?: number;
 }
 
 export interface NodesItemLinkInterface {
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 974c0cd..e4629c9 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -62,6 +62,22 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
     }
   }
 
+  mergeWithBackPressure(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
+      return forkJoin(
+        nodes.map(node => {
+          return this.metricService.getAggregatedMetrics(this.jobId, node.id, ["backPressuredTimeMsPerSecond", "busyTimeMsPerSecond"]).pipe(
+            map(result => {
+              return {
+                ...node,
+                backPressuredPercentage: Math.min(Math.round(result.backPressuredTimeMsPerSecond / 10), 100),
+                busyPercentage: Math.min(Math.round(result.busyTimeMsPerSecond / 10), 100),
+              };
+            })
+          );
+        })
+      ).pipe(catchError(() => of(nodes)));
+    }
+
   mergeWithWatermarks(nodes: NodesItemCorrectInterface[]): Observable<NodesItemCorrectInterface[]> {
     return forkJoin(
       nodes.map(node => {
@@ -74,10 +90,12 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
     ).pipe(catchError(() => of(nodes)));
   }
 
-  refreshNodesWithWatermarks() {
-    this.mergeWithWatermarks(this.nodes).subscribe(nodes => {
-      nodes.forEach(node => {
-        this.dagreComponent.updateNode(node.id, node);
+  refreshNodesWithMetrics() {
+    this.mergeWithBackPressure(this.nodes).subscribe(nodes => {
+      this.mergeWithWatermarks(nodes).subscribe(nodes2 => {
+        nodes2.forEach(node => {
+          this.dagreComponent.updateNode(node.id, node);
+        });
       });
     });
   }
@@ -103,10 +121,10 @@ export class JobOverviewComponent implements OnInit, OnDestroy {
           this.links = data.plan.links;
           this.jobId = data.plan.jid;
           this.dagreComponent.flush(this.nodes, this.links, true).then();
-          this.refreshNodesWithWatermarks();
+          this.refreshNodesWithMetrics();
         } else {
           this.nodes = data.plan.nodes;
-          this.refreshNodesWithWatermarks();
+          this.refreshNodesWithMetrics();
         }
         this.cdr.markForCheck();
       });
diff --git a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
index a43564d..816ae54 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/metrics.service.ts
@@ -79,6 +79,44 @@ export class MetricsService {
   }
 
   /**
+   * Get aggregated metric data from all subtasks of the given vertexId
+   * @param jobId
+   * @param vertexId
+   * @param listOfMetricName
+   */
+  getAggregatedMetrics(jobId: string, vertexId: string, listOfMetricName: string[], aggregate: string = "max") {
+    const metricName = listOfMetricName.join(',');
+    return this.httpClient
+      .get<Array<{ id: string; min: number; max: number; avg: number; sum: number }>>(
+        `${BASE_URL}/jobs/${jobId}/vertices/${vertexId}/subtasks/metrics?get=${metricName}`
+      )
+      .pipe(
+        map(arr => {
+          const result: { [id: string]: number } = {};
+          arr.forEach(item => {
+            switch (aggregate) {
+              case "min":
+                result[item.id] = +item.min;
+                break;
+              case "max":
+                result[item.id] = +item.max;
+                break;
+              case "avg":
+                result[item.id] = +item.avg;
+                break;
+              case "sum":
+                result[item.id] = +item.sum;
+                break;
+              default:
+                throw new Error("Unsupported aggregate: " + aggregate);
+            }
+          });
+          return result;
+        })
+      );
+  }
+
+  /**
    * Gets the watermarks for a given vertex id.
    * @param jobId
    * @param vertexId
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
index 3b57b46..0886256 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.html
@@ -17,15 +17,26 @@
   -->
 
 <svg:g class="node node-rect">
-  <svg:foreignObject class="node-labels-container" [attr.y]="height ? -height / 2 : 0" [attr.width]="200" [attr.height]="height">
-    <xhtml:div class="node-label-wrapper">
-      <h4 class="content-wrap">
-        <xhtml:div class="detail">{{operator}}</xhtml:div>
-        <xhtml:div class="detail description">{{description}}</xhtml:div>
-        <xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
-        <xhtml:div class="node-label watermark" *ngIf="lowWatermark">Low Watermark <xhtml:br/> {{lowWatermark}}</xhtml:div>
-        <xhtml:div class="detail last" *ngIf="operatorStrategy">Operation: {{operatorStrategy}}</xhtml:div>
-      </h4>
-    </xhtml:div>
-  </svg:foreignObject>
+    <svg:foreignObject class="node-labels-container" [attr.y]="height ? -height / 2 : 0"
+                       [attr.width]="205" [attr.height]="height">
+        <xhtml:div class="node-label-wrapper">
+            <h4 class="content-wrap">
+                <xhtml:div class="detail">{{operator}}</xhtml:div>
+                <xhtml:div class="detail description">{{description}}</xhtml:div>
+                <xhtml:div class="node-label">Parallelism: {{parallelism}}</xhtml:div>
+                <xhtml:div class="node-label metric" title="Maximum back pressured percentage across all subtasks">
+                    Backpressured (max): {{prettyPrint(backPressuredPercentage)}}
+                </xhtml:div>
+                <xhtml:div class="node-label metric" title="Maximum busy percentage across all subtasks">
+                    Busy (max): {{prettyPrint(busyPercentage)}}
+                </xhtml:div>
+                <xhtml:div class="node-label metric" *ngIf="lowWatermark">
+                    Low Watermark: {{lowWatermark}}
+                </xhtml:div>
+                <xhtml:div class="detail last" *ngIf="operatorStrategy">
+                    Operation: {{operatorStrategy}}
+                </xhtml:div>
+            </h4>
+        </xhtml:div>
+    </svg:foreignObject>
 </svg:g>
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
index 0533e05..f16c888 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.less
@@ -51,15 +51,16 @@
         margin-top: 24px;
       }
 
-      &.watermark {
+      &.metric {
         font-weight: normal;;
         font-weight: 12px;
+        margin-bottom: 2px;
         color: @text-color-secondary;
       }
     }
 
     .detail {
-      margin-bottom: 12px;
+      margin-bottom: 2px;
       color: @text-color;
       &.description {
         color: @heading-color;
diff --git a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
index f24ef24..44b1dbf 100644
--- a/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/share/common/dagre/node.component.ts
@@ -31,6 +31,8 @@ export class NodeComponent {
   operatorStrategy: string | null;
   parallelism: number | null;
   lowWatermark: number | null | undefined;
+  backPressuredPercentage: number | undefined = NaN;
+  busyPercentage: number | undefined = NaN;
   height = 0;
   id: string;
 
@@ -47,6 +49,13 @@ export class NodeComponent {
     this.operatorStrategy = this.decodeHTML(value.operator_strategy);
     this.parallelism = value.parallelism;
     this.lowWatermark = value.lowWatermark;
+    if (this.isValid(value.backPressuredPercentage)) {
+        console.log(value.backPressuredPercentage)
+        this.backPressuredPercentage = value.backPressuredPercentage
+    }
+    if (this.isValid(value.busyPercentage)) {
+        this.busyPercentage = value.busyPercentage;
+    }
     this.height = value.height || 0;
     this.id = value.id;
     if (description && description.length > 300) {
@@ -56,6 +65,10 @@ export class NodeComponent {
     }
   }
 
+  isValid = (value?: number) => {
+    return value || value === 0 || value === NaN;
+  }
+
   constructor(protected cd: ChangeDetectorRef) {}
 
   /**
@@ -66,4 +79,13 @@ export class NodeComponent {
     this.node = node;
     this.cd.markForCheck();
   }
+
+  prettyPrint(value: number | undefined): string {
+    if (value === undefined || isNaN(value)) {
+      return "N/A"
+    }
+    else {
+      return value + "%";
+    }
+  }
 }

Reply via email to