This is an automated email from the ASF dual-hosted git repository. panyuepeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bdef5af63b6c1e64e0a266667f498111611e2b74 Author: och5351 <[email protected]> AuthorDate: Sat Apr 11 15:25:24 2026 +0900 [FLINK-38899][runtime-web] Introduce the Rescales/History sub-page for streaming jobs with the adaptive scheduler enabled Co-authored-by: Matthias Pohl <[email protected]> Co-authored-by: Yuepeng Pan <[email protected]> --- .../src/app/interfaces/job-rescales.ts | 59 +++- .../pages/job/rescales/job-rescales.component.html | 363 +++++++++++++++++++++ .../pages/job/rescales/job-rescales.component.less | 78 +++++ .../pages/job/rescales/job-rescales.component.ts | 79 ++++- .../web-dashboard/src/app/services/job.service.ts | 18 +- 5 files changed, 589 insertions(+), 8 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts index b5db266ddb6..85e90c98428 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts @@ -16,7 +16,64 @@ * limitations under the License. */ -export interface RescalesConfig { +export type RescalesHistory = BriefJobRescaleDetails[]; + +export interface BriefJobRescaleDetails { + rescaleUuid: string; + resourceRequirementsUuid: string; + rescaleAttemptId: number; + vertices: { [jobVertexId: string]: VertexParallelismRescaleInfo }; + slots: { [slotSharingGroupId: string]: SlotSharingGroupRescaleInfo }; + schedulerStates: SchedulerState[]; + startTimestampInMillis: number; + endTimestampInMillis: number; + terminalState: string; + triggerCause: string; + terminatedReason: string; +} + +export interface JobRescaleDetails extends BriefJobRescaleDetails {} + +export interface VertexParallelismRescaleInfo { + jobVertexId: string; + jobVertexName: string; + slotSharingGroupId: string; + slotSharingGroupName: string; + desiredParallelism: number; + sufficientParallelism: number; + preRescaleParallelism: number; + postRescaleParallelism: number; +} + +export interface SlotSharingGroupRescaleInfo { + slotSharingGroupId: string; + slotSharingGroupName: string; + requestResourceProfile: ResourceProfileInfo; + desiredSlots: number; + minimalRequiredSlots: number; + preRescaleSlots: number; + postRescaleSlots: number; + acquiredResourceProfile: ResourceProfileInfo; +} + +export interface ResourceProfileInfo { + cpuCores: number; + taskHeapMemory: number; + taskOffHeapMemory: number; + managedMemory: number; + networkMemory: number; + extendedResources: { [key: string]: unknown }; +} + +export interface SchedulerState { + state: string; + enterTimestampInMillis: number; + leaveTimestampInMillis: number; + durationInMillis: number; + stringifiedException: string; +} + +export interface JobRescaleConfigInfo { rescaleHistoryMax: number; schedulerExecutionMode: string; submissionResourceWaitTimeoutInMillis: number; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html index 2e90f2fbb7f..c282aa8c945 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html @@ -21,7 +21,358 @@ [nzSize]="'small'" [nzAnimated]="{ inkBar: true, tabPane: false }" [nzTabBarExtraContent]="extraTemplate" + (nzSelectedIndexChange)="refresh()" > + <nz-tab nzTitle="History"> + <nz-table + class="no-border small" + [nzSize]="'small'" + [nzData]="jobRescaleDetails || []" + [nzFrontPagination]="false" + [nzShowPagination]="false" + > + <thead> + <tr> + <th nzWidth="70px" nzAlign="center"></th> + <th + nz-tooltip + nzTooltipTitle="The unique ID in Rescale consists of 32 hexadecimal characters" + > + <strong>Rescale UUID</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The unique ID of resource requirements consists of 32 hexadecimal characters" + > + <strong>Requirements ID</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The number ID of Rescale attempts that occurred under the same resource requirements" + > + <strong>Attempt ID</strong> + </th> + <th nz-tooltip nzTooltipTitle="The reason that triggers the target Rescale"> + <strong>Trigger Cause</strong> + </th> + <th nz-tooltip nzTooltipTitle="The end state of the target Rescale"> + <strong>Terminal State</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The reason for the completion or termination of the target Rescale" + > + <strong>Terminated Reason</strong> + </th> + <th nz-tooltip nzTooltipTitle="The start time of the target Rescale"> + <strong>Start Time</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="Duration from the start of the rescale to its completion or until now" + > + <strong>Duration</strong> + </th> + <th nz-tooltip nzTooltipTitle="The end time of the target Rescale"> + <strong>End Time</strong> + </th> + </tr> + </thead> + <tbody> + <ng-container *ngFor="let jobRescaleDetails of rescalesHistory; trackBy: trackById"> + <tr> + <td + nzShowExpand + [nzExpand]="isExpanded(jobRescaleDetails.rescaleUuid)" + (nzExpandChange)="onExpandChange(jobRescaleDetails, $event)" + ></td> + <td nz-tooltip [nzTooltipTitle]="jobRescaleDetails.rescaleUuid"> + {{ truncateUuid(jobRescaleDetails.rescaleUuid) }} + </td> + <td nz-tooltip [nzTooltipTitle]="jobRescaleDetails.resourceRequirementsUuid"> + {{ truncateUuid(jobRescaleDetails.resourceRequirementsUuid) }} + </td> + <td>{{ jobRescaleDetails.rescaleAttemptId }}</td> + <td>{{ jobRescaleDetails.triggerCause }}</td> + <td> + <flink-job-badge + *ngIf="jobRescaleDetails.terminalState" + [state]="jobRescaleDetails.terminalState" + ></flink-job-badge> + <span *ngIf="!jobRescaleDetails.terminalState">-</span> + </td> + <td>{{ jobRescaleDetails.terminatedReason || '-' }}</td> + <td> + {{ jobRescaleDetails.startTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + </td> + <td> + {{ + (jobRescaleDetails.endTimestampInMillis || Date.now()) - + jobRescaleDetails.startTimestampInMillis | humanizeDuration + }} + </td> + <td> + {{ + jobRescaleDetails.endTimestampInMillis + ? (jobRescaleDetails.endTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS') + : '-' + }} + </td> + </tr> + <tr [nzExpand]="isExpanded(jobRescaleDetails.rescaleUuid)"> + <td colspan="10" *ngIf="isExpanded(jobRescaleDetails.rescaleUuid)" class="collapse-td"> + <div class="rescale-details-wrapper"> + <div class="rescale-details-box"> + <h3><strong>Rescale Details</strong></h3> + <tr> + <td> + <span> + <strong>Rescale UUID:</strong> + {{ truncateUuid(jobRescaleDetails.rescaleUuid) }} + </span> + <nz-divider nzType="vertical"></nz-divider> + <span> + <strong>Requirements ID:</strong> + {{ truncateUuid(jobRescaleDetails.resourceRequirementsUuid) }} + </span> + <nz-divider nzType="vertical"></nz-divider> + <span> + <strong>Attempt ID:</strong> + {{ jobRescaleDetails.rescaleAttemptId }} + </span> + <nz-divider nzType="vertical"></nz-divider> + <span> + <strong>Trigger Cause:</strong> + {{ jobRescaleDetails.triggerCause }} + </span> + <nz-divider nzType="vertical"></nz-divider> + <span> + <strong>Terminal State:</strong> + {{ jobRescaleDetails?.terminalState || '-' }} + </span> + <nz-divider nzType="vertical"></nz-divider> + <span> + <strong>Terminal Reason:</strong> + {{ jobRescaleDetails?.terminatedReason || '-' }} + </span> + </td> + </tr> + <ng-container *ngIf="getDetail(jobRescaleDetails.rescaleUuid) as detail"> + <nz-table + class="small" + [nzData]="detail.vertices | keyvalue" + [nzSize]="'small'" + [nzFrontPagination]="false" + [nzShowPagination]="false" + [nzTitle]="verticesTitle" + [nzBordered]="true" + > + <thead> + <tr> + <th + nz-tooltip + nzTooltipTitle="The unique ID of target JobVertex consists of 32 hexadecimal characters" + > + <strong>ID</strong> + </th> + <th nz-tooltip nzTooltipTitle="The short name of target vertex"> + <strong>Name</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The previous parallelism of target vertex before the current rescale" + > + <strong>Previous Parallelism</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The acquired parallelism of target vertex after the current rescale" + > + <strong>Acquired Parallelism</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The desired parallelism of the target vertex" + > + <strong>Desired Parallelism</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The minimal parallelism of target vertex to run" + > + <strong>Sufficient Parallelism</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The unique ID of the slot sharing group consists of 32 hexadecimal characters" + > + <strong>Slot Sharing Group ID</strong> + </th> + </tr> + </thead> + <tbody> + <tr *ngFor="let vertex of detail.vertices | keyvalue"> + <td nz-tooltip [nzTooltipTitle]="vertex.value.jobVertexId"> + {{ truncateUuid(vertex.value.jobVertexId) }} + </td> + <td nz-tooltip [nzTooltipTitle]="vertex.value.jobVertexName"> + {{ truncateName(vertex.value.jobVertexName) }} + </td> + <td>{{ vertex.value.preRescaleParallelism }}</td> + <td>{{ vertex.value.postRescaleParallelism || '-' }}</td> + <td>{{ vertex.value.desiredParallelism }}</td> + <td>{{ vertex.value.sufficientParallelism }}</td> + <td nz-tooltip [nzTooltipTitle]="vertex.value.slotSharingGroupId"> + {{ truncateUuid(vertex.value.slotSharingGroupId) }} + </td> + </tr> + </tbody> + </nz-table> + + <nz-table + class="small" + [nzData]="detail.slots | keyvalue" + [nzSize]="'small'" + [nzFrontPagination]="false" + [nzShowPagination]="false" + [nzTitle]="slotsTitle" + [nzBordered]="true" + > + <thead> + <tr> + <th + nz-tooltip + nzTooltipTitle="The ID of the slot sharing group to which the slot belongs consists of 32 hexadecimal characters" + > + <strong>Slot Sharing Group ID</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The name of the slot sharing group to which the slot belongs" + > + <strong>Slot Sharing Group Name</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The previous number of slots before the rescale" + > + <strong>Previous Slots</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The acquired number of slots after the rescale" + > + <strong>Acquired Slots</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The desired number of slots of the rescale" + > + <strong>Desired Slots</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The minimal number of slots to deploy tasks in the rescale" + > + <strong>Sufficient Slots</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The required resource profile of the slot sharing group in the rescale" + > + <strong>Required Profile</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The acquired resource profile of the slot sharing group in the rescale" + > + <strong>Acquired Profile</strong> + </th> + </tr> + </thead> + <tbody> + <tr *ngFor="let slot of detail.slots | keyvalue"> + <td nz-tooltip [nzTooltipTitle]="slot.value.slotSharingGroupId"> + {{ truncateUuid(slot.value.slotSharingGroupId) }} + </td> + <td>{{ slot.value.slotSharingGroupName }}</td> + <td>{{ slot.value.preRescaleSlots }}</td> + <td>{{ slot.value.postRescaleSlots || '-' }}</td> + <td>{{ slot.value.desiredSlots }}</td> + <td>{{ slot.value.minimalRequiredSlots }}</td> + <td> + <pre style="margin: 0">{{ + slot.value.requestResourceProfile | json + }}</pre> + </td> + <td> + <pre style="margin: 0">{{ + slot.value.acquiredResourceProfile | json + }}</pre> + </td> + </tr> + </tbody> + </nz-table> + + <nz-table + class="small" + [nzData]="detail.schedulerStates" + [nzSize]="'small'" + [nzFrontPagination]="false" + [nzShowPagination]="false" + [nzTitle]="schedulerStatesTitle" + [nzBordered]="true" + > + <thead> + <tr> + <th nz-tooltip nzTooltipTitle="The scheduler state name"> + <strong>State</strong> + </th> + <th nz-tooltip nzTooltipTitle="The time to enter the state"> + <strong>Enter Time</strong> + </th> + <th nz-tooltip nzTooltipTitle="The time to leave the state"> + <strong>Leave Time</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The duration time from enter time to leave time of the state" + > + <strong>Duration</strong> + </th> + <th + nz-tooltip + nzTooltipTitle="The exception information about current rescale during the state" + > + <strong>Exception</strong> + </th> + </tr> + </thead> + <tbody> + <tr *ngFor="let state of detail.schedulerStates"> + <td>{{ state.state }}</td> + <td> + {{ state.enterTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + </td> + <td> + {{ state.leaveTimestampInMillis | date: 'yyyy-MM-dd HH:mm:ss.SSS' }} + </td> + <td>{{ state.durationInMillis | humanizeDuration }}</td> + <td>{{ state.stringifiedException }}</td> + </tr> + </tbody> + </nz-table> + </ng-container> + + <div *ngIf="!getDetail(jobRescaleDetails.rescaleUuid)">Loading...</div> + </div> + </div> + </td> + </tr> + </ng-container> + </tbody> + </nz-table> + </nz-tab> <nz-tab nzTitle="Configuration"> <nz-table class="no-border small" @@ -100,4 +451,16 @@ </button> </ng-template> +<ng-template #verticesTitle> + <strong>Vertices</strong> +</ng-template> + +<ng-template #slotsTitle> + <strong>Slots</strong> +</ng-template> + +<ng-template #schedulerStatesTitle> + <strong>Scheduler State History</strong> +</ng-template> + <nz-empty *ngIf="!rescalesConfig"></nz-empty> diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less index 6b780729481..bbca1186be4 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less @@ -27,6 +27,14 @@ .ant-tabs-nav-list { padding: 4px 16px; } + + > .ant-table-wrapper { + .ant-table { + > tbody > tr.ant-table-expanded-row > td { + padding: 0 !important; + } + } + } } } @@ -37,3 +45,73 @@ nz-empty { padding: 24px; } + +::ng-deep { + .nz-disable-td { + width: 100% !important; + padding: 0 !important; + } + + .collapse-td { + width: 100% !important; + padding: 0 !important; + } + + tr.ant-table-expanded-row { + width: 100%; + + > td { + width: 100% !important; + } + } +} + +.rescale-details-wrapper { + display: block; + box-sizing: border-box; + width: 100%; + padding: 0 24px; +} + +.rescale-details-box { + margin: 16px 0; + padding: 16px; + border: 1px solid #f0f0f0; + background-color: #fff; + + ::ng-deep { + nz-table.ant-table-wrapper { + display: block !important; + width: 100% !important; + margin-bottom: 16px; + + &:last-child { + margin-bottom: 0; + } + } + + .ant-spin-nested-loading { + display: block; + width: 100%; + } + + .ant-spin-container { + display: block; + width: 100%; + } + + .ant-table { + width: 100% !important; + table-layout: fixed !important; + + colgroup { + display: none; + } + } + + table { + width: 100% !important; + table-layout: fixed !important; + } + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts index 4b80254bedf..6f1a1ecc1c5 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts @@ -16,13 +16,20 @@ * limitations under the License. */ -import { NgIf } from '@angular/common'; +import { NgFor, NgIf, JsonPipe, DatePipe, KeyValuePipe } from '@angular/common'; import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core'; import { forkJoin, of, Subject } from 'rxjs'; import { catchError, distinctUntilChanged, switchMap, takeUntil } from 'rxjs/operators'; import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe'; -import { RescalesConfig, JobDetail } from '@flink-runtime-web/interfaces'; +import { JobBadgeComponent } from '@flink-runtime-web/components/job-badge/job-badge.component'; +import { + BriefJobRescaleDetails, + JobRescaleDetails, + JobRescaleConfigInfo, + JobDetail, + RescalesHistory +} from '@flink-runtime-web/interfaces'; import { JobService } from '@flink-runtime-web/services'; import { NzButtonModule } from 'ng-zorro-antd/button'; import { NzCollapseModule } from 'ng-zorro-antd/collapse'; @@ -42,8 +49,13 @@ import { JobLocalService } from '../job-local.service'; changeDetection: ChangeDetectionStrategy.OnPush, imports: [ NgIf, + NgFor, + JsonPipe, + DatePipe, + KeyValuePipe, NzTabsModule, NzDividerModule, + JobBadgeComponent, HumanizeDurationPipe, NzTableModule, NzIconModule, @@ -54,8 +66,12 @@ import { JobLocalService } from '../job-local.service'; ] }) export class JobRescalesComponent implements OnInit, OnDestroy { - public rescalesConfig?: RescalesConfig; + public rescalesHistory?: RescalesHistory; + public rescaleDetailsMap = new Map<string, JobRescaleDetails>(); + public rescalesConfig?: JobRescaleConfigInfo; public jobDetail: JobDetail; + public expandedRowsSet = new Set<string>(); + public readonly Date = Date; private refresh$ = new Subject<void>(); private destroy$ = new Subject<void>(); @@ -71,6 +87,11 @@ export class JobRescalesComponent implements OnInit, OnDestroy { .pipe( switchMap(() => forkJoin([ + this.jobService.loadRescalesHistory(this.jobDetail.jid).pipe( + catchError(() => { + return of(undefined); + }) + ), this.jobService.loadRescalesConfig(this.jobDetail.jid).pipe( catchError(() => { return of(undefined); @@ -80,7 +101,8 @@ export class JobRescalesComponent implements OnInit, OnDestroy { ), takeUntil(this.destroy$) ) - .subscribe(([config]) => { + .subscribe(([history, config]) => { + this.rescalesHistory = history; this.rescalesConfig = config; this.cdr.markForCheck(); }); @@ -105,6 +127,55 @@ export class JobRescalesComponent implements OnInit, OnDestroy { } public refresh(): void { + const expandedUuids = Array.from(this.expandedRowsSet); + + expandedUuids.forEach(uuid => { + this.jobService + .loadRescaleDetail(this.jobDetail.jid, uuid) + .pipe(takeUntil(this.destroy$)) + .subscribe(detail => { + this.rescaleDetailsMap.set(uuid, detail); + this.cdr.markForCheck(); + }); + }); + this.refresh$.next(); } + + public trackById(item: BriefJobRescaleDetails): string { + return item.rescaleUuid; + } + + public onExpandChange(jobRescaleDetails: BriefJobRescaleDetails, expanded: boolean): void { + if (expanded) { + this.expandedRowsSet.add(jobRescaleDetails.rescaleUuid); + if (!this.rescaleDetailsMap.has(jobRescaleDetails.rescaleUuid)) { + this.jobService + .loadRescaleDetail(this.jobDetail.jid, jobRescaleDetails.rescaleUuid) + .pipe(takeUntil(this.destroy$)) + .subscribe(detail => { + this.rescaleDetailsMap.set(jobRescaleDetails.rescaleUuid, detail); + this.cdr.markForCheck(); + }); + } + } else { + this.expandedRowsSet.delete(jobRescaleDetails.rescaleUuid); + } + } + + public isExpanded(rescaleUuid: string): boolean { + return this.expandedRowsSet.has(rescaleUuid); + } + + public getDetail(rescaleUuid: string): JobRescaleDetails | undefined { + return this.rescaleDetailsMap.get(rescaleUuid); + } + + public truncateUuid(uuid: string): string { + return uuid ? uuid.substring(0, 8) : ''; + } + + public truncateName(name: string, maxLength: number = 32): string { + return name && name.length > maxLength ? `${name.substring(0, maxLength)}...` : name; + } } diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts index 313a3e9308e..aff1e903b6d 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts @@ -26,7 +26,9 @@ import { CheckpointConfig, CheckpointDetail, CheckpointSubTask, - RescalesConfig, + RescalesHistory, + JobRescaleDetails, + JobRescaleConfigInfo, JobAccumulators, JobBackpressure, JobConfig, @@ -179,8 +181,18 @@ export class JobService { ); } - public loadRescalesConfig(jobId: string): Observable<RescalesConfig> { - return this.httpClient.get<RescalesConfig>(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); + public loadRescalesHistory(jobId: string): Observable<RescalesHistory> { + return this.httpClient.get<RescalesHistory>(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/history`); + } + + public loadRescaleDetail(jobId: string, rescaleUuid: string): Observable<JobRescaleDetails> { + return this.httpClient.get<JobRescaleDetails>( + `${this.configService.BASE_URL}/jobs/${jobId}/rescales/details/${rescaleUuid}` + ); + } + + public loadRescalesConfig(jobId: string): Observable<JobRescaleConfigInfo> { + return this.httpClient.get<JobRescaleConfigInfo>(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); } public loadJobResourceRequirements(jobId: string): Observable<JobResourceRequirements> {
