This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d2cfbc11f [FLINK-31471] Allow setting JobResourceRequirements through WEB UI.
40d2cfbc11f is described below

commit 40d2cfbc11f4c3598dcba5cbc7367237a9ddbf2f
Author: David Moravek <d...@apache.org>
AuthorDate: Tue Apr 4 14:49:58 2023 +0200

    [FLINK-31471] Allow setting JobResourceRequirements through WEB UI.
---
 .../shortcodes/generated/rest_v1_dispatcher.html   |   3 +
 .../shortcodes/generated/web_configuration.html    |   6 ++
 docs/static/generated/rest_v1_dispatcher.yml       |   2 +
 .../apache/flink/configuration/ClusterOptions.java |   2 +-
 .../org/apache/flink/configuration/WebOptions.java |  10 +-
 .../runtime/webmonitor/history/HistoryServer.java  |   1 +
 .../src/test/resources/rest_api_v1.snapshot        |   3 +
 .../src/app/interfaces/configuration.ts            |   1 +
 ...nfiguration.ts => job-resource-requirements.ts} |  30 ++----
 .../pages/job/overview/job-overview.component.html |   1 +
 .../pages/job/overview/job-overview.component.ts   |  14 ++-
 .../overview/list/job-overview-list.component.html |  37 +++++++-
 .../overview/list/job-overview-list.component.ts   |  61 +++++++++++-
 .../src/app/services/job.service.spec.ts           | 102 +++++++++++++++++++++
 .../web-dashboard/src/app/services/job.service.ts  |  32 ++++++-
 .../rest/handler/RestHandlerConfiguration.java     |  19 +++-
 .../handler/cluster/DashboardConfigHandler.java    |   4 +-
 .../rest/messages/DashboardConfiguration.java      |  16 +++-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   3 +-
 .../rest/handler/RestHandlerConfigurationTest.java |  55 +++++++++--
 .../rest/messages/DashboardConfigurationTest.java  |   2 +-
 21 files changed, 353 insertions(+), 51 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 4d446c68a06..ae11c7dc4f1 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -70,6 +70,9 @@
         "web-history" : {
           "type" : "boolean"
         },
+        "web-rescale" : {
+          "type" : "boolean"
+        },
         "web-submit" : {
           "type" : "boolean"
         }
diff --git a/docs/layouts/shortcodes/generated/web_configuration.html b/docs/layouts/shortcodes/generated/web_configuration.html
index 693580ba83b..7ac55f5685c 100644
--- a/docs/layouts/shortcodes/generated/web_configuration.html
+++ b/docs/layouts/shortcodes/generated/web_configuration.html
@@ -50,6 +50,12 @@
             <td>Long</td>
             <td>Refresh interval for the web-frontend in milliseconds.</td>
         </tr>
+        <tr>
+            <td><h5>web.rescale.enable</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Flag indicating whether jobs can be rescaled from the 
web-frontend.</td>
+        </tr>
         <tr>
             <td><h5>web.submit.enable</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 2e8d49145a2..0135b51469a 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -2090,6 +2090,8 @@ components:
           type: boolean
         web-history:
           type: boolean
+        web-rescale:
+          type: boolean
         web-submit:
           type: boolean
     GarbageCollectorInfo:
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
index 2785f598dbb..92678f82465 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
@@ -213,7 +213,7 @@ public class ClusterOptions {
         }
     }
 
