This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 40d2cfbc11f [FLINK-31471] Allow setting JobResourceRequirements through WEB UI. 40d2cfbc11f is described below commit 40d2cfbc11f4c3598dcba5cbc7367237a9ddbf2f Author: David Moravek <d...@apache.org> AuthorDate: Tue Apr 4 14:49:58 2023 +0200 [FLINK-31471] Allow setting JobResourceRequirements through WEB UI. --- .../shortcodes/generated/rest_v1_dispatcher.html | 3 + .../shortcodes/generated/web_configuration.html | 6 ++ docs/static/generated/rest_v1_dispatcher.yml | 2 + .../apache/flink/configuration/ClusterOptions.java | 2 +- .../org/apache/flink/configuration/WebOptions.java | 10 +- .../runtime/webmonitor/history/HistoryServer.java | 1 + .../src/test/resources/rest_api_v1.snapshot | 3 + .../src/app/interfaces/configuration.ts | 1 + ...nfiguration.ts => job-resource-requirements.ts} | 30 ++---- .../pages/job/overview/job-overview.component.html | 1 + .../pages/job/overview/job-overview.component.ts | 14 ++- .../overview/list/job-overview-list.component.html | 37 +++++++- .../overview/list/job-overview-list.component.ts | 61 +++++++++++- .../src/app/services/job.service.spec.ts | 102 +++++++++++++++++++++ .../web-dashboard/src/app/services/job.service.ts | 32 ++++++- .../rest/handler/RestHandlerConfiguration.java | 19 +++- .../handler/cluster/DashboardConfigHandler.java | 4 +- .../rest/messages/DashboardConfiguration.java | 16 +++- .../runtime/webmonitor/WebMonitorEndpoint.java | 3 +- .../rest/handler/RestHandlerConfigurationTest.java | 55 +++++++++-- .../rest/messages/DashboardConfigurationTest.java | 2 +- 21 files changed, 353 insertions(+), 51 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 4d446c68a06..ae11c7dc4f1 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -70,6 +70,9 @@ "web-history" : { "type" : "boolean" }, + "web-rescale" : { + "type" : "boolean" + }, "web-submit" : { "type" : "boolean" } diff --git a/docs/layouts/shortcodes/generated/web_configuration.html b/docs/layouts/shortcodes/generated/web_configuration.html index 693580ba83b..7ac55f5685c 100644 --- a/docs/layouts/shortcodes/generated/web_configuration.html +++ b/docs/layouts/shortcodes/generated/web_configuration.html @@ -50,6 +50,12 @@ <td>Long</td> <td>Refresh interval for the web-frontend in milliseconds.</td> </tr> + <tr> + <td><h5>web.rescale.enable</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Flag indicating whether jobs can be rescaled from the web-frontend.</td> + </tr> <tr> <td><h5>web.submit.enable</h5></td> <td style="word-wrap: break-word;">true</td> diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 2e8d49145a2..0135b51469a 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -2090,6 +2090,8 @@ components: type: boolean web-history: type: boolean + web-rescale: + type: boolean web-submit: type: boolean GarbageCollectorInfo: diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java index 2785f598dbb..92678f82465 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java @@ -213,7 +213,7 @@ public class ClusterOptions { } } - private static boolean isReactiveModeEnabled(Configuration configuration) { + public static boolean isReactiveModeEnabled(Configuration configuration) { return configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE; } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java index ebc9ac4e85b..e40adcbaaaa 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java @@ -133,7 +133,7 @@ public class WebOptions { .withDescription( "Flag indicating whether jobs can be uploaded and run from the web-frontend."); - /** Config parameter indicating whether jobs can be cancel from the web-frontend. */ + /** Config parameter indicating whether jobs can be canceled from the web-frontend. */ public static final ConfigOption<Boolean> CANCEL_ENABLE = key("web.cancel.enable") .booleanType() @@ -141,6 +141,14 @@ public class WebOptions { .withDescription( "Flag indicating whether jobs can be canceled from the web-frontend."); + /** Config parameter indicating whether jobs can be rescaled from the web-frontend. */ + public static final ConfigOption<Boolean> RESCALE_ENABLE = + key("web.rescale.enable") + .booleanType() + .defaultValue(true) + .withDescription( + "Flag indicating whether jobs can be rescaled from the web-frontend."); + /** Config parameter defining the number of checkpoints to remember for recent history. */ public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE = key("web.checkpoints.history") diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index fff6d42306f..0f170d9d79a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -379,6 +379,7 @@ public class HistoryServer { ZonedDateTime.now(), false, false, + false, true))); fw.flush(); } catch (IOException ioe) { diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 04f092a2f8b..20d01ddea8a 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -62,6 +62,9 @@ "web-cancel" : { "type" : "boolean" }, + "web-rescale" : { + "type" : "boolean" + }, "web-history" : { "type" : "boolean" } diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts index dac450e3c1c..a500a729a15 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts @@ -42,5 +42,6 @@ export interface Configuration { 'web-history': boolean; 'web-submit': boolean; 'web-cancel': boolean; + 'web-rescale': boolean; }; } diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts similarity index 61% copy from flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts copy to flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts index dac450e3c1c..104cf81e4e4 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts @@ -16,31 +16,13 @@ * limitations under the License. */ -export interface JvmInfo { - version: string; - arch: string; - options: string[]; +export interface JobResourceRequirements { + [key: string]: JobVertexResourceRequirements; } -export interface EnvironmentInfo { - jvm: JvmInfo; - classpath: string[]; -} - -export interface ClusterConfiguration { - key: string; - value: string; -} - -export interface Configuration { - 'refresh-interval': number; - 'timezone-name': string; - 'timezone-offset': number; - 'flink-version': string; - 'flink-revision': string; - features: { - 'web-history': boolean; - 'web-submit': boolean; - 'web-cancel': boolean; +export interface JobVertexResourceRequirements { + parallelism: { + lowerBound: number; + upperBound: number; }; } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html index 533540317d9..53e5e487a08 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html @@ -30,6 +30,7 @@ <ng-container *ngIf="nodes.length > 0"> <flink-job-overview-list (nodeClick)="onNodeClick($event)" + (rescale)="onRescale($event)" [nodes]="nodes" [selectedNode]="selectedNode" ></flink-job-overview-list> diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts index 28e5d15d879..10f7c034b40 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts @@ -34,8 +34,9 @@ import { DagreComponent } from '@flink-runtime-web/components/dagre/dagre.compon import { ResizeComponent } from '@flink-runtime-web/components/resize/resize.component'; import { NodesItemCorrect, NodesItemLink } from '@flink-runtime-web/interfaces'; import { JobOverviewListComponent } from '@flink-runtime-web/pages/job/overview/list/job-overview-list.component'; -import { MetricsService } from '@flink-runtime-web/services'; +import { JobService, MetricsService } from '@flink-runtime-web/services'; import { NzAlertModule } from 'ng-zorro-antd/alert'; +import { NzNotificationService } from 'ng-zorro-antd/notification'; import { JobLocalService } from '../job-local.service'; @@ -65,6 +66,8 @@ export class JobOverviewComponent implements OnInit, OnDestroy { public readonly elementRef: ElementRef, private readonly metricService: MetricsService, private readonly jobLocalService: JobLocalService, + private readonly jobService: JobService, + private readonly notificationService: NzNotificationService, private readonly cdr: ChangeDetectorRef ) {} @@ -115,6 +118,15 @@ export class JobOverviewComponent implements OnInit, OnDestroy { } } + public onRescale(desiredParallelism: Map<string, number>): void { + this.jobService.changeDesiredParallelism(this.jobId, desiredParallelism).subscribe(() => { + this.notificationService.success( + 'Rescaling operation.', + 'Job resources requirements have been updated. Job will now try to rescale.' + ); + }); + } + public onResizeEnd(): void { if (!this.selectedNode) { this.dagreComponent.moveToCenter(); diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html index 873c7c53231..c6a4f9ae92c 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html @@ -33,11 +33,12 @@ <th [nzSortFn]="sortReadRecordsFn" nzWidth="150px">Records Received</th> <th [nzSortFn]="sortWriteBytesFn" nzWidth="150px">Bytes Sent</th> <th [nzSortFn]="sortWriteRecordsFn" nzWidth="120px">Records Sent</th> - <th [nzSortFn]="sortParallelismFn" nzWidth="120px">Parallelism</th> + <th [nzSortFn]="sortParallelismFn" nzWidth="100px">Parallelism</th> <th [nzSortFn]="sortStartTimeFn" nzWidth="150px">Start Time</th> <th [nzSortFn]="sortDurationFn" nzWidth="150px">Duration</th> <th [nzSortFn]="sortEndTimeFn" nzWidth="150px">End Time</th> - <th nzWidth="100px" nzRight>Tasks</th> + <th nzWidth="60px" nzRight>Tasks</th> + <th *ngIf="webRescaleEnabled" nzWidth="80px" nzRight>Scale</th> </tr> </thead> <tbody> @@ -95,13 +96,43 @@ {{ node.detail.metrics['write-records'] | number: '1.0-0' }} </span> </td> - <td>{{ node.parallelism }}</td> + <td> + <span> + {{ node.parallelism }} + </span> + <span *ngIf="desiredParallelism.has(node.id)"> + <b> + <i nz-icon nzType="sync"></i> + {{ desiredParallelism.get(node.id) }} + </b> + </span> + </td> <td>{{ node.detail['start-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td> <td>{{ node.detail?.duration | humanizeDuration }}</td> <td>{{ node.detail['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' }}</td> <td nzRight [class.selected]="selectedNode?.id === node.id"> <flink-task-badge [tasks]="node.detail?.tasks"></flink-task-badge> </td> + <td *ngIf="webRescaleEnabled" nzRight> + <nz-button-group> + <button + nz-button + nzSize="small" + nzType="default" + (click)="clickScaleUp(node); $event.stopPropagation()" + > + <span nz-icon nzType="plus"></span> + </button> + <button + nz-button + nzSize="small" + nzType="default" + (click)="clickScaleDown(node); $event.stopPropagation()" + > + <span nz-icon nzType="minus"></span> + </button> + </nz-button-group> + </td> </tr> </tbody> </nz-table> diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts index 498f10a852d..ad632360118 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts @@ -26,6 +26,10 @@ import { JobBadgeComponent } from '@flink-runtime-web/components/job-badge/job-b import { ResizeComponent } from '@flink-runtime-web/components/resize/resize.component'; import { TaskBadgeComponent } from '@flink-runtime-web/components/task-badge/task-badge.component'; import { NodesItemCorrect } from '@flink-runtime-web/interfaces'; +import { StatusService } from '@flink-runtime-web/services'; +import { NzBadgeModule } from 'ng-zorro-antd/badge'; +import { NzButtonModule } from 'ng-zorro-antd/button'; +import { NzIconModule } from 'ng-zorro-antd/icon'; import { NzTableModule } from 'ng-zorro-antd/table'; import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types'; import { NzToolTipModule } from 'ng-zorro-antd/tooltip'; @@ -36,6 +40,8 @@ function createSortFn( return (pre, next) => (selector(pre)! > selector(next)! ? 1 : -1); } +const rescaleTimeout = 2500; + @Component({ selector: 'flink-job-overview-list', templateUrl: './job-overview-list.component.html', @@ -52,12 +58,16 @@ function createSortFn( HumanizeDatePipe, HumanizeDurationPipe, TaskBadgeComponent, - ResizeComponent + ResizeComponent, + NzButtonModule, + NzIconModule, + NzBadgeModule ], standalone: true }) export class JobOverviewListComponent { public readonly trackById = (_: number, node: NodesItemCorrect): string => node.id; + public readonly webRescaleEnabled = this.statusService.configuration.features['web-rescale']; public readonly sortStatusFn = createSortFn(item => item.detail?.status); public readonly sortReadBytesFn = createSortFn(item => item.detail?.metrics?.['read-bytes']); @@ -70,26 +80,69 @@ export class JobOverviewListComponent { public readonly sortEndTimeFn = createSortFn(item => item.detail?.['end-time']); public innerNodes: NodesItemCorrect[] = []; - public sortName: string; - public sortValue: string; public left = 390; + public desiredParallelism = new Map<string, number>(); + + public rescaleTimeoutId: number | undefined; + @Output() public readonly nodeClick = new EventEmitter<NodesItemCorrect>(); + @Output() public readonly rescale = new EventEmitter<Map<string, number>>(); + @Input() public selectedNode: NodesItemCorrect; @Input() public set nodes(value: NodesItemCorrect[]) { this.innerNodes = value; + for (const node of value) { + if (node.parallelism == this.desiredParallelism.get(node.id)) { + this.desiredParallelism.delete(node.id); + } + } } public get nodes(): NodesItemCorrect[] { return this.innerNodes; } - constructor(public readonly elementRef: ElementRef) {} + constructor(public readonly elementRef: ElementRef, private readonly statusService: StatusService) {} public clickNode(node: NodesItemCorrect): void { this.nodeClick.emit(node); } + + public clickScaleUp(node: NodesItemCorrect): void { + let currentDesiredParallelism = this.desiredParallelism.get(node.id); + if (currentDesiredParallelism == undefined) { + currentDesiredParallelism = node.parallelism; + } + const newDesiredParallelism = currentDesiredParallelism + 1; + this.changeDesiredParallelism(node, newDesiredParallelism); + } + + public clickScaleDown(node: NodesItemCorrect): void { + let currentDesiredParallelism = this.desiredParallelism.get(node.id); + if (currentDesiredParallelism == undefined) { + currentDesiredParallelism = node.parallelism; + } + const newDesiredParallelism = Math.max(1, currentDesiredParallelism - 1); + this.changeDesiredParallelism(node, newDesiredParallelism); + } + + private changeDesiredParallelism(node: NodesItemCorrect, newDesiredParallelism: number): void { + if (newDesiredParallelism == node.parallelism) { + this.desiredParallelism.delete(node.id); + } else { + this.desiredParallelism.set(node.id, newDesiredParallelism); + } + if (this.rescaleTimeoutId != undefined) { + window.clearTimeout(this.rescaleTimeoutId); + } + this.rescaleTimeoutId = window.setTimeout(() => { + if (this.desiredParallelism.size > 0) { + this.rescale.emit(this.desiredParallelism); + } + }, rescaleTimeout); + } } diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts new file mode 100644 index 00000000000..db7613b4f5c --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing'; +import { TestBed } from '@angular/core/testing'; + +import { JobResourceRequirements } from '@flink-runtime-web/interfaces/job-resource-requirements'; +import { ConfigService } from '@flink-runtime-web/services/config.service'; +import { JobService } from '@flink-runtime-web/services/job.service'; + +const clone = function (jobResourceRequirements: JobResourceRequirements): JobResourceRequirements { + return JSON.parse(JSON.stringify(jobResourceRequirements)); +}; + +describe('Job Service', () => { + let configService: ConfigService; + let jobService: JobService; + let httpTestingController: HttpTestingController; + + beforeEach(() => { + TestBed.configureTestingModule({ + imports: [HttpClientTestingModule], + providers: [{ provide: ConfigService, useValue: new ConfigService() }] + }); + + configService = TestBed.inject(ConfigService); + jobService = TestBed.inject(JobService); + httpTestingController = TestBed.inject(HttpTestingController); + }); + + it('#changeDesiredParallelism', done => { + const jobId = 'apache-flink'; + const jobResourceRequirements: JobResourceRequirements = { + firstVertex: { + parallelism: { + lowerBound: 1, + upperBound: 1 + } + }, + secondVertex: { + parallelism: { + lowerBound: 1, + upperBound: 2 + } + }, + thirdVertex: { + parallelism: { + lowerBound: 1, + upperBound: 3 + } + } + }; + const desiredParallelism = new Map<string, number>(); + desiredParallelism.set('firstVertex', 3); + desiredParallelism.set('secondVertex', 2); + desiredParallelism.set('thirdVertex', 1); + jobService.changeDesiredParallelism(jobId, desiredParallelism).subscribe(() => { + expect(true).toBe(true); + done(); + }); + + const firstRequest = httpTestingController.expectOne({ + method: 'GET', + url: `${configService.BASE_URL}/jobs/${jobId}/resource-requirements` + }); + firstRequest.flush(clone(jobResourceRequirements)); + + const expected = clone(jobResourceRequirements); + for (const k of desiredParallelism.keys()) { + const newUpperBound = desiredParallelism.get(k); + if (newUpperBound != undefined) { + expected[k].parallelism.upperBound = newUpperBound; + } + } + + const secondRequest = httpTestingController.expectOne(request => { + let matches = true; + matches = matches && request.method == 'PUT'; + matches = matches && request.url == `${configService.BASE_URL}/jobs/${jobId}/resource-requirements`; + if (matches) { + expect(request.body).toEqual(expected); + } + return true; + }); + secondRequest.flush(expected); + }); +}); diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts index 21eb73bc3f4..ef8c221b323 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts @@ -18,7 +18,7 @@ import { HttpClient } from '@angular/common/http'; import { Injectable } from '@angular/core'; -import { EMPTY, forkJoin, Observable } from 'rxjs'; +import { EMPTY, forkJoin, mergeMap, Observable } from 'rxjs'; import { catchError, map } from 'rxjs/operators'; import { @@ -44,6 +44,7 @@ import { VerticesLink, JobVertexSubTaskDetail } from '@flink-runtime-web/interfaces'; +import { JobResourceRequirements } from '@flink-runtime-web/interfaces/job-resource-requirements'; import { ConfigService } from './config.service'; @@ -175,6 +176,35 @@ export class JobService { ); } + public loadJobResourceRequirements(jobId: string): Observable<JobResourceRequirements> { + return this.httpClient.get<JobResourceRequirements>( + `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements` + ); + } + + public changeDesiredParallelism(jobId: string, desiredParallelism: Map<string, number>): Observable<void> { + return this.loadJobResourceRequirements(jobId) + .pipe( + map(jobResourceRequirements => { + for (const vertexId in jobResourceRequirements) { + const newUpperBound = desiredParallelism.get(vertexId); + if (newUpperBound != undefined) { + jobResourceRequirements[vertexId].parallelism.upperBound = newUpperBound; + } + } + return jobResourceRequirements; + }) + ) + .pipe( + mergeMap(jobResourceRequirements => { + return this.httpClient.put<void>( + `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`, + jobResourceRequirements + ); + }) + ); + } + /** nodes to nodes links in order to generate graph */ private convertJob(job: JobDetail): JobDetailCorrect { const links: VerticesLink[] = []; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index c163d802008..9c8de50b21c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.rest.handler; import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.util.Preconditions; @@ -40,13 +41,16 @@ public class RestHandlerConfiguration { private final boolean webCancelEnabled; + private final boolean webRescaleEnabled; + public RestHandlerConfiguration( long refreshInterval, int maxCheckpointStatisticCacheEntries, Time timeout, File webUiDir, boolean webSubmitEnabled, - boolean webCancelEnabled) { + boolean webCancelEnabled, + boolean webRescaleEnabled) { Preconditions.checkArgument( refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; @@ -57,6 +61,7 @@ public class RestHandlerConfiguration { this.webUiDir = Preconditions.checkNotNull(webUiDir); this.webSubmitEnabled = webSubmitEnabled; this.webCancelEnabled = webCancelEnabled; + this.webRescaleEnabled = webRescaleEnabled; } public long getRefreshInterval() { @@ -83,6 +88,10 @@ public class RestHandlerConfiguration { return webCancelEnabled; } + public boolean isWebRescaleEnabled() { + return webRescaleEnabled; + } + public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); @@ -96,6 +105,11 @@ public class RestHandlerConfiguration { final boolean webSubmitEnabled = configuration.getBoolean(WebOptions.SUBMIT_ENABLE); final boolean webCancelEnabled = configuration.getBoolean(WebOptions.CANCEL_ENABLE); + final boolean webRescaleSupported = + ClusterOptions.isAdaptiveSchedulerEnabled(configuration) + && !ClusterOptions.isReactiveModeEnabled(configuration); + final boolean webRescaleEnabled = + webRescaleSupported && configuration.getBoolean(WebOptions.RESCALE_ENABLE); return new RestHandlerConfiguration( refreshInterval, @@ -103,6 +117,7 @@ public class RestHandlerConfiguration { timeout, webUiDir, webSubmitEnabled, - webCancelEnabled); + webCancelEnabled, + webRescaleEnabled); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java index caae3c14f92..d456ea07811 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java @@ -49,7 +49,8 @@ public class DashboardConfigHandler messageHeaders, long refreshInterval, boolean webSubmitEnabled, - boolean webCancelEnabled) { + boolean webCancelEnabled, + boolean webRescaleEnabled) { super(leaderRetriever, timeout, responseHeaders, messageHeaders); dashboardConfiguration = @@ -58,6 +59,7 @@ public class DashboardConfigHandler ZonedDateTime.now(), webSubmitEnabled, webCancelEnabled, + webRescaleEnabled, false); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java index b3cfa4fb47d..6fa883d66a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java @@ -48,6 +48,8 @@ public class DashboardConfiguration implements ResponseBody { public static final String FIELD_NAME_FEATURE_WEB_CANCEL = "web-cancel"; + public static final String FIELD_NAME_FEATURE_WEB_RESCALE = "web-rescale"; + public static final String FIELD_NAME_FEATURE_WEB_HISTORY = "web-history"; @JsonProperty(FIELD_NAME_REFRESH_INTERVAL) @@ -123,6 +125,9 @@ public class DashboardConfiguration implements ResponseBody { @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) private final boolean webCancelEnabled; + @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE) + private final boolean webRescaleEnabled; + @JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY) private final boolean isHistoryServer; @@ -130,9 +135,11 @@ public class DashboardConfiguration implements ResponseBody { public Features( @JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean webSubmitEnabled, @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) boolean webCancelEnabled, + @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE) boolean webRescaleEnabled, @JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY) boolean isHistoryServer) { this.webSubmitEnabled = webSubmitEnabled; this.webCancelEnabled = webCancelEnabled; + this.webRescaleEnabled = webRescaleEnabled; this.isHistoryServer = isHistoryServer; } @@ -146,6 +153,11 @@ public class DashboardConfiguration implements ResponseBody { return webCancelEnabled; } + @JsonIgnore + public boolean isWebRescaleEnabled() { + return webRescaleEnabled; + } + @JsonIgnore public boolean isHistoryServer() { return isHistoryServer; @@ -204,6 +216,7 @@ public class DashboardConfiguration implements ResponseBody { ZonedDateTime zonedDateTime, boolean webSubmitEnabled, boolean webCancelEnabled, + boolean webRescaleEnabled, boolean isHistoryServer) { final String flinkVersion = EnvironmentInformation.getVersion(); @@ -226,6 +239,7 @@ public class DashboardConfiguration implements ResponseBody { zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000, flinkVersion, flinkRevision, - new Features(webSubmitEnabled, webCancelEnabled, isHistoryServer)); + new Features( + webSubmitEnabled, webCancelEnabled, webRescaleEnabled, isHistoryServer)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 7836d720237..5193e08d85e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -298,7 +298,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp DashboardConfigurationHeaders.getInstance(), restConfiguration.getRefreshInterval(), hasWebSubmissionHandlers, - restConfiguration.isWebCancelEnabled()); + restConfiguration.isWebCancelEnabled(), + restConfiguration.isWebRescaleEnabled()); JobIdsHandler jobIdsHandler = new JobIdsHandler( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java index 43126e5a0d2..67454e9b1ae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java @@ -19,43 +19,78 @@ package org.apache.flink.runtime.rest.handler; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.configuration.WebOptions; -import org.apache.flink.util.TestLogger; +import org.apache.flink.util.TestLoggerExtension; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link RestHandlerConfiguration}. */ -public class RestHandlerConfigurationTest extends TestLogger { +@ExtendWith(TestLoggerExtension.class) +class RestHandlerConfigurationTest { @Test - public void testWebSubmitFeatureFlagEnabled() { + void testWebSubmitFeatureFlagEnabled() { testWebSubmitFeatureFlag(true); } @Test - public void testWebSubmitFeatureFlagDisabled() { + void testWebSubmitFeatureFlagDisabled() { testWebSubmitFeatureFlag(false); } @Test - public void testWebCancelFeatureFlagEnabled() { + void testWebCancelFeatureFlagEnabled() { testWebCancelFeatureFlag(true); } @Test - public void testWebCancelFeatureFlagDisabled() { + void testWebCancelFeatureFlagDisabled() { testWebCancelFeatureFlag(false); } + @ParameterizedTest + @CsvSource({ + "true,true,true,false", + "true,true,false,true", + "true,false,true,false", + "true,false,false,false", + "false,true,true,false", + "false,true,false,false", + "false,false,true,false", + "false,false,false,false", + }) + void testWebRescaleFeatureFlagWithReactiveMode( + boolean webRescaleEnabled, + boolean adaptiveScheduler, + boolean reactiveMode, + boolean expectedResult) { + final Configuration config = new Configuration(); + config.setBoolean(WebOptions.RESCALE_ENABLE, webRescaleEnabled); + if (adaptiveScheduler) { + config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + } + if (reactiveMode) { + config.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE); + } + RestHandlerConfiguration restHandlerConfiguration = + RestHandlerConfiguration.fromConfiguration(config); + assertThat(restHandlerConfiguration.isWebRescaleEnabled()).isEqualTo(expectedResult); + } + private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) { final Configuration config = new Configuration(); config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled); RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(config); - assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled()); + assertThat(restHandlerConfiguration.isWebSubmitEnabled()).isEqualTo(webSubmitEnabled); } private static void testWebCancelFeatureFlag(boolean webCancelEnabled) { @@ -64,6 +99,6 @@ public class RestHandlerConfigurationTest extends TestLogger { RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(config); - assertEquals(webCancelEnabled, restHandlerConfiguration.isWebCancelEnabled()); + assertThat(restHandlerConfiguration.isWebCancelEnabled()).isEqualTo(webCancelEnabled); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java index aec97d61c72..eeedeb5b749 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java @@ -35,6 +35,6 @@ public class DashboardConfigurationTest 42, "version", "revision", - new DashboardConfiguration.Features(true, true, false)); + new DashboardConfiguration.Features(true, true, true, false)); } }