This is an automated email from the ASF dual-hosted git repository.
RocMarshal pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 68fb3cf154d [FLINK-38896][runtime/rest] Introduce the
/jobs/:jobid/rescales/summary endpoint in the REST API (#27987)
68fb3cf154d is described below
commit 68fb3cf154d91e2f76559e928f566de32eaae176
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Apr 22 07:20:43 2026 +0800
[FLINK-38896][runtime/rest] Introduce the /jobs/:jobid/rescales/summary
endpoint in the REST API (#27987)
(cherry picked from commit ed21db51baa427f91bbf975a3e45fe13f475ecb0)
Co-authored-by: XComp <[email protected]>
Co-authored-by: spuru9 <[email protected]>
Co-authored-by: mateczagany <[email protected]>
---
.../shortcodes/generated/rest_v1_dispatcher.html | 107 +++++++++++++++
docs/static/generated/rest_v1_dispatcher.yml | 31 +++++
.../src/test/resources/rest_api_v1.snapshot | 83 ++++++++++++
.../job/rescales/JobRescalesHistoryHandler.java | 13 +-
.../job/rescales/JobRescalesOverviewHandler.java | 13 +-
...Handler.java => JobRescalesSummaryHandler.java} | 46 +++----
.../job/rescales/RescalesUnavailableException.java | 45 +++++++
.../messages/job/rescales/JobRescalesSummary.java | 149 +++++++++++++++++++++
.../job/rescales/JobRescalesSummaryHeaders.java | 74 ++++++++++
.../runtime/webmonitor/WebMonitorEndpoint.java | 14 ++
.../rescales/JobRescalesSummaryHandlerTest.java | 149 +++++++++++++++++++++
11 files changed, 672 insertions(+), 52 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 8d7e8b44357..b7ab854eb98 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -4252,6 +4252,113 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
</tr>
</tbody>
</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/rescales/summary</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Return job rescales summary.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesSummary",
+ "properties" : {
+ "completedRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ },
+ "failedRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ },
+ "ignoredRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ },
+ "rescalesCounts" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+ "properties" : {
+ "completed" : {
+ "type" : "integer"
+ },
+ "failed" : {
+ "type" : "integer"
+ },
+ "ignored" : {
+ "type" : "integer"
+ },
+ "inProgress" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "rescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto",
+ "properties" : {
+ "avg" : {
+ "type" : "integer"
+ },
+ "max" : {
+ "type" : "integer"
+ },
+ "min" : {
+ "type" : "integer"
+ },
+ "p50" : {
+ "type" : "number"
+ },
+ "p90" : {
+ "type" : "number"
+ },
+ "p95" : {
+ "type" : "number"
+ },
+ "p99" : {
+ "type" : "number"
+ },
+ "p999" : {
+ "type" : "number"
+ }
+ }
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index 83cfaea0837..c8f1c7ae891 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -996,6 +996,24 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/JobRescalesOverview"
+ /jobs/{jobid}/rescales/summary:
+ get:
+ description: Return job rescales summary.
+ operationId: getJobRescalesSummary
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: "#/components/schemas/JobID"
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/JobRescalesSummary"
/jobs/{jobid}/rescaling:
patch:
description: Triggers the rescaling of a job. This async operation would
return
@@ -2875,6 +2893,19 @@ components:
$ref: "#/components/schemas/LatestRescales"
rescalesCounts:
$ref: "#/components/schemas/RescalesCounts"
+ JobRescalesSummary:
+ type: object
+ properties:
+ completedRescalesDurationStatsInMillis:
+ $ref: "#/components/schemas/StatsSummaryDto"
+ failedRescalesDurationStatsInMillis:
+ $ref: "#/components/schemas/StatsSummaryDto"
+ ignoredRescalesDurationStatsInMillis:
+ $ref: "#/components/schemas/StatsSummaryDto"
+ rescalesCounts:
+ $ref: "#/components/schemas/RescalesCounts"
+ rescalesDurationStatsInMillis:
+ $ref: "#/components/schemas/StatsSummaryDto"
JobResourceRequirementsBody:
type: object
additionalProperties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index c8767d6f9c5..f282db00ea9 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3423,6 +3423,89 @@
}
}
}
+ }, {
+ "url" : "/jobs/:jobid/rescales/summary",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "jobid"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesSummary",
+ "properties" : {
+ "rescalesCounts" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+ "properties" : {
+ "ignored" : {
+ "type" : "integer"
+ },
+ "inProgress" : {
+ "type" : "integer"
+ },
+ "completed" : {
+ "type" : "integer"
+ },
+ "failed" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "rescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto",
+ "properties" : {
+ "min" : {
+ "type" : "integer"
+ },
+ "max" : {
+ "type" : "integer"
+ },
+ "avg" : {
+ "type" : "integer"
+ },
+ "p50" : {
+ "type" : "number"
+ },
+ "p90" : {
+ "type" : "number"
+ },
+ "p95" : {
+ "type" : "number"
+ },
+ "p99" : {
+ "type" : "number"
+ },
+ "p999" : {
+ "type" : "number"
+ }
+ }
+ },
+ "completedRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ },
+ "ignoredRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ },
+ "failedRescalesDurationStatsInMillis" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+ }
+ }
+ }
}, {
"url" : "/jobs/:jobid/rescaling",
"method" : "PATCH",
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
index 6e7541f87e3..478f55e73e1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.rest.handler.job.rescales;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -38,8 +36,6 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
@@ -80,14 +76,7 @@ public class JobRescalesHistoryHandler
throws RestHandlerException {
if (executionGraphInfo.getRescalesStatsSnapshot() == null
||
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory() == null) {
- throw new RestHandlerException(
- String.format(
- "The job `%s` has not enabled the `%s` scheduler,
or it has been enabled but the value of configuration option `%s` has not been
set to greater than `0`.",
- executionGraphInfo.getJobId(),
- JobManagerOptions.SchedulerType.Adaptive,
-
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
- HttpResponseStatus.NOT_FOUND,
- RestHandlerException.LoggingBehavior.IGNORE);
+ throw
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
}
return JobRescalesHistory.fromRescalesStatsSnapshot(
executionGraphInfo.getRescalesStatsSnapshot());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
index a8ca9d05b4a..c6faeee1a3d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.rest.handler.job.rescales;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -39,8 +37,6 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
@@ -81,14 +77,7 @@ public class JobRescalesOverviewHandler
throws RestHandlerException {
RescalesStatsSnapshot rescalesStatsSnapshot =
executionGraphInfo.getRescalesStatsSnapshot();
if (rescalesStatsSnapshot == null ||
rescalesStatsSnapshot.getRescaleHistory() == null) {
- throw new RestHandlerException(
- String.format(
- "The job `%s` has not enabled the `%s` scheduler,
or it has been enabled but the value of configuration option `%s` has not been
set to greater than `0`.",
- executionGraphInfo.getJobId(),
- JobManagerOptions.SchedulerType.Adaptive,
-
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
- HttpResponseStatus.NOT_FOUND,
- RestHandlerException.LoggingBehavior.IGNORE);
+ throw
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
}
return JobRescalesOverview.fromRescalesStatsSnapshot(
executionGraphInfo.getRescalesStatsSnapshot());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
similarity index 69%
copy from
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
copy to
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
index a8ca9d05b4a..0bcb950469e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.rest.handler.job.rescales;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -30,8 +28,8 @@ import
org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
-import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummary;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -39,8 +37,6 @@ import
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
@@ -48,16 +44,16 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
-/** Handler to respond with a job rescales overview. */
-public class JobRescalesOverviewHandler
- extends AbstractExecutionGraphHandler<JobRescalesOverview,
JobMessageParameters>
+/** Handler to respond with a job rescales summary. */
+public class JobRescalesSummaryHandler
+ extends AbstractExecutionGraphHandler<JobRescalesSummary,
JobMessageParameters>
implements JsonArchivist {
- public JobRescalesOverviewHandler(
+ public JobRescalesSummaryHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
- MessageHeaders<EmptyRequestBody, JobRescalesOverview,
JobMessageParameters>
+ MessageHeaders<EmptyRequestBody, JobRescalesSummary,
JobMessageParameters>
messageHeaders,
ExecutionGraphCache executionGraphCache,
Executor executor) {
@@ -71,42 +67,36 @@ public class JobRescalesOverviewHandler
}
@Override
- protected JobRescalesOverview handleRequest(
+ protected JobRescalesSummary handleRequest(
HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo
executionGraphInfo)
throws RestHandlerException {
- return getJobRescalesOverview(executionGraphInfo);
+ return getJobRescalesSummary(executionGraphInfo);
}
- private JobRescalesOverview getJobRescalesOverview(ExecutionGraphInfo
executionGraphInfo)
+ private JobRescalesSummary getJobRescalesSummary(ExecutionGraphInfo
executionGraphInfo)
throws RestHandlerException {
RescalesStatsSnapshot rescalesStatsSnapshot =
executionGraphInfo.getRescalesStatsSnapshot();
- if (rescalesStatsSnapshot == null ||
rescalesStatsSnapshot.getRescaleHistory() == null) {
- throw new RestHandlerException(
- String.format(
- "The job `%s` has not enabled the `%s` scheduler,
or it has been enabled but the value of configuration option `%s` has not been
set to greater than `0`.",
- executionGraphInfo.getJobId(),
- JobManagerOptions.SchedulerType.Adaptive,
-
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
- HttpResponseStatus.NOT_FOUND,
- RestHandlerException.LoggingBehavior.IGNORE);
+
+ if (rescalesStatsSnapshot == null
+ || rescalesStatsSnapshot.getRescalesSummarySnapshot() == null)
{
+ throw
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
}
- return JobRescalesOverview.fromRescalesStatsSnapshot(
- executionGraphInfo.getRescalesStatsSnapshot());
+
+ return
JobRescalesSummary.fromRescalesStatsSnapshot(rescalesStatsSnapshot);
}
@Override
public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo
executionGraphInfo)
throws IOException {
-
ResponseBody response;
try {
- response = getJobRescalesOverview(executionGraphInfo);
+ response = getJobRescalesSummary(executionGraphInfo);
} catch (RestHandlerException rhe) {
response = new ErrorResponseBody(rhe.getMessage());
}
return List.of(
new ArchivedJson(
- JobRescalesOverviewHeaders.getInstance()
+ JobRescalesSummaryHeaders.getInstance()
.getTargetRestEndpointURL()
.replace(
':' + JobIDPathParameter.KEY,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
new file mode 100644
index 00000000000..48446e01363
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** An exception that is thrown if the requested job rescales related data is
unavailable. */
+public class RescalesUnavailableException extends RestHandlerException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RescalesUnavailableException(String message) {
+ super(message, HttpResponseStatus.NOT_FOUND,
RestHandlerException.LoggingBehavior.IGNORE);
+ }
+
+ public static RescalesUnavailableException createForJob(JobID jobId) {
+ return new RescalesUnavailableException(
+ String.format(
+ "The job `%s` has not enabled the `%s` scheduler, or
it has been enabled but the value of configuration option `%s` has not been set
to greater than `0`.",
+ jobId,
+ JobManagerOptions.SchedulerType.Adaptive,
+
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()));
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
new file mode 100644
index 00000000000..613dbc838a1
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.util.stats.StatsSummaryDto;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Response body for {@link JobRescalesSummaryHandler}. */
+@Schema(name = "JobRescalesSummary")
+public class JobRescalesSummary implements ResponseBody, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_RESCALES_COUNTS = "rescalesCounts";
+ public static final String FIELD_NAME_RESCALES_DURATION_STATS =
"rescalesDurationStatsInMillis";
+ public static final String FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS =
+ "completedRescalesDurationStatsInMillis";
+ public static final String FIELD_NAME_IGNORED_RESCALES_DURATION_STATS =
+ "ignoredRescalesDurationStatsInMillis";
+ public static final String FIELD_NAME_FAILED_RESCALES_DURATION_STATS =
+ "failedRescalesDurationStatsInMillis";
+
+ @JsonProperty(FIELD_NAME_RESCALES_COUNTS)
+ private final JobRescalesOverview.RescalesCounts rescalesCounts;
+
+ @JsonProperty(FIELD_NAME_RESCALES_DURATION_STATS)
+ private final StatsSummaryDto rescalesDurationStatsInMillis;
+
+ @JsonProperty(FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS)
+ private final StatsSummaryDto completedRescalesDurationStatsInMillis;
+
+ @JsonProperty(FIELD_NAME_IGNORED_RESCALES_DURATION_STATS)
+ private final StatsSummaryDto ignoredRescalesDurationStatsInMillis;
+
+ @JsonProperty(FIELD_NAME_FAILED_RESCALES_DURATION_STATS)
+ private final StatsSummaryDto failedRescalesDurationStatsInMillis;
+
+ @JsonCreator
+ public JobRescalesSummary(
+ @JsonProperty(FIELD_NAME_RESCALES_COUNTS)
+ JobRescalesOverview.RescalesCounts rescalesCounts,
+ @JsonProperty(FIELD_NAME_RESCALES_DURATION_STATS)
+ StatsSummaryDto rescalesDurationStatsInMillis,
+ @JsonProperty(FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS)
+ StatsSummaryDto completedRescalesDurationStatsInMillis,
+ @JsonProperty(FIELD_NAME_IGNORED_RESCALES_DURATION_STATS)
+ StatsSummaryDto ignoredRescalesDurationStatsInMillis,
+ @JsonProperty(FIELD_NAME_FAILED_RESCALES_DURATION_STATS)
+ StatsSummaryDto failedRescalesDurationStatsInMillis) {
+ this.rescalesCounts = rescalesCounts;
+ this.rescalesDurationStatsInMillis = rescalesDurationStatsInMillis;
+ this.completedRescalesDurationStatsInMillis =
completedRescalesDurationStatsInMillis;
+ this.ignoredRescalesDurationStatsInMillis =
ignoredRescalesDurationStatsInMillis;
+ this.failedRescalesDurationStatsInMillis =
failedRescalesDurationStatsInMillis;
+ }
+
+ public static JobRescalesSummary
fromRescalesStatsSnapshot(RescalesStatsSnapshot snapshot) {
+ RescalesSummarySnapshot summarySnapshot =
snapshot.getRescalesSummarySnapshot();
+ JobRescalesOverview.RescalesCounts counts =
+ new JobRescalesOverview.RescalesCounts(
+ summarySnapshot.getIgnoredRescalesCount(),
+ summarySnapshot.getInProgressRescaleCount(),
+ summarySnapshot.getCompletedRescalesCount(),
+ summarySnapshot.getFailedRescalesCount());
+ return new JobRescalesSummary(
+ counts,
+
StatsSummaryDto.valueOf(summarySnapshot.getAllTerminatedSummarySnapshot()),
+
StatsSummaryDto.valueOf(summarySnapshot.getCompletedRescalesSummarySnapshot()),
+
StatsSummaryDto.valueOf(summarySnapshot.getIgnoredRescalesSummarySnapshot()),
+
StatsSummaryDto.valueOf(summarySnapshot.getFailedRescalesSummarySnapshot()));
+ }
+
+ public JobRescalesOverview.RescalesCounts getRescalesCounts() {
+ return rescalesCounts;
+ }
+
+ public StatsSummaryDto getRescalesDurationStatsInMillis() {
+ return rescalesDurationStatsInMillis;
+ }
+
+ public StatsSummaryDto getCompletedRescalesDurationStatsInMillis() {
+ return completedRescalesDurationStatsInMillis;
+ }
+
+ public StatsSummaryDto getIgnoredRescalesDurationStatsInMillis() {
+ return ignoredRescalesDurationStatsInMillis;
+ }
+
+ public StatsSummaryDto getFailedRescalesDurationStatsInMillis() {
+ return failedRescalesDurationStatsInMillis;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ } else if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobRescalesSummary that = (JobRescalesSummary) o;
+ return Objects.equals(rescalesCounts, that.rescalesCounts)
+ && Objects.equals(rescalesDurationStatsInMillis,
that.rescalesDurationStatsInMillis)
+ && Objects.equals(
+ completedRescalesDurationStatsInMillis,
+ that.completedRescalesDurationStatsInMillis)
+ && Objects.equals(
+ ignoredRescalesDurationStatsInMillis,
+ that.ignoredRescalesDurationStatsInMillis)
+ && Objects.equals(
+ failedRescalesDurationStatsInMillis,
+ that.failedRescalesDurationStatsInMillis);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ rescalesCounts,
+ rescalesDurationStatsInMillis,
+ completedRescalesDurationStatsInMillis,
+ ignoredRescalesDurationStatsInMillis,
+ failedRescalesDurationStatsInMillis);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
new file mode 100644
index 00000000000..983e5348b4c
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for {@link JobRescalesSummaryHandler}. */
+public class JobRescalesSummaryHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, JobRescalesSummary, JobMessageParameters> {
+ public static final JobRescalesSummaryHeaders INSTANCE = new
JobRescalesSummaryHeaders();
+ public static final String JOB_RESCALES_SUMMARY_PATH =
"/jobs/:jobid/rescales/summary";
+
+ @Override
+ public Class<JobRescalesSummary> getResponseClass() {
+ return JobRescalesSummary.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Return job rescales summary.";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public JobMessageParameters getUnresolvedMessageParameters() {
+ return new JobMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return JOB_RESCALES_SUMMARY_PATH;
+ }
+
+ public static JobRescalesSummaryHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 2ef33671cb4..9e3f295e6e7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -100,6 +100,7 @@ import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandle
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesOverviewHandler;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -170,6 +171,7 @@ import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeade
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1242,6 +1244,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         jobRescalesOverviewHandler.getMessageHeaders(),
                         jobRescalesOverviewHandler));
 
+        final JobRescalesSummaryHandler jobRescalesSummaryHandler =
+                new JobRescalesSummaryHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobRescalesSummaryHeaders.getInstance(),
+                        executionGraphCache,
+                        executor);
+        handlers.add( // registers the new handler paired with its own message headers
+                Tuple2.of(
+                        jobRescalesSummaryHandler.getMessageHeaders(), jobRescalesSummaryHandler));
+
         final JobRescalesHistoryHandler jobRescalesHistoryHandler =
                 new JobRescalesHistoryHandler(
                         leaderRetriever,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java
new file mode 100644
index 00000000000..728a6661ef1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummary;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
+import org.apache.flink.runtime.rest.messages.util.stats.StatsSummaryDto;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescalesSummaryHandler}. */
+class JobRescalesSummaryHandlerTest {
+    private final JobRescalesSummaryHandler testInstance =
+            new JobRescalesSummaryHandler(
+                    CompletableFuture::new,
+                    TestingUtils.TIMEOUT,
+                    Map.of(),
+                    JobRescalesSummaryHeaders.getInstance(),
+                    new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, TestingUtils.TIMEOUT),
+                    Executors.directExecutor());
+
+    @Test // a null RescalesStatsSnapshot must be rejected with a RestHandlerException
+    void testSchedulerNotEnabledRescalesSummary() throws HandlerRequestException {
+        final ExecutionGraphInfo executionGraphInfoWithNullRescalesStatsSnapshot =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(), List.of(), null);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(executionGraphInfoWithNullRescalesStatsSnapshot.getJobId());
+        assertThatThrownBy(
+                        () ->
+                                testInstance.handleRequest(
+                                        request, executionGraphInfoWithNullRescalesStatsSnapshot))
+                .isInstanceOf(RestHandlerException.class);
+    }
+
+    @Test // one terminated rescale (timestamps 1 -> 100) must yield the summary asserted below
+    void testRequestNormalJobRescaleSummary() throws HandlerRequestException, RestHandlerException {
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new RescaleIdInfo.ResourceRequirementsID(), 1L))
+                        .setStartTimestamp(1L)
+                        .setEndTimestamp(100L) // presumably 100 - 1 = the 99 in the stats below
+                        .setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
+                        .setStringifiedException("mocked exception")
+                        .setTerminatedReason(TerminatedReason.SUCCEEDED);
+
+        RescalesSummary rescalesSummary = new RescalesSummary(2);
+        rescalesSummary.addTerminated(rescale);
+        RescalesSummarySnapshot rescalesSummarySnapshot = rescalesSummary.createSnapshot();
+        RescalesStatsSnapshot rescalesStatsSnapshot =
+                new RescalesStatsSnapshot(
+                        List.of(rescale),
+                        Map.of(rescale.getTerminalState(), rescale),
+                        rescalesSummarySnapshot);
+
+        final ExecutionGraphInfo executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        List.of(),
+                        JobManagerOptions.SchedulerType.Adaptive,
+                        null,
+                        rescalesStatsSnapshot);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(executionGraphInfo.getJobId());
+
+        JobRescalesSummary actual = testInstance.handleRequest(request, executionGraphInfo);
+        JobRescalesSummary expected =
+                new JobRescalesSummary(
+                        new JobRescalesOverview.RescalesCounts(0L, 0L, 1L, 0L),
+                        new StatsSummaryDto(99L, 99L, 99L, 99L, 99L, 99L, 99L, 99L),
+                        new StatsSummaryDto(99L, 99L, 99L, 99L, 99L, 99L, 99L, 99L),
+                        new StatsSummaryDto( // all zero / NaN: presumably no samples
+                                0L,
+                                0L,
+                                0L,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN),
+                        new StatsSummaryDto( // all zero / NaN: presumably no samples
+                                0L,
+                                0L,
+                                0L,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN,
+                                Double.NaN));
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+            throws HandlerRequestException {
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); // job id is the only path parameter
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new JobMessageParameters(),
+                pathParameters,
+                new HashMap<>(),
+                List.of());
+    }
+}