This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e4280d4f168 [FLINK-38893][runtime/rest] Introduce the
/jobs/:jobid/rescales/overview endpoint in the REST API
e4280d4f168 is described below
commit e4280d4f1680b7526743c264b96927b2dc10001a
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Mar 3 00:16:23 2026 +0800
[FLINK-38893][runtime/rest] Introduce the /jobs/:jobid/rescales/overview
endpoint in the REST API
Co-authored-by: XComp <[email protected]>
Co-authored-by: och5351 <[email protected]>
Co-authored-by: mateczagany <[email protected]>
---
.../shortcodes/generated/rest_v1_dispatcher.html | 227 +++++++++++++++++++
docs/static/generated/rest_v1_dispatcher.yml | 49 ++++
.../src/test/resources/rest_api_v1.snapshot | 203 +++++++++++++++++
.../job/rescales/JobRescalesOverviewHandler.java | 116 ++++++++++
.../messages/job/rescales/JobRescalesOverview.java | 246 +++++++++++++++++++++
.../job/rescales/JobRescalesOverviewHeaders.java | 74 +++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 15 ++
.../rescales/JobRescalesOverviewHandlerTest.java | 164 ++++++++++++++
8 files changed, 1094 insertions(+)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index b0e0cf4d0a4..8d7e8b44357 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -4025,6 +4025,233 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
</tr>
</tbody>
</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/rescales/overview</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Return job rescales overview.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview",
+ "properties" : {
+ "latest" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:LatestRescales",
+ "properties" : {
+ "completed" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+ "properties" : {
+ "endTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "rescaleAttemptId" : {
+ "type" : "integer"
+ },
+ "rescaleUuid" : {
+ "type" : "string"
+ },
+ "resourceRequirementsUuid" : {
+ "type" : "string"
+ },
+ "schedulerStates" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+ "properties" : {
+ "durationInMillis" : {
+ "type" : "integer"
+ },
+ "enterTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "leaveTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "state" : {
+ "type" : "string"
+ },
+ "stringifiedException" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "slots" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+ "properties" : {
+ "acquiredResourceProfile" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+ },
+ "desiredSlots" : {
+ "type" : "integer"
+ },
+ "minimalRequiredSlots" : {
+ "type" : "integer"
+ },
+ "postRescaleSlots" : {
+ "type" : "integer"
+ },
+ "preRescaleSlots" : {
+ "type" : "integer"
+ },
+ "requestResourceProfile" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+ "properties" : {
+ "cpuCores" : {
+ "type" : "number"
+ },
+ "extendedResources" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "number"
+ }
+ },
+ "managedMemory" : {
+ "type" : "integer"
+ },
+ "networkMemory" : {
+ "type" : "integer"
+ },
+ "taskHeapMemory" : {
+ "type" : "integer"
+ },
+ "taskOffHeapMemory" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "startTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "terminalState" : {
+ "type" : "string",
+ "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+ },
+ "terminatedReason" : {
+ "type" : "string",
+ "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED",
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+ },
+ "triggerCause" : {
+ "type" : "string",
+ "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT",
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+ },
+ "vertices" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+ "properties" : {
+ "desiredParallelism" : {
+ "type" : "integer"
+ },
+ "jobVertexId" : {
+ "type" : "any"
+ },
+ "jobVertexName" : {
+ "type" : "string"
+ },
+ "postRescaleParallelism" : {
+ "type" : "integer"
+ },
+ "preRescaleParallelism" : {
+ "type" : "integer"
+ },
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ },
+ "sufficientParallelism" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+ },
+ "failed" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails"
+ },
+ "ignored" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails"
+ }
+ }
+ },
+ "rescalesCounts" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+ "properties" : {
+ "completed" : {
+ "type" : "integer"
+ },
+ "failed" : {
+ "type" : "integer"
+ },
+ "ignored" : {
+ "type" : "integer"
+ },
+ "inProgress" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index 80fed9473fc..83cfaea0837 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -978,6 +978,24 @@ paths:
type: array
items:
$ref: "#/components/schemas/JobRescaleDetails"
+ /jobs/{jobid}/rescales/overview:
+ get:
+ description: Return job rescales overview.
+ operationId: getJobRescalesOverview
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: "#/components/schemas/JobID"
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/JobRescalesOverview"
/jobs/{jobid}/rescaling:
patch:
description: Triggers the rescaling of a job. This async operation would
return
@@ -2850,6 +2868,13 @@ components:
type: array
items:
$ref: "#/components/schemas/JobRescaleDetails"
+ JobRescalesOverview:
+ type: object
+ properties:
+ latest:
+ $ref: "#/components/schemas/LatestRescales"
+ rescalesCounts:
+ $ref: "#/components/schemas/RescalesCounts"
JobResourceRequirementsBody:
type: object
additionalProperties:
@@ -3011,6 +3036,15 @@ components:
$ref: "#/components/schemas/RestoredCheckpointStatistics"
savepoint:
$ref: "#/components/schemas/CompletedCheckpointStatistics"
+ LatestRescales:
+ type: object
+ properties:
+ completed:
+ $ref: "#/components/schemas/JobRescaleDetails"
+ failed:
+ $ref: "#/components/schemas/JobRescaleDetails"
+ ignored:
+ $ref: "#/components/schemas/JobRescaleDetails"
LogInfo:
type: object
properties:
@@ -3158,6 +3192,21 @@ components:
upperPart:
type: integer
format: int64
+ RescalesCounts:
+ type: object
+ properties:
+ completed:
+ type: integer
+ format: int64
+ failed:
+ type: integer
+ format: int64
+ ignored:
+ type: integer
+ format: int64
+ inProgress:
+ type: integer
+ format: int64
ResourceID:
pattern: "[0-9a-f]{32}"
type: string
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 5a3d9bc9251..c8767d6f9c5 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3220,6 +3220,209 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/rescales/overview",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview",
+ "properties" : {
+ "rescalesCounts" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+ "properties" : {
+ "ignored" : {
+ "type" : "integer"
+ },
+ "inProgress" : {
+ "type" : "integer"
+ },
+ "completed" : {
+ "type" : "integer"
+ },
+ "failed" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "latest" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:LatestRescales",
+ "properties" : {
+ "completed" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+ "properties" : {
+ "rescaleUuid" : {
+ "type" : "string"
+ },
+ "resourceRequirementsUuid" : {
+ "type" : "string"
+ },
+ "rescaleAttemptId" : {
+ "type" : "integer"
+ },
+ "vertices" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+ "properties" : {
+ "jobVertexId" : {
+ "type" : "any"
+ },
+ "jobVertexName" : {
+ "type" : "string"
+ },
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ },
+ "desiredParallelism" : {
+ "type" : "integer"
+ },
+ "sufficientParallelism" : {
+ "type" : "integer"
+ },
+ "preRescaleParallelism" : {
+ "type" : "integer"
+ },
+ "postRescaleParallelism" : {
+ "type" : "integer"
+ }
+ }
+ }
+ },
+ "slots" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+ "properties" : {
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ },
+ "requestResourceProfile" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+ "properties" : {
+ "cpuCores" : {
+ "type" : "number"
+ },
+ "taskHeapMemory" : {
+ "type" : "integer"
+ },
+ "taskOffHeapMemory" : {
+ "type" : "integer"
+ },
+ "managedMemory" : {
+ "type" : "integer"
+ },
+ "networkMemory" : {
+ "type" : "integer"
+ },
+ "extendedResources" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "number"
+ }
+ }
+ }
+ },
+ "desiredSlots" : {
+ "type" : "integer"
+ },
+ "minimalRequiredSlots" : {
+ "type" : "integer"
+ },
+ "preRescaleSlots" : {
+ "type" : "integer"
+ },
+ "postRescaleSlots" : {
+ "type" : "integer"
+ },
+ "acquiredResourceProfile" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+ }
+ }
+ }
+ },
+ "schedulerStates" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+ "properties" : {
+ "state" : {
+ "type" : "string"
+ },
+ "enterTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "leaveTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "durationInMillis" : {
+ "type" : "integer"
+ },
+ "stringifiedException" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "startTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "endTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "terminalState" : {
+ "type" : "string",
+ "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+ },
+ "triggerCause" : {
+ "type" : "string",
+ "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT",
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+ },
+ "terminatedReason" : {
+ "type" : "string",
+ "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED",
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+ }
+ }
+ },
+ "failed" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails"
+ },
+ "ignored" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails"
+ }
+ }
+ }
+ }
+ }
}, {
"url" : "/jobs/:jobid/rescaling",
"method" : "PATCH",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
new file mode 100644
index 00000000000..a8ca9d05b4a
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** Handler to respond with a job rescales overview. */
+public class JobRescalesOverviewHandler
+ extends AbstractExecutionGraphHandler<JobRescalesOverview,
JobMessageParameters>
+ implements JsonArchivist {
+
+ public JobRescalesOverviewHandler(
+ GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+ Duration timeout,
+ Map<String, String> responseHeaders,
+ MessageHeaders<EmptyRequestBody, JobRescalesOverview,
JobMessageParameters>
+ messageHeaders,
+ ExecutionGraphCache executionGraphCache,
+ Executor executor) {
+ super(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ messageHeaders,
+ executionGraphCache,
+ executor);
+ }
+
+ @Override
+ protected JobRescalesOverview handleRequest(
+ HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo
executionGraphInfo)
+ throws RestHandlerException {
+ return getJobRescalesOverview(executionGraphInfo);
+ }
+
+ private JobRescalesOverview getJobRescalesOverview(ExecutionGraphInfo
executionGraphInfo)
+ throws RestHandlerException {
+ RescalesStatsSnapshot rescalesStatsSnapshot =
executionGraphInfo.getRescalesStatsSnapshot();
+ if (rescalesStatsSnapshot == null ||
rescalesStatsSnapshot.getRescaleHistory() == null) {
+ throw new RestHandlerException(
+ String.format(
+ "The job `%s` has not enabled the `%s` scheduler,
or it has been enabled but the value of configuration option `%s` has not been
set to greater than `0`.",
+ executionGraphInfo.getJobId(),
+ JobManagerOptions.SchedulerType.Adaptive,
+
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
+ HttpResponseStatus.NOT_FOUND,
+ RestHandlerException.LoggingBehavior.IGNORE);
+ }
+ return JobRescalesOverview.fromRescalesStatsSnapshot(
+ executionGraphInfo.getRescalesStatsSnapshot());
+ }
+
+ @Override
+ public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
+ throws IOException {
+
+ ResponseBody response;
+ try {
+ response = getJobRescalesOverview(executionGraphInfo);
+ } catch (RestHandlerException rhe) {
+ response = new ErrorResponseBody(rhe.getMessage());
+ }
+ return List.of(
+ new ArchivedJson(
+ JobRescalesOverviewHeaders.getInstance()
+ .getTargetRestEndpointURL()
+ .replace(
+ ':' + JobIDPathParameter.KEY,
+
executionGraphInfo.getJobId().toString()),
+ response));
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverview.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverview.java
new file mode 100644
index 00000000000..10f24a76d8b
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverview.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesOverviewHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Response body for {@link JobRescalesOverviewHandler}. */
+@Schema(name = "JobRescalesOverview")
+public class JobRescalesOverview implements ResponseBody, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_RESCALES_COUNTS = "rescalesCounts";
+ public static final String FIELD_NAME_LATEST = "latest";
+
+ @JsonProperty(FIELD_NAME_RESCALES_COUNTS)
+ private final RescalesCounts rescalesCounts;
+
+ @JsonProperty(FIELD_NAME_LATEST)
+ private final LatestRescales latest;
+
+ @JsonCreator
+ public JobRescalesOverview(
+ @JsonProperty(FIELD_NAME_RESCALES_COUNTS) RescalesCounts
rescalesCounts,
+ @JsonProperty(FIELD_NAME_LATEST) LatestRescales latest) {
+ this.rescalesCounts = rescalesCounts;
+ this.latest = latest;
+ }
+
+ public static JobRescalesOverview
fromRescalesStatsSnapshot(RescalesStatsSnapshot snapshot) {
+ RescalesSummarySnapshot summarySnapshot =
snapshot.getRescalesSummarySnapshot();
+ RescalesCounts counts =
+ new RescalesCounts(
+ summarySnapshot.getIgnoredRescalesCount(),
+ summarySnapshot.getInProgressRescaleCount(),
+ summarySnapshot.getCompletedRescalesCount(),
+ summarySnapshot.getFailedRescalesCount());
+ Rescale latestCompletedRescale =
snapshot.getLatestRescale(TerminalState.COMPLETED);
+ JobRescaleDetails latestCompleted =
+ latestCompletedRescale == null
+ ? null
+ :
JobRescaleDetails.fromRescale(latestCompletedRescale, false);
+
+ Rescale latestFailedRescale =
snapshot.getLatestRescale(TerminalState.FAILED);
+ JobRescaleDetails latestFailed =
+ latestFailedRescale == null
+ ? null
+ : JobRescaleDetails.fromRescale(latestFailedRescale,
false);
+
+ Rescale latestIgnoredRescale =
snapshot.getLatestRescale(TerminalState.IGNORED);
+ JobRescaleDetails latestIgnored =
+ latestIgnoredRescale == null
+ ? null
+ : JobRescaleDetails.fromRescale(latestIgnoredRescale,
false);
+ LatestRescales latest = new LatestRescales(latestCompleted,
latestFailed, latestIgnored);
+ return new JobRescalesOverview(counts, latest);
+ }
+
+ public RescalesCounts getRescalesCounts() {
+ return rescalesCounts;
+ }
+
+ public LatestRescales getLatest() {
+ return latest;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobRescalesOverview that = (JobRescalesOverview) o;
+ return Objects.equals(rescalesCounts, that.rescalesCounts)
+ && Objects.equals(latest, that.latest);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rescalesCounts, latest);
+ }
+
+ /** Counts for rescales. */
+ public static class RescalesCounts implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_IGNORED = "ignored";
+ public static final String FIELD_NAME_IN_PROGRESS = "inProgress";
+ public static final String FIELD_NAME_COMPLETED = "completed";
+ public static final String FIELD_NAME_FAILED = "failed";
+
+ @JsonProperty(FIELD_NAME_IGNORED)
+ private final long ignored;
+
+ @JsonProperty(FIELD_NAME_IN_PROGRESS)
+ private final long inProgress;
+
+ @JsonProperty(FIELD_NAME_COMPLETED)
+ private final long completed;
+
+ @JsonProperty(FIELD_NAME_FAILED)
+ private final long failed;
+
+ @JsonCreator
+ public RescalesCounts(
+ @JsonProperty(FIELD_NAME_IGNORED) long ignored,
+ @JsonProperty(FIELD_NAME_IN_PROGRESS) long inProgress,
+ @JsonProperty(FIELD_NAME_COMPLETED) long completed,
+ @JsonProperty(FIELD_NAME_FAILED) long failed) {
+ this.ignored = ignored;
+ this.inProgress = inProgress;
+ this.completed = completed;
+ this.failed = failed;
+ }
+
+ public long getIgnored() {
+ return ignored;
+ }
+
+ public long getInProgress() {
+ return inProgress;
+ }
+
+ public long getCompleted() {
+ return completed;
+ }
+
+ public long getFailed() {
+ return failed;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RescalesCounts that = (RescalesCounts) o;
+ return ignored == that.ignored
+ && inProgress == that.inProgress
+ && completed == that.completed
+ && failed == that.failed;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ignored, inProgress, completed, failed);
+ }
+ }
+
+ /** Latest rescales by terminal state. */
+ public static class LatestRescales implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_COMPLETED = "completed";
+ public static final String FIELD_NAME_FAILED = "failed";
+ public static final String FIELD_NAME_IGNORED = "ignored";
+
+ @Nullable
+ @JsonProperty(FIELD_NAME_COMPLETED)
+ private final JobRescaleDetails completed;
+
+ @Nullable
+ @JsonProperty(FIELD_NAME_FAILED)
+ private final JobRescaleDetails failed;
+
+ @Nullable
+ @JsonProperty(FIELD_NAME_IGNORED)
+ private final JobRescaleDetails ignored;
+
+ @JsonCreator
+ public LatestRescales(
+ @JsonProperty(FIELD_NAME_COMPLETED) @Nullable
JobRescaleDetails completed,
+ @JsonProperty(FIELD_NAME_FAILED) @Nullable JobRescaleDetails
failed,
+ @JsonProperty(FIELD_NAME_IGNORED) @Nullable JobRescaleDetails
ignored) {
+ this.completed = completed;
+ this.failed = failed;
+ this.ignored = ignored;
+ }
+
+ @Nullable
+ public JobRescaleDetails getCompleted() {
+ return completed;
+ }
+
+ @Nullable
+ public JobRescaleDetails getFailed() {
+ return failed;
+ }
+
+ @Nullable
+ public JobRescaleDetails getIgnored() {
+ return ignored;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LatestRescales that = (LatestRescales) o;
+ return Objects.equals(completed, that.completed)
+ && Objects.equals(failed, that.failed)
+ && Objects.equals(ignored, that.ignored);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(completed, failed, ignored);
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverviewHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverviewHeaders.java
new file mode 100644
index 00000000000..7fb1a30d699
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesOverviewHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesOverviewHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for {@link JobRescalesOverviewHandler}. */
+public class JobRescalesOverviewHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, JobRescalesOverview, JobMessageParameters> {
+ public static final JobRescalesOverviewHeaders INSTANCE = new
JobRescalesOverviewHeaders();
+ public static final String JOB_RESCALES_OVERVIEW_PATH =
"/jobs/:jobid/rescales/overview";
+
+ @Override
+ public Class<JobRescalesOverview> getResponseClass() {
+ return JobRescalesOverview.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Return job rescales overview.";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return JOB_RESCALES_OVERVIEW_PATH;
+ }
+
+ public static JobRescalesOverviewHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 5324e3d5446..2ef33671cb4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -99,6 +99,7 @@ import
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandl
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesOverviewHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -168,6 +169,7 @@ import
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinatio
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1227,6 +1229,19 @@ public class WebMonitorEndpoint<T extends
RestfulGateway> extends RestServerEndp
handlers.add(
Tuple2.of(jobRescaleDetailsHandler.getMessageHeaders(),
jobRescaleDetailsHandler));
+ final JobRescalesOverviewHandler jobRescalesOverviewHandler =
+ new JobRescalesOverviewHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobRescalesOverviewHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+ handlers.add(
+ Tuple2.of(
+ jobRescalesOverviewHandler.getMessageHeaders(),
+ jobRescalesOverviewHandler));
+
final JobRescalesHistoryHandler jobRescalesHistoryHandler =
new JobRescalesHistoryHandler(
leaderRetriever,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandlerTest.java
new file mode 100644
index 00000000000..16e726455d6
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandlerTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescalesOverviewHandler}. */
+class JobRescalesOverviewHandlerTest {
+ private final JobRescalesOverviewHandler testInstance =
+ new JobRescalesOverviewHandler(
+ CompletableFuture::new,
+ TestingUtils.TIMEOUT,
+ Map.of(),
+ JobRescalesOverviewHeaders.getInstance(),
+ new DefaultExecutionGraphCache(TestingUtils.TIMEOUT,
TestingUtils.TIMEOUT),
+ Executors.directExecutor());
+
+ @Test
+ void testSchedulerNotEnabledRescalesOverview() throws
HandlerRequestException {
+ final ExecutionGraphInfo
executionGraphInfoWithNullRescalesStatsSnapshot =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
List.of(), null);
+ final HandlerRequest<EmptyRequestBody> request =
+
createRequest(executionGraphInfoWithNullRescalesStatsSnapshot.getJobId());
+ assertThatThrownBy(
+ () ->
+ testInstance.handleRequest(
+ request,
executionGraphInfoWithNullRescalesStatsSnapshot))
+ .isInstanceOf(RestHandlerException.class);
+ }
+
+ @Test
+ void testRequestNormalJobRescaleOverview()
+ throws HandlerRequestException, RestHandlerException {
+ final RescaleIdInfo rescaleIdInfo =
+ new RescaleIdInfo(new RescaleIdInfo.ResourceRequirementsID(),
1L);
+ final long startTimestamp = 1L;
+ final long endTimestamp = 100L;
+ final String stringfiedException = "mocked exception";
+ final TriggerCause triggerCause = TriggerCause.INITIAL_SCHEDULE;
+ final TerminatedReason terminatedReason = TerminatedReason.SUCCEEDED;
+ final SchedulerStateSpan createdStateSpan =
+ new SchedulerStateSpan(
+ "Created",
+ startTimestamp,
+ endTimestamp,
+ endTimestamp - startTimestamp,
+ stringfiedException);
+
+ final Rescale rescale =
+ new Rescale(rescaleIdInfo)
+ .setStartTimestamp(startTimestamp)
+ .setEndTimestamp(endTimestamp)
+ .setTriggerCause(triggerCause)
+ .setStringifiedException(stringfiedException)
+ .addSchedulerState(createdStateSpan)
+ .setTerminatedReason(terminatedReason);
+
+ RescalesSummary rescalesSummary = new RescalesSummary(2);
+ rescalesSummary.addTerminated(rescale);
+ RescalesSummarySnapshot rescalesSummarySnapshot =
rescalesSummary.createSnapshot();
+ RescalesStatsSnapshot rescalesStatsSnapshot =
+ new RescalesStatsSnapshot(
+ List.of(rescale),
+ Map.of(rescale.getTerminalState(), rescale),
+ rescalesSummarySnapshot);
+
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
+ List.of(),
+ JobManagerOptions.SchedulerType.Adaptive,
+ null,
+ rescalesStatsSnapshot);
+ final HandlerRequest<EmptyRequestBody> request =
+ createRequest(executionGraphInfo.getJobId());
+
+ JobRescalesOverview actual = testInstance.handleRequest(request,
executionGraphInfo);
+
+ JobRescalesOverview expected =
+ new JobRescalesOverview(
+ new JobRescalesOverview.RescalesCounts(0L, 0L, 1L, 0L),
+ new JobRescalesOverview.LatestRescales(
+ new JobRescaleDetails(
+
rescaleIdInfo.getRescaleUuid().toString(),
+
rescaleIdInfo.getResourceRequirementsId().toString(),
+ rescaleIdInfo.getRescaleAttemptId(),
+ Map.of(),
+ Map.of(),
+ // The 'overview' interface will
ignore the scheduler
+ // states, slots and vertices parts.
+ List.of(),
+ startTimestamp,
+ endTimestamp,
+ terminatedReason.getTerminalState(),
+ triggerCause,
+ terminatedReason),
+ null,
+ null));
+ assertThat(actual).isEqualTo(expected);
+ }
+
+ private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+ throws HandlerRequestException {
+ final Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+
+ return HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+ new JobMessageParameters(),
+ pathParameters,
+ new HashMap<>(),
+ List.of());
+ }
+}