This is an automated email from the ASF dual-hosted git repository. panyuepeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd1629430d1ed1d2f11f75996cdb7c2779a0427b Author: och5351 <[email protected]> AuthorDate: Fri Mar 27 18:12:52 2026 +0900 [FLINK-38901][runtime-web] Introduce the Rescales/Configuration sub-page for streaming jobs with the adaptive scheduler enabled Co-authored-by: Yuepeng Pan <[email protected]> Co-authored-by: Matthias Pohl <[email protected]> --- .../web-dashboard/src/app/interfaces/job-detail.ts | 1 + .../interfaces/{public-api.ts => job-rescales.ts} | 31 +++--- .../web-dashboard/src/app/interfaces/public-api.ts | 1 + .../job/job-detail/status/job-status.component.ts | 17 ++++ .../web-dashboard/src/app/pages/job/job.config.ts | 3 +- .../app/pages/job/modules/completed-job/routes.ts | 9 ++ .../app/pages/job/modules/running-job/routes.ts | 8 ++ .../pages/job/rescales/job-rescales.component.html | 103 +++++++++++++++++++ .../job/rescales/job-rescales.component.less} | 41 ++++---- .../pages/job/rescales/job-rescales.component.ts | 110 +++++++++++++++++++++ .../web-dashboard/src/app/services/job.service.ts | 5 + 11 files changed, 288 insertions(+), 41 deletions(-) diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts index 21724a6b410..e178fd6f0ea 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts @@ -61,6 +61,7 @@ export interface JobDetail { 'stream-graph': StreamGraph; 'pending-operators': number; 'application-id': string; + schedulerType: string; } interface Plan { diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts similarity index 55% copy from flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts copy to flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts index 38d072557e1..b5db266ddb6 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts @@ -16,23 +16,14 @@ * limitations under the License. */ -export * from './configuration'; -export * from './jar'; -export * from './job-overview'; -export * from './job-detail'; -export * from './job-exception'; -export * from './job-timeline'; -export * from './job-config'; -export * from './job-vertex'; -export * from './job-checkpoint'; -export * from './job-backpressure'; -export * from './job-flamegraph'; -export * from './plan'; -export * from './overview'; -export * from './task-manager'; -export * from './job-accumulators'; -export * from './job-manager'; -export * from './job-metrics'; -export * from './application-overview'; -export * from './application-detail'; -export * from './application-exception'; +export interface RescalesConfig { + rescaleHistoryMax: number; + schedulerExecutionMode: string; + submissionResourceWaitTimeoutInMillis: number; + submissionResourceStabilizationTimeoutInMillis: number; + slotIdleTimeoutInMillis: number; + executingCooldownTimeoutInMillis: number; + executingResourceStabilizationTimeoutInMillis: number; + maximumDelayForTriggeringRescaleInMillis: number; + rescaleOnFailedCheckpointCount: number; +} diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts index 38d072557e1..86a6116d223 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts @@ -27,6 +27,7 @@ export * from './job-vertex'; export * from './job-checkpoint'; export * from './job-backpressure'; export * from './job-flamegraph'; +export * from './job-rescales'; export * from './plan'; export * from './overview'; export * from './task-manager'; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts index 17e958a816f..b243d136360 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts @@ -63,6 +63,7 @@ export class JobStatusComponent implements OnInit, OnDestroy { urlLoading = true; readonly listOfNavigation: RouterTab[]; private readonly checkpointIndexOfNavigation: number; + private readonly rescalesIndexOfNavigation: number; webCancelEnabled = this.statusService.configuration.features['web-cancel']; isHistoryServer = this.statusService.configuration.features['web-history']; @@ -80,6 +81,7 @@ export class JobStatusComponent implements OnInit, OnDestroy { ) { this.listOfNavigation = moduleConfig.routerTabs || JOB_MODULE_DEFAULT_CONFIG.routerTabs; this.checkpointIndexOfNavigation = this.checkpointIndexOfNav(); + this.rescalesIndexOfNavigation = this.rescalesIndexOfNav(); } ngOnInit(): void { @@ -126,6 +128,10 @@ export class JobStatusComponent implements OnInit, OnDestroy { return this.listOfNavigation.findIndex(item => item.path === 'checkpoints'); } + rescalesIndexOfNav(): number { + return this.listOfNavigation.findIndex(item => item.path === 'rescales'); + } + private handleJobDetailChanged(data: JobDetailCorrect): void { this.jobDetail = data; const checkpointNavIndex = this.checkpointIndexOfNav(); @@ -137,6 +143,17 @@ export class JobStatusComponent implements OnInit, OnDestroy { } else if (data.plan.type == 'BATCH' && checkpointNavIndex > -1) { this.listOfNavigation.splice(checkpointNavIndex, 1); } + + const rescalesNavIndex = this.rescalesIndexOfNav(); + const shouldShowRescales = data.plan.type == 'STREAMING' && data.schedulerType == 'Adaptive'; + if (!shouldShowRescales && rescalesNavIndex > -1) { + this.listOfNavigation.splice(rescalesNavIndex, 1); + } else if (shouldShowRescales && rescalesNavIndex == -1) { + this.listOfNavigation.splice(this.rescalesIndexOfNavigation, 0, { + path: 'rescales', + title: 'Rescales' + }); + } this.cdr.markForCheck(); } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts index 59ef1027518..4995f1773ff 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts @@ -29,7 +29,8 @@ export const JOB_MODULE_DEFAULT_CONFIG: Required<JobModuleConfig> = { { title: 'Data Skew', path: 'dataskew' }, { title: 'TimeLine', path: 'timeline' }, { title: 'Checkpoints', path: 'checkpoints' }, - { title: 'Configuration', path: 'configuration' } + { title: 'Configuration', path: 'configuration' }, + { title: 'Rescales', path: 'rescales' } ] }; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts index 496acbaf2e2..1d70929daf8 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts @@ -53,6 +53,7 @@ const OVERRIDE_JOB_MODULE_CONFIG_FACTORY = (statusService: StatusService): JobMo { title: 'TimeLine', path: 'timeline' }, { title: 'Checkpoints', path: 'checkpoints' }, { title: 'Job Configuration', path: 'configuration' }, + { title: 'Rescales', path: 'rescales' }, { title: 'Cluster Configuration', path: 'cluster_configuration' } ] : JOB_MODULE_DEFAULT_CONFIG.routerTabs @@ -129,6 +130,14 @@ export const COMPLETED_JOB_ROUES: Routes = [ path: 'configuration' } }, + { + path: 'rescales', + loadComponent: () => + import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent), + data: { + path: 'rescales' + } + }, { path: 'cluster_configuration', loadComponent: () => diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts index c09afabf9ab..cf4e2a2722b 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts @@ -80,6 +80,14 @@ export const RUNNING_JOB_ROUTES: Routes = [ path: 'configuration' } }, + { + path: 'rescales', + loadComponent: () => + import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent), + data: { + path: 'rescales' + } + }, { path: '**', redirectTo: 'overview', pathMatch: 'full' } ] } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html new file mode 100644 index 00000000000..2e90f2fbb7f --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html @@ -0,0 +1,103 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<nz-tabs + *ngIf="rescalesConfig" + [nzSize]="'small'" + [nzAnimated]="{ inkBar: true, tabPane: false }" + [nzTabBarExtraContent]="extraTemplate" +> + <nz-tab nzTitle="Configuration"> + <nz-table + class="no-border small" + [nzData]="rescalesConfig ? [''] : []" + [nzSize]="'small'" + [nzFrontPagination]="false" + [nzShowPagination]="false" + > + <thead> + <tr> + <th><strong>Option</strong></th> + <th><strong>Value</strong></th> + </tr> + </thead> + <tbody> + <ng-container *ngIf="rescalesConfig"> + <tr> + <td>Scheduler Execution Mode</td> + <td *ngIf="rescalesConfig['schedulerExecutionMode'] === 'REACTIVE'">REACTIVE</td> + <td *ngIf="rescalesConfig['schedulerExecutionMode'] !== 'REACTIVE'"></td> + </tr> + <tr> + <td>Submission Resource Wait Timeout</td> + <td> + {{ rescalesConfig['submissionResourceWaitTimeoutInMillis'] | humanizeDuration }} + </td> + </tr> + <tr> + <td>Submission ResourceStabilization Timeout</td> + <td> + {{ + rescalesConfig['submissionResourceStabilizationTimeoutInMillis'] | humanizeDuration + }} + </td> + </tr> + <tr> + <td>Slot Idle Timeout</td> + <td>{{ rescalesConfig['slotIdleTimeoutInMillis'] | humanizeDuration }}</td> + </tr> + <tr> + <td>Executing Cooldown Timeout</td> + <td>{{ rescalesConfig['executingCooldownTimeoutInMillis'] | humanizeDuration }}</td> + </tr> + <tr> + <td>Executing Resource Stabilization Timeout</td> + <td> + {{ + rescalesConfig['executingResourceStabilizationTimeoutInMillis'] | humanizeDuration + }} + </td> + </tr> + <tr> + <td>Maximum Delay For Triggering Rescale</td> + <td> + {{ rescalesConfig['maximumDelayForTriggeringRescaleInMillis'] | humanizeDuration }} + </td> + </tr> + <tr> + <td>Rescale On Failed Checkpoint Count</td> + <td>{{ rescalesConfig['rescaleOnFailedCheckpointCount'] }}</td> + </tr> + <tr> + <td>History Max</td> + <td>{{ rescalesConfig['rescaleHistoryMax'] }}</td> + </tr> + </ng-container> + </tbody> + </nz-table> + </nz-tab> +</nz-tabs> + +<ng-template #extraTemplate> + <button nz-button nzType="primary" class="refresh" nzSize="small" (click)="refresh()"> + <i nz-icon nzType="sync"></i> + Refresh + </button> +</ng-template> + +<nz-empty *ngIf="!rescalesConfig"></nz-empty> diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less similarity index 55% copy from flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts copy to flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less index 38d072557e1..6b780729481 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less @@ -16,23 +16,24 @@ * limitations under the License. */ -export * from './configuration'; -export * from './jar'; -export * from './job-overview'; -export * from './job-detail'; -export * from './job-exception'; -export * from './job-timeline'; -export * from './job-config'; -export * from './job-vertex'; -export * from './job-checkpoint'; -export * from './job-backpressure'; -export * from './job-flamegraph'; -export * from './plan'; -export * from './overview'; -export * from './task-manager'; -export * from './job-accumulators'; -export * from './job-manager'; -export * from './job-metrics'; -export * from './application-overview'; -export * from './application-detail'; -export * from './application-exception'; +:host { + ::ng-deep { + .ant-tabs-tabpane { + position: relative; + top: -16px; + padding: 24px; + } + + .ant-tabs-nav-list { + padding: 4px 16px; + } + } +} + +.refresh { + margin-right: 12px; +} + +nz-empty { + padding: 24px; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts new file mode 100644 index 00000000000..4b80254bedf --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { NgIf } from '@angular/common'; +import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core'; +import { forkJoin, of, Subject } from 'rxjs'; +import { catchError, distinctUntilChanged, switchMap, takeUntil } from 'rxjs/operators'; + +import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe'; +import { RescalesConfig, JobDetail } from '@flink-runtime-web/interfaces'; +import { JobService } from '@flink-runtime-web/services'; +import { NzButtonModule } from 'ng-zorro-antd/button'; +import { NzCollapseModule } from 'ng-zorro-antd/collapse'; +import { NzDividerModule } from 'ng-zorro-antd/divider'; +import { NzEmptyModule } from 'ng-zorro-antd/empty'; +import { NzIconModule } from 'ng-zorro-antd/icon'; +import { NzTableModule } from 'ng-zorro-antd/table'; +import { NzTabsModule } from 'ng-zorro-antd/tabs'; +import { NzTooltipModule } from 'ng-zorro-antd/tooltip'; + +import { JobLocalService } from '../job-local.service'; + +@Component({ + selector: 'flink-job-rescales', + templateUrl: './job-rescales.component.html', + styleUrls: ['./job-rescales.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush, + imports: [ + NgIf, + NzTabsModule, + NzDividerModule, + HumanizeDurationPipe, + NzTableModule, + NzIconModule, + NzButtonModule, + NzEmptyModule, + NzCollapseModule, + NzTooltipModule + ] +}) +export class JobRescalesComponent implements OnInit, OnDestroy { + public rescalesConfig?: RescalesConfig; + public jobDetail: JobDetail; + + private refresh$ = new Subject<void>(); + private destroy$ = new Subject<void>(); + + constructor( + private readonly jobService: JobService, + private readonly jobLocalService: JobLocalService, + private readonly cdr: ChangeDetectorRef + ) {} + + public ngOnInit(): void { + this.refresh$ + .pipe( + switchMap(() => + forkJoin([ + this.jobService.loadRescalesConfig(this.jobDetail.jid).pipe( + catchError(() => { + return of(undefined); + }) + ) + ]) + ), + takeUntil(this.destroy$) + ) + .subscribe(([config]) => { + this.rescalesConfig = config; + this.cdr.markForCheck(); + }); + + this.jobLocalService + .jobDetailChanges() + .pipe( + distinctUntilChanged((pre, next) => pre.jid === next.jid), + takeUntil(this.destroy$) + ) + .subscribe(data => { + this.jobDetail = data; + this.cdr.markForCheck(); + this.refresh$.next(); + }); + } + + public ngOnDestroy(): void { + this.destroy$.next(); + this.destroy$.complete(); + this.refresh$.complete(); + } + + public refresh(): void { + this.refresh$.next(); + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts index bd4a0d9bdbb..313a3e9308e 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts @@ -26,6 +26,7 @@ import { CheckpointConfig, CheckpointDetail, CheckpointSubTask, + RescalesConfig, JobAccumulators, JobBackpressure, JobConfig, @@ -178,6 +179,10 @@ export class JobService { ); } + public loadRescalesConfig(jobId: string): Observable<RescalesConfig> { + return this.httpClient.get<RescalesConfig>(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); + } + public loadJobResourceRequirements(jobId: string): Observable<JobResourceRequirements> { return this.httpClient.get<JobResourceRequirements>( `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`