-    private static boolean isReactiveModeEnabled(Configuration configuration) {
+    public static boolean isReactiveModeEnabled(Configuration configuration) {
         return configuration.get(JobManagerOptions.SCHEDULER_MODE)
                 == SchedulerExecutionMode.REACTIVE;
     }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
index ebc9ac4e85b..e40adcbaaaa 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java
@@ -133,7 +133,7 @@ public class WebOptions {
                     .withDescription(
                             "Flag indicating whether jobs can be uploaded and 
run from the web-frontend.");
 
-    /** Config parameter indicating whether jobs can be cancel from the 
web-frontend. */
+    /** Config parameter indicating whether jobs can be canceled from the 
web-frontend. */
     public static final ConfigOption<Boolean> CANCEL_ENABLE =
             key("web.cancel.enable")
                     .booleanType()
@@ -141,6 +141,14 @@ public class WebOptions {
                     .withDescription(
                             "Flag indicating whether jobs can be canceled from 
the web-frontend.");
 
+    /** Config parameter indicating whether jobs can be rescaled from the 
web-frontend. */
+    public static final ConfigOption<Boolean> RESCALE_ENABLE =
+            key("web.rescale.enable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Flag indicating whether jobs can be rescaled from 
the web-frontend.");
+
     /** Config parameter defining the number of checkpoints to remember for 
recent history. */
     public static final ConfigOption<Integer> CHECKPOINTS_HISTORY_SIZE =
             key("web.checkpoints.history")
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index fff6d42306f..0f170d9d79a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -379,6 +379,7 @@ public class HistoryServer {
                                     ZonedDateTime.now(),
                                     false,
                                     false,
+                                    false,
                                     true)));
             fw.flush();
         } catch (IOException ioe) {
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 04f092a2f8b..20d01ddea8a 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -62,6 +62,9 @@
             "web-cancel" : {
               "type" : "boolean"
             },
+            "web-rescale" : {
+              "type" : "boolean"
+            },
             "web-history" : {
               "type" : "boolean"
             }
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
index dac450e3c1c..a500a729a15 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
@@ -42,5 +42,6 @@ export interface Configuration {
     'web-history': boolean;
     'web-submit': boolean;
     'web-cancel': boolean;
+    'web-rescale': boolean;
   };
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
similarity index 61%
copy from flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
copy to flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
index dac450e3c1c..104cf81e4e4 100644
--- a/flink-runtime-web/web-dashboard/src/app/interfaces/configuration.ts
+++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-resource-requirements.ts
@@ -16,31 +16,13 @@
  * limitations under the License.
  */
 
-export interface JvmInfo {
-  version: string;
-  arch: string;
-  options: string[];
+export interface JobResourceRequirements {
+  [key: string]: JobVertexResourceRequirements;
 }
 
-export interface EnvironmentInfo {
-  jvm: JvmInfo;
-  classpath: string[];
-}
-
-export interface ClusterConfiguration {
-  key: string;
-  value: string;
-}
-
-export interface Configuration {
-  'refresh-interval': number;
-  'timezone-name': string;
-  'timezone-offset': number;
-  'flink-version': string;
-  'flink-revision': string;
-  features: {
-    'web-history': boolean;
-    'web-submit': boolean;
-    'web-cancel': boolean;
+export interface JobVertexResourceRequirements {
+  parallelism: {
+    lowerBound: number;
+    upperBound: number;
   };
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
index 533540317d9..53e5e487a08 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.html
@@ -30,6 +30,7 @@
 <ng-container *ngIf="nodes.length > 0">
   <flink-job-overview-list
     (nodeClick)="onNodeClick($event)"
+    (rescale)="onRescale($event)"
     [nodes]="nodes"
     [selectedNode]="selectedNode"
   ></flink-job-overview-list>
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
index 28e5d15d879..10f7c034b40 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/job-overview.component.ts
@@ -34,8 +34,9 @@ import { DagreComponent } from 
'@flink-runtime-web/components/dagre/dagre.compon
 import { ResizeComponent } from 
'@flink-runtime-web/components/resize/resize.component';
 import { NodesItemCorrect, NodesItemLink } from 
'@flink-runtime-web/interfaces';
 import { JobOverviewListComponent } from 
'@flink-runtime-web/pages/job/overview/list/job-overview-list.component';
-import { MetricsService } from '@flink-runtime-web/services';
+import { JobService, MetricsService } from '@flink-runtime-web/services';
 import { NzAlertModule } from 'ng-zorro-antd/alert';
+import { NzNotificationService } from 'ng-zorro-antd/notification';
 
 import { JobLocalService } from '../job-local.service';
 
@@ -65,6 +66,8 @@ export class JobOverviewComponent implements OnInit, 
OnDestroy {
     public readonly elementRef: ElementRef,
     private readonly metricService: MetricsService,
     private readonly jobLocalService: JobLocalService,
+    private readonly jobService: JobService,
+    private readonly notificationService: NzNotificationService,
     private readonly cdr: ChangeDetectorRef
   ) {}
 
@@ -115,6 +118,15 @@ export class JobOverviewComponent implements OnInit, 
OnDestroy {
     }
   }
 
+  public onRescale(desiredParallelism: Map<string, number>): void {
+    this.jobService.changeDesiredParallelism(this.jobId, 
desiredParallelism).subscribe(() => {
+      this.notificationService.success(
+        'Rescaling operation.',
+        'Job resources requirements have been updated. Job will now try to 
rescale.'
+      );
+    });
+  }
+
   public onResizeEnd(): void {
     if (!this.selectedNode) {
       this.dagreComponent.moveToCenter();
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
index 873c7c53231..c6a4f9ae92c 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.html
@@ -33,11 +33,12 @@
       <th [nzSortFn]="sortReadRecordsFn" nzWidth="150px">Records Received</th>
       <th [nzSortFn]="sortWriteBytesFn" nzWidth="150px">Bytes Sent</th>
       <th [nzSortFn]="sortWriteRecordsFn" nzWidth="120px">Records Sent</th>
-      <th [nzSortFn]="sortParallelismFn" nzWidth="120px">Parallelism</th>
+      <th [nzSortFn]="sortParallelismFn" nzWidth="100px">Parallelism</th>
       <th [nzSortFn]="sortStartTimeFn" nzWidth="150px">Start Time</th>
       <th [nzSortFn]="sortDurationFn" nzWidth="150px">Duration</th>
       <th [nzSortFn]="sortEndTimeFn" nzWidth="150px">End Time</th>
-      <th nzWidth="100px" nzRight>Tasks</th>
+      <th nzWidth="60px" nzRight>Tasks</th>
+      <th *ngIf="webRescaleEnabled" nzWidth="80px" nzRight>Scale</th>
     </tr>
   </thead>
   <tbody>
@@ -95,13 +96,43 @@
           {{ node.detail.metrics['write-records'] | number: '1.0-0' }}
         </span>
       </td>
-      <td>{{ node.parallelism }}</td>
+      <td>
+        <span>
+          {{ node.parallelism }}
+        </span>
+        <span *ngIf="desiredParallelism.has(node.id)">
+          <b>
+            <i nz-icon nzType="sync"></i>
+            {{ desiredParallelism.get(node.id) }}
+          </b>
+        </span>
+      </td>
       <td>{{ node.detail['start-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' 
}}</td>
       <td>{{ node.detail?.duration | humanizeDuration }}</td>
       <td>{{ node.detail['end-time'] | humanizeDate: 'yyyy-MM-dd HH:mm:ss' 
}}</td>
       <td nzRight [class.selected]="selectedNode?.id === node.id">
         <flink-task-badge [tasks]="node.detail?.tasks"></flink-task-badge>
       </td>
+      <td *ngIf="webRescaleEnabled" nzRight>
+        <nz-button-group>
+          <button
+            nz-button
+            nzSize="small"
+            nzType="default"
+            (click)="clickScaleUp(node); $event.stopPropagation()"
+          >
+            <span nz-icon nzType="plus"></span>
+          </button>
+          <button
+            nz-button
+            nzSize="small"
+            nzType="default"
+            (click)="clickScaleDown(node); $event.stopPropagation()"
+          >
+            <span nz-icon nzType="minus"></span>
+          </button>
+        </nz-button-group>
+      </td>
     </tr>
   </tbody>
 </nz-table>
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
index 498f10a852d..ad632360118 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/job/overview/list/job-overview-list.component.ts
@@ -26,6 +26,10 @@ import { JobBadgeComponent } from 
'@flink-runtime-web/components/job-badge/job-b
 import { ResizeComponent } from 
'@flink-runtime-web/components/resize/resize.component';
 import { TaskBadgeComponent } from 
'@flink-runtime-web/components/task-badge/task-badge.component';
 import { NodesItemCorrect } from '@flink-runtime-web/interfaces';
+import { StatusService } from '@flink-runtime-web/services';
+import { NzBadgeModule } from 'ng-zorro-antd/badge';
+import { NzButtonModule } from 'ng-zorro-antd/button';
+import { NzIconModule } from 'ng-zorro-antd/icon';
 import { NzTableModule } from 'ng-zorro-antd/table';
 import { NzTableSortFn } from 'ng-zorro-antd/table/src/table.types';
 import { NzToolTipModule } from 'ng-zorro-antd/tooltip';
@@ -36,6 +40,8 @@ function createSortFn(
   return (pre, next) => (selector(pre)! > selector(next)! ? 1 : -1);
 }
 
+const rescaleTimeout = 2500;
+
 @Component({
   selector: 'flink-job-overview-list',
   templateUrl: './job-overview-list.component.html',
@@ -52,12 +58,16 @@ function createSortFn(
     HumanizeDatePipe,
     HumanizeDurationPipe,
     TaskBadgeComponent,
-    ResizeComponent
+    ResizeComponent,
+    NzButtonModule,
+    NzIconModule,
+    NzBadgeModule
   ],
   standalone: true
 })
 export class JobOverviewListComponent {
   public readonly trackById = (_: number, node: NodesItemCorrect): string => 
node.id;
+  public readonly webRescaleEnabled = 
this.statusService.configuration.features['web-rescale'];
 
   public readonly sortStatusFn = createSortFn(item => item.detail?.status);
   public readonly sortReadBytesFn = createSortFn(item => 
item.detail?.metrics?.['read-bytes']);
@@ -70,26 +80,69 @@ export class JobOverviewListComponent {
   public readonly sortEndTimeFn = createSortFn(item => 
item.detail?.['end-time']);
 
   public innerNodes: NodesItemCorrect[] = [];
-  public sortName: string;
-  public sortValue: string;
   public left = 390;
 
+  public desiredParallelism = new Map<string, number>();
+
+  public rescaleTimeoutId: number | undefined;
+
   @Output() public readonly nodeClick = new EventEmitter<NodesItemCorrect>();
 
+  @Output() public readonly rescale = new EventEmitter<Map<string, number>>();
+
   @Input() public selectedNode: NodesItemCorrect;
 
   @Input()
   public set nodes(value: NodesItemCorrect[]) {
     this.innerNodes = value;
+    for (const node of value) {
+      if (node.parallelism == this.desiredParallelism.get(node.id)) {
+        this.desiredParallelism.delete(node.id);
+      }
+    }
   }
 
   public get nodes(): NodesItemCorrect[] {
     return this.innerNodes;
   }
 
-  constructor(public readonly elementRef: ElementRef) {}
+  constructor(public readonly elementRef: ElementRef, private readonly 
statusService: StatusService) {}
 
   public clickNode(node: NodesItemCorrect): void {
     this.nodeClick.emit(node);
   }
+
+  public clickScaleUp(node: NodesItemCorrect): void {
+    let currentDesiredParallelism = this.desiredParallelism.get(node.id);
+    if (currentDesiredParallelism == undefined) {
+      currentDesiredParallelism = node.parallelism;
+    }
+    const newDesiredParallelism = currentDesiredParallelism + 1;
+    this.changeDesiredParallelism(node, newDesiredParallelism);
+  }
+
+  public clickScaleDown(node: NodesItemCorrect): void {
+    let currentDesiredParallelism = this.desiredParallelism.get(node.id);
+    if (currentDesiredParallelism == undefined) {
+      currentDesiredParallelism = node.parallelism;
+    }
+    const newDesiredParallelism = Math.max(1, currentDesiredParallelism - 1);
+    this.changeDesiredParallelism(node, newDesiredParallelism);
+  }
+
+  private changeDesiredParallelism(node: NodesItemCorrect, 
newDesiredParallelism: number): void {
+    if (newDesiredParallelism == node.parallelism) {
+      this.desiredParallelism.delete(node.id);
+    } else {
+      this.desiredParallelism.set(node.id, newDesiredParallelism);
+    }
+    if (this.rescaleTimeoutId != undefined) {
+      window.clearTimeout(this.rescaleTimeoutId);
+    }
+    this.rescaleTimeoutId = window.setTimeout(() => {
+      if (this.desiredParallelism.size > 0) {
+        this.rescale.emit(this.desiredParallelism);
+      }
+    }, rescaleTimeout);
+  }
 }
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts
new file mode 100644
index 00000000000..db7613b4f5c
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.spec.ts
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { HttpClientTestingModule, HttpTestingController } from 
'@angular/common/http/testing';
+import { TestBed } from '@angular/core/testing';
+
+import { JobResourceRequirements } from 
'@flink-runtime-web/interfaces/job-resource-requirements';
+import { ConfigService } from '@flink-runtime-web/services/config.service';
+import { JobService } from '@flink-runtime-web/services/job.service';
+
+const clone = function (jobResourceRequirements: JobResourceRequirements): 
JobResourceRequirements {
+  return JSON.parse(JSON.stringify(jobResourceRequirements));
+};
+
+describe('Job Service', () => {
+  let configService: ConfigService;
+  let jobService: JobService;
+  let httpTestingController: HttpTestingController;
+
+  beforeEach(() => {
+    TestBed.configureTestingModule({
+      imports: [HttpClientTestingModule],
+      providers: [{ provide: ConfigService, useValue: new ConfigService() }]
+    });
+
+    configService = TestBed.inject(ConfigService);
+    jobService = TestBed.inject(JobService);
+    httpTestingController = TestBed.inject(HttpTestingController);
+  });
+
+  it('#changeDesiredParallelism', done => {
+    const jobId = 'apache-flink';
+    const jobResourceRequirements: JobResourceRequirements = {
+      firstVertex: {
+        parallelism: {
+          lowerBound: 1,
+          upperBound: 1
+        }
+      },
+      secondVertex: {
+        parallelism: {
+          lowerBound: 1,
+          upperBound: 2
+        }
+      },
+      thirdVertex: {
+        parallelism: {
+          lowerBound: 1,
+          upperBound: 3
+        }
+      }
+    };
+    const desiredParallelism = new Map<string, number>();
+    desiredParallelism.set('firstVertex', 3);
+    desiredParallelism.set('secondVertex', 2);
+    desiredParallelism.set('thirdVertex', 1);
+    jobService.changeDesiredParallelism(jobId, 
desiredParallelism).subscribe(() => {
+      expect(true).toBe(true);
+      done();
+    });
+
+    const firstRequest = httpTestingController.expectOne({
+      method: 'GET',
+      url: `${configService.BASE_URL}/jobs/${jobId}/resource-requirements`
+    });
+    firstRequest.flush(clone(jobResourceRequirements));
+
+    const expected = clone(jobResourceRequirements);
+    for (const k of desiredParallelism.keys()) {
+      const newUpperBound = desiredParallelism.get(k);
+      if (newUpperBound != undefined) {
+        expected[k].parallelism.upperBound = newUpperBound;
+      }
+    }
+
+    const secondRequest = httpTestingController.expectOne(request => {
+      let matches = true;
+      matches = matches && request.method == 'PUT';
+      matches = matches && request.url == 
`${configService.BASE_URL}/jobs/${jobId}/resource-requirements`;
+      if (matches) {
+        expect(request.body).toEqual(expected);
+      }
+      return true;
+    });
+    secondRequest.flush(expected);
+  });
+});
diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
index 21eb73bc3f4..ef8c221b323 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts
@@ -18,7 +18,7 @@
 
 import { HttpClient } from '@angular/common/http';
 import { Injectable } from '@angular/core';
-import { EMPTY, forkJoin, Observable } from 'rxjs';
+import { EMPTY, forkJoin, mergeMap, Observable } from 'rxjs';
 import { catchError, map } from 'rxjs/operators';
 
 import {
@@ -44,6 +44,7 @@ import {
   VerticesLink,
   JobVertexSubTaskDetail
 } from '@flink-runtime-web/interfaces';
+import { JobResourceRequirements } from 
'@flink-runtime-web/interfaces/job-resource-requirements';
 
 import { ConfigService } from './config.service';
 
@@ -175,6 +176,35 @@ export class JobService {
     );
   }
 
+  public loadJobResourceRequirements(jobId: string): 
Observable<JobResourceRequirements> {
+    return this.httpClient.get<JobResourceRequirements>(
+      `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`
+    );
+  }
+
+  public changeDesiredParallelism(jobId: string, desiredParallelism: 
Map<string, number>): Observable<void> {
+    return this.loadJobResourceRequirements(jobId)
+      .pipe(
+        map(jobResourceRequirements => {
+          for (const vertexId in jobResourceRequirements) {
+            const newUpperBound = desiredParallelism.get(vertexId);
+            if (newUpperBound != undefined) {
+              jobResourceRequirements[vertexId].parallelism.upperBound = 
newUpperBound;
+            }
+          }
+          return jobResourceRequirements;
+        })
+      )
+      .pipe(
+        mergeMap(jobResourceRequirements => {
+          return this.httpClient.put<void>(
+            
`${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`,
+            jobResourceRequirements
+          );
+        })
+      );
+  }
+
   /** nodes to nodes links in order to generate graph */
   private convertJob(job: JobDetail): JobDetailCorrect {
     const links: VerticesLink[] = [];
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index c163d802008..9c8de50b21c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
@@ -40,13 +41,16 @@ public class RestHandlerConfiguration {
 
     private final boolean webCancelEnabled;
 
+    private final boolean webRescaleEnabled;
+
     public RestHandlerConfiguration(
             long refreshInterval,
             int maxCheckpointStatisticCacheEntries,
             Time timeout,
             File webUiDir,
             boolean webSubmitEnabled,
-            boolean webCancelEnabled) {
+            boolean webCancelEnabled,
+            boolean webRescaleEnabled) {
         Preconditions.checkArgument(
                 refreshInterval > 0L, "The refresh interval (ms) should be 
larger than 0.");
         this.refreshInterval = refreshInterval;
@@ -57,6 +61,7 @@ public class RestHandlerConfiguration {
         this.webUiDir = Preconditions.checkNotNull(webUiDir);
         this.webSubmitEnabled = webSubmitEnabled;
         this.webCancelEnabled = webCancelEnabled;
+        this.webRescaleEnabled = webRescaleEnabled;
     }
 
     public long getRefreshInterval() {
@@ -83,6 +88,10 @@ public class RestHandlerConfiguration {
         return webCancelEnabled;
     }
 
+    public boolean isWebRescaleEnabled() {
+        return webRescaleEnabled;
+    }
+
     public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
         final long refreshInterval = 
configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
@@ -96,6 +105,11 @@ public class RestHandlerConfiguration {
 
         final boolean webSubmitEnabled = 
configuration.getBoolean(WebOptions.SUBMIT_ENABLE);
         final boolean webCancelEnabled = 
configuration.getBoolean(WebOptions.CANCEL_ENABLE);
+        final boolean webRescaleSupported =
+                ClusterOptions.isAdaptiveSchedulerEnabled(configuration)
+                        && 
!ClusterOptions.isReactiveModeEnabled(configuration);
+        final boolean webRescaleEnabled =
+                webRescaleSupported && 
configuration.getBoolean(WebOptions.RESCALE_ENABLE);
 
         return new RestHandlerConfiguration(
                 refreshInterval,
@@ -103,6 +117,7 @@ public class RestHandlerConfiguration {
                 timeout,
                 webUiDir,
                 webSubmitEnabled,
-                webCancelEnabled);
+                webCancelEnabled,
+                webRescaleEnabled);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
index caae3c14f92..d456ea07811 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/cluster/DashboardConfigHandler.java
@@ -49,7 +49,8 @@ public class DashboardConfigHandler
                     messageHeaders,
             long refreshInterval,
             boolean webSubmitEnabled,
-            boolean webCancelEnabled) {
+            boolean webCancelEnabled,
+            boolean webRescaleEnabled) {
         super(leaderRetriever, timeout, responseHeaders, messageHeaders);
 
         dashboardConfiguration =
@@ -58,6 +59,7 @@ public class DashboardConfigHandler
                         ZonedDateTime.now(),
                         webSubmitEnabled,
                         webCancelEnabled,
+                        webRescaleEnabled,
                         false);
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
index b3cfa4fb47d..6fa883d66a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfiguration.java
@@ -48,6 +48,8 @@ public class DashboardConfiguration implements ResponseBody {
 
     public static final String FIELD_NAME_FEATURE_WEB_CANCEL = "web-cancel";
 
+    public static final String FIELD_NAME_FEATURE_WEB_RESCALE = "web-rescale";
+
     public static final String FIELD_NAME_FEATURE_WEB_HISTORY = "web-history";
 
     @JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
@@ -123,6 +125,9 @@ public class DashboardConfiguration implements ResponseBody 
{
         @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL)
         private final boolean webCancelEnabled;
 
+        @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE)
+        private final boolean webRescaleEnabled;
+
         @JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY)
         private final boolean isHistoryServer;
 
@@ -130,9 +135,11 @@ public class DashboardConfiguration implements 
ResponseBody {
         public Features(
                 @JsonProperty(FIELD_NAME_FEATURE_WEB_SUBMIT) boolean 
webSubmitEnabled,
                 @JsonProperty(FIELD_NAME_FEATURE_WEB_CANCEL) boolean 
webCancelEnabled,
+                @JsonProperty(FIELD_NAME_FEATURE_WEB_RESCALE) boolean 
webRescaleEnabled,
                 @JsonProperty(FIELD_NAME_FEATURE_WEB_HISTORY) boolean 
isHistoryServer) {
             this.webSubmitEnabled = webSubmitEnabled;
             this.webCancelEnabled = webCancelEnabled;
+            this.webRescaleEnabled = webRescaleEnabled;
             this.isHistoryServer = isHistoryServer;
         }
 
@@ -146,6 +153,11 @@ public class DashboardConfiguration implements ResponseBody {
             return webCancelEnabled;
         }
 
+        @JsonIgnore
+        public boolean isWebRescaleEnabled() {
+            return webRescaleEnabled;
+        }
+
         @JsonIgnore
         public boolean isHistoryServer() {
             return isHistoryServer;
@@ -204,6 +216,7 @@ public class DashboardConfiguration implements ResponseBody {
             ZonedDateTime zonedDateTime,
             boolean webSubmitEnabled,
             boolean webCancelEnabled,
+            boolean webRescaleEnabled,
             boolean isHistoryServer) {
 
         final String flinkVersion = EnvironmentInformation.getVersion();
@@ -226,6 +239,7 @@ public class DashboardConfiguration implements ResponseBody {
                 zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
                 flinkVersion,
                 flinkRevision,
-                new Features(webSubmitEnabled, webCancelEnabled, isHistoryServer));
+                new Features(
+                        webSubmitEnabled, webCancelEnabled, webRescaleEnabled, isHistoryServer));
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 7836d720237..5193e08d85e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -298,7 +298,8 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         DashboardConfigurationHeaders.getInstance(),
                         restConfiguration.getRefreshInterval(),
                         hasWebSubmissionHandlers,
-                        restConfiguration.isWebCancelEnabled());
+                        restConfiguration.isWebCancelEnabled(),
+                        restConfiguration.isWebRescaleEnabled());
 
         JobIdsHandler jobIdsHandler =
                 new JobIdsHandler(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
index 43126e5a0d2..67454e9b1ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/RestHandlerConfigurationTest.java
@@ -19,43 +19,78 @@
 package org.apache.flink.runtime.rest.handler;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.TestLoggerExtension;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 
-import static org.junit.Assert.assertEquals;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link RestHandlerConfiguration}. */
-public class RestHandlerConfigurationTest extends TestLogger {
+@ExtendWith(TestLoggerExtension.class)
+class RestHandlerConfigurationTest {
 
     @Test
-    public void testWebSubmitFeatureFlagEnabled() {
+    void testWebSubmitFeatureFlagEnabled() {
         testWebSubmitFeatureFlag(true);
     }
 
     @Test
-    public void testWebSubmitFeatureFlagDisabled() {
+    void testWebSubmitFeatureFlagDisabled() {
         testWebSubmitFeatureFlag(false);
     }
 
     @Test
-    public void testWebCancelFeatureFlagEnabled() {
+    void testWebCancelFeatureFlagEnabled() {
         testWebCancelFeatureFlag(true);
     }
 
     @Test
-    public void testWebCancelFeatureFlagDisabled() {
+    void testWebCancelFeatureFlagDisabled() {
         testWebCancelFeatureFlag(false);
     }
 
+    @ParameterizedTest
+    @CsvSource({
+        "true,true,true,false",
+        "true,true,false,true",
+        "true,false,true,false",
+        "true,false,false,false",
+        "false,true,true,false",
+        "false,true,false,false",
+        "false,false,true,false",
+        "false,false,false,false",
+    })
+    void testWebRescaleFeatureFlagWithReactiveMode(
+            boolean webRescaleEnabled,
+            boolean adaptiveScheduler,
+            boolean reactiveMode,
+            boolean expectedResult) {
+        final Configuration config = new Configuration();
+        config.setBoolean(WebOptions.RESCALE_ENABLE, webRescaleEnabled);
+        if (adaptiveScheduler) {
+            config.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
+        }
+        if (reactiveMode) {
+            config.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
+        }
+        RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(config);
+        assertThat(restHandlerConfiguration.isWebRescaleEnabled()).isEqualTo(expectedResult);
+    }
+
     private static void testWebSubmitFeatureFlag(boolean webSubmitEnabled) {
         final Configuration config = new Configuration();
         config.setBoolean(WebOptions.SUBMIT_ENABLE, webSubmitEnabled);
 
         RestHandlerConfiguration restHandlerConfiguration =
                 RestHandlerConfiguration.fromConfiguration(config);
-        assertEquals(webSubmitEnabled, restHandlerConfiguration.isWebSubmitEnabled());
+        assertThat(restHandlerConfiguration.isWebSubmitEnabled()).isEqualTo(webSubmitEnabled);
     }
 
     private static void testWebCancelFeatureFlag(boolean webCancelEnabled) {
@@ -64,6 +99,6 @@ public class RestHandlerConfigurationTest extends TestLogger {
 
         RestHandlerConfiguration restHandlerConfiguration =
                 RestHandlerConfiguration.fromConfiguration(config);
-        assertEquals(webCancelEnabled, restHandlerConfiguration.isWebCancelEnabled());
+        assertThat(restHandlerConfiguration.isWebCancelEnabled()).isEqualTo(webCancelEnabled);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
index aec97d61c72..eeedeb5b749 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
@@ -35,6 +35,6 @@ public class DashboardConfigurationTest
                 42,
                 "version",
                 "revision",
-                new DashboardConfiguration.Features(true, true, false));
+                new DashboardConfiguration.Features(true, true, true, false));
     }
 }


Reply via email to