This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 68fb3cf154d [FLINK-38896][runtime/rest] Introduce the 
/jobs/:jobid/rescales/summary endpoint in the REST API (#27987)
68fb3cf154d is described below

commit 68fb3cf154d91e2f76559e928f566de32eaae176
Author: Yuepeng Pan <[email protected]>
AuthorDate: Wed Apr 22 07:20:43 2026 +0800

    [FLINK-38896][runtime/rest] Introduce the /jobs/:jobid/rescales/summary 
endpoint in the REST API (#27987)
    
    (cherry picked from commit ed21db51baa427f91bbf975a3e45fe13f475ecb0)
    
    Co-authored-by: XComp <[email protected]>
    Co-authored-by: spuru9 <[email protected]>
    Co-authored-by: mateczagany <[email protected]>
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 107 +++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  31 +++++
 .../src/test/resources/rest_api_v1.snapshot        |  83 ++++++++++++
 .../job/rescales/JobRescalesHistoryHandler.java    |  13 +-
 .../job/rescales/JobRescalesOverviewHandler.java   |  13 +-
 ...Handler.java => JobRescalesSummaryHandler.java} |  46 +++----
 .../job/rescales/RescalesUnavailableException.java |  45 +++++++
 .../messages/job/rescales/JobRescalesSummary.java  | 149 +++++++++++++++++++++
 .../job/rescales/JobRescalesSummaryHeaders.java    |  74 ++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  14 ++
 .../rescales/JobRescalesSummaryHandlerTest.java    | 149 +++++++++++++++++++++
 11 files changed, 672 insertions(+), 52 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 8d7e8b44357..b7ab854eb98 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -4252,6 +4252,113 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/rescales/summary</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Return job rescales summary.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesSummary",
+  "properties" : {
+    "completedRescalesDurationStatsInMillis" : {
+      "type" : "object",
+      "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+    },
+    "failedRescalesDurationStatsInMillis" : {
+      "type" : "object",
+      "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+    },
+    "ignoredRescalesDurationStatsInMillis" : {
+      "type" : "object",
+      "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+    },
+    "rescalesCounts" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+      "properties" : {
+        "completed" : {
+          "type" : "integer"
+        },
+        "failed" : {
+          "type" : "integer"
+        },
+        "ignored" : {
+          "type" : "integer"
+        },
+        "inProgress" : {
+          "type" : "integer"
+        }
+      }
+    },
+    "rescalesDurationStatsInMillis" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto",
+      "properties" : {
+        "avg" : {
+          "type" : "integer"
+        },
+        "max" : {
+          "type" : "integer"
+        },
+        "min" : {
+          "type" : "integer"
+        },
+        "p50" : {
+          "type" : "number"
+        },
+        "p90" : {
+          "type" : "number"
+        },
+        "p95" : {
+          "type" : "number"
+        },
+        "p99" : {
+          "type" : "number"
+        },
+        "p999" : {
+          "type" : "number"
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 83cfaea0837..c8f1c7ae891 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -996,6 +996,24 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/JobRescalesOverview"
+  /jobs/{jobid}/rescales/summary:
+    get:
+      description: Return job rescales summary.
+      operationId: getJobRescalesSummary
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: "#/components/schemas/JobID"
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/JobRescalesSummary"
   /jobs/{jobid}/rescaling:
     patch:
       description: Triggers the rescaling of a job. This async operation would 
return
@@ -2875,6 +2893,19 @@ components:
           $ref: "#/components/schemas/LatestRescales"
         rescalesCounts:
           $ref: "#/components/schemas/RescalesCounts"
+    JobRescalesSummary:
+      type: object
+      properties:
+        completedRescalesDurationStatsInMillis:
+          $ref: "#/components/schemas/StatsSummaryDto"
+        failedRescalesDurationStatsInMillis:
+          $ref: "#/components/schemas/StatsSummaryDto"
+        ignoredRescalesDurationStatsInMillis:
+          $ref: "#/components/schemas/StatsSummaryDto"
+        rescalesCounts:
+          $ref: "#/components/schemas/RescalesCounts"
+        rescalesDurationStatsInMillis:
+          $ref: "#/components/schemas/StatsSummaryDto"
     JobResourceRequirementsBody:
       type: object
       additionalProperties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index c8767d6f9c5..f282db00ea9 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -3423,6 +3423,89 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/rescales/summary",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesSummary",
+      "properties" : {
+        "rescalesCounts" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
+          "properties" : {
+            "ignored" : {
+              "type" : "integer"
+            },
+            "inProgress" : {
+              "type" : "integer"
+            },
+            "completed" : {
+              "type" : "integer"
+            },
+            "failed" : {
+              "type" : "integer"
+            }
+          }
+        },
+        "rescalesDurationStatsInMillis" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto",
+          "properties" : {
+            "min" : {
+              "type" : "integer"
+            },
+            "max" : {
+              "type" : "integer"
+            },
+            "avg" : {
+              "type" : "integer"
+            },
+            "p50" : {
+              "type" : "number"
+            },
+            "p90" : {
+              "type" : "number"
+            },
+            "p95" : {
+              "type" : "number"
+            },
+            "p99" : {
+              "type" : "number"
+            },
+            "p999" : {
+              "type" : "number"
+            }
+          }
+        },
+        "completedRescalesDurationStatsInMillis" : {
+          "type" : "object",
+          "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+        },
+        "ignoredRescalesDurationStatsInMillis" : {
+          "type" : "object",
+          "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+        },
+        "failedRescalesDurationStatsInMillis" : {
+          "type" : "object",
+          "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/rescaling",
     "method" : "PATCH",
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
index 6e7541f87e3..478f55e73e1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.job.rescales;
 
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -38,8 +36,6 @@ import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
@@ -80,14 +76,7 @@ public class JobRescalesHistoryHandler
             throws RestHandlerException {
         if (executionGraphInfo.getRescalesStatsSnapshot() == null
                 || 
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory() == null) {
-            throw new RestHandlerException(
-                    String.format(
-                            "The job `%s` has not enabled the `%s` scheduler, 
or it has been enabled but the value of configuration option `%s` has not been 
set to greater than `0`.",
-                            executionGraphInfo.getJobId(),
-                            JobManagerOptions.SchedulerType.Adaptive,
-                            
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
-                    HttpResponseStatus.NOT_FOUND,
-                    RestHandlerException.LoggingBehavior.IGNORE);
+            throw 
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
         }
         return JobRescalesHistory.fromRescalesStatsSnapshot(
                 executionGraphInfo.getRescalesStatsSnapshot());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
index a8ca9d05b4a..c6faeee1a3d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.job.rescales;
 
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -39,8 +37,6 @@ import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
@@ -81,14 +77,7 @@ public class JobRescalesOverviewHandler
             throws RestHandlerException {
         RescalesStatsSnapshot rescalesStatsSnapshot = 
executionGraphInfo.getRescalesStatsSnapshot();
         if (rescalesStatsSnapshot == null || 
rescalesStatsSnapshot.getRescaleHistory() == null) {
-            throw new RestHandlerException(
-                    String.format(
-                            "The job `%s` has not enabled the `%s` scheduler, 
or it has been enabled but the value of configuration option `%s` has not been 
set to greater than `0`.",
-                            executionGraphInfo.getJobId(),
-                            JobManagerOptions.SchedulerType.Adaptive,
-                            
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
-                    HttpResponseStatus.NOT_FOUND,
-                    RestHandlerException.LoggingBehavior.IGNORE);
+            throw 
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
         }
         return JobRescalesOverview.fromRescalesStatsSnapshot(
                 executionGraphInfo.getRescalesStatsSnapshot());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
similarity index 69%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
index a8ca9d05b4a..0bcb950469e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.job.rescales;
 
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
@@ -30,8 +28,8 @@ import 
org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
-import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummary;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -39,8 +37,6 @@ import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collection;
@@ -48,16 +44,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 
-/** Handler to respond with a job rescales overview. */
-public class JobRescalesOverviewHandler
-        extends AbstractExecutionGraphHandler<JobRescalesOverview, 
JobMessageParameters>
+/** Handler to respond with a job rescales summary. */
+public class JobRescalesSummaryHandler
+        extends AbstractExecutionGraphHandler<JobRescalesSummary, 
JobMessageParameters>
         implements JsonArchivist {
 
-    public JobRescalesOverviewHandler(
+    public JobRescalesSummaryHandler(
             GatewayRetriever<? extends RestfulGateway> leaderRetriever,
             Duration timeout,
             Map<String, String> responseHeaders,
-            MessageHeaders<EmptyRequestBody, JobRescalesOverview, 
JobMessageParameters>
+            MessageHeaders<EmptyRequestBody, JobRescalesSummary, 
JobMessageParameters>
                     messageHeaders,
             ExecutionGraphCache executionGraphCache,
             Executor executor) {
@@ -71,42 +67,36 @@ public class JobRescalesOverviewHandler
     }
 
     @Override
-    protected JobRescalesOverview handleRequest(
+    protected JobRescalesSummary handleRequest(
             HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo 
executionGraphInfo)
             throws RestHandlerException {
-        return getJobRescalesOverview(executionGraphInfo);
+        return getJobRescalesSummary(executionGraphInfo);
     }
 
-    private JobRescalesOverview getJobRescalesOverview(ExecutionGraphInfo 
executionGraphInfo)
+    private JobRescalesSummary getJobRescalesSummary(ExecutionGraphInfo 
executionGraphInfo)
             throws RestHandlerException {
         RescalesStatsSnapshot rescalesStatsSnapshot = 
executionGraphInfo.getRescalesStatsSnapshot();
-        if (rescalesStatsSnapshot == null || 
rescalesStatsSnapshot.getRescaleHistory() == null) {
-            throw new RestHandlerException(
-                    String.format(
-                            "The job `%s` has not enabled the `%s` scheduler, 
or it has been enabled but the value of configuration option `%s` has not been 
set to greater than `0`.",
-                            executionGraphInfo.getJobId(),
-                            JobManagerOptions.SchedulerType.Adaptive,
-                            
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
-                    HttpResponseStatus.NOT_FOUND,
-                    RestHandlerException.LoggingBehavior.IGNORE);
+
+        if (rescalesStatsSnapshot == null
+                || rescalesStatsSnapshot.getRescalesSummarySnapshot() == null) 
{
+            throw 
RescalesUnavailableException.createForJob(executionGraphInfo.getJobId());
         }
-        return JobRescalesOverview.fromRescalesStatsSnapshot(
-                executionGraphInfo.getRescalesStatsSnapshot());
+
+        return 
JobRescalesSummary.fromRescalesStatsSnapshot(rescalesStatsSnapshot);
     }
 
     @Override
     public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
             throws IOException {
-
         ResponseBody response;
         try {
-            response = getJobRescalesOverview(executionGraphInfo);
+            response = getJobRescalesSummary(executionGraphInfo);
         } catch (RestHandlerException rhe) {
             response = new ErrorResponseBody(rhe.getMessage());
         }
         return List.of(
                 new ArchivedJson(
-                        JobRescalesOverviewHeaders.getInstance()
+                        JobRescalesSummaryHeaders.getInstance()
                                 .getTargetRestEndpointURL()
                                 .replace(
                                         ':' + JobIDPathParameter.KEY,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
new file mode 100644
index 00000000000..48446e01363
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/RescalesUnavailableException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** An exception that is thrown if the requested job rescales related data 
is unavailable. */
+public class RescalesUnavailableException extends RestHandlerException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RescalesUnavailableException(String message) {
+        super(message, HttpResponseStatus.NOT_FOUND, 
RestHandlerException.LoggingBehavior.IGNORE);
+    }
+
+    public static RescalesUnavailableException createForJob(JobID jobId) {
+        return new RescalesUnavailableException(
+                String.format(
+                        "The job `%s` has not enabled the `%s` scheduler, or 
it has been enabled but the value of configuration option `%s` has not been set 
to greater than `0`.",
+                        jobId,
+                        JobManagerOptions.SchedulerType.Adaptive,
+                        
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
new file mode 100644
index 00000000000..613dbc838a1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummary.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.util.stats.StatsSummaryDto;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Response body for {@link JobRescalesSummaryHandler}. */
+@Schema(name = "JobRescalesSummary")
+public class JobRescalesSummary implements ResponseBody, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_RESCALES_COUNTS = "rescalesCounts";
+    public static final String FIELD_NAME_RESCALES_DURATION_STATS = 
"rescalesDurationStatsInMillis";
+    public static final String FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS =
+            "completedRescalesDurationStatsInMillis";
+    public static final String FIELD_NAME_IGNORED_RESCALES_DURATION_STATS =
+            "ignoredRescalesDurationStatsInMillis";
+    public static final String FIELD_NAME_FAILED_RESCALES_DURATION_STATS =
+            "failedRescalesDurationStatsInMillis";
+
+    @JsonProperty(FIELD_NAME_RESCALES_COUNTS)
+    private final JobRescalesOverview.RescalesCounts rescalesCounts;
+
+    @JsonProperty(FIELD_NAME_RESCALES_DURATION_STATS)
+    private final StatsSummaryDto rescalesDurationStatsInMillis;
+
+    @JsonProperty(FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS)
+    private final StatsSummaryDto completedRescalesDurationStatsInMillis;
+
+    @JsonProperty(FIELD_NAME_IGNORED_RESCALES_DURATION_STATS)
+    private final StatsSummaryDto ignoredRescalesDurationStatsInMillis;
+
+    @JsonProperty(FIELD_NAME_FAILED_RESCALES_DURATION_STATS)
+    private final StatsSummaryDto failedRescalesDurationStatsInMillis;
+
+    @JsonCreator
+    public JobRescalesSummary(
+            @JsonProperty(FIELD_NAME_RESCALES_COUNTS)
+                    JobRescalesOverview.RescalesCounts rescalesCounts,
+            @JsonProperty(FIELD_NAME_RESCALES_DURATION_STATS)
+                    StatsSummaryDto rescalesDurationStatsInMillis,
+            @JsonProperty(FIELD_NAME_COMPLETED_RESCALES_DURATION_STATS)
+                    StatsSummaryDto completedRescalesDurationStatsInMillis,
+            @JsonProperty(FIELD_NAME_IGNORED_RESCALES_DURATION_STATS)
+                    StatsSummaryDto ignoredRescalesDurationStatsInMillis,
+            @JsonProperty(FIELD_NAME_FAILED_RESCALES_DURATION_STATS)
+                    StatsSummaryDto failedRescalesDurationStatsInMillis) {
+        this.rescalesCounts = rescalesCounts;
+        this.rescalesDurationStatsInMillis = rescalesDurationStatsInMillis;
+        this.completedRescalesDurationStatsInMillis = 
completedRescalesDurationStatsInMillis;
+        this.ignoredRescalesDurationStatsInMillis = 
ignoredRescalesDurationStatsInMillis;
+        this.failedRescalesDurationStatsInMillis = 
failedRescalesDurationStatsInMillis;
+    }
+
+    public static JobRescalesSummary 
fromRescalesStatsSnapshot(RescalesStatsSnapshot snapshot) {
+        RescalesSummarySnapshot summarySnapshot = 
snapshot.getRescalesSummarySnapshot();
+        JobRescalesOverview.RescalesCounts counts =
+                new JobRescalesOverview.RescalesCounts(
+                        summarySnapshot.getIgnoredRescalesCount(),
+                        summarySnapshot.getInProgressRescaleCount(),
+                        summarySnapshot.getCompletedRescalesCount(),
+                        summarySnapshot.getFailedRescalesCount());
+        return new JobRescalesSummary(
+                counts,
+                
StatsSummaryDto.valueOf(summarySnapshot.getAllTerminatedSummarySnapshot()),
+                
StatsSummaryDto.valueOf(summarySnapshot.getCompletedRescalesSummarySnapshot()),
+                
StatsSummaryDto.valueOf(summarySnapshot.getIgnoredRescalesSummarySnapshot()),
+                
StatsSummaryDto.valueOf(summarySnapshot.getFailedRescalesSummarySnapshot()));
+    }
+
+    public JobRescalesOverview.RescalesCounts getRescalesCounts() {
+        return rescalesCounts;
+    }
+
+    public StatsSummaryDto getRescalesDurationStatsInMillis() {
+        return rescalesDurationStatsInMillis;
+    }
+
+    public StatsSummaryDto getCompletedRescalesDurationStatsInMillis() {
+        return completedRescalesDurationStatsInMillis;
+    }
+
+    public StatsSummaryDto getIgnoredRescalesDurationStatsInMillis() {
+        return ignoredRescalesDurationStatsInMillis;
+    }
+
+    public StatsSummaryDto getFailedRescalesDurationStatsInMillis() {
+        return failedRescalesDurationStatsInMillis;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        } else if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        JobRescalesSummary that = (JobRescalesSummary) o;
+        return Objects.equals(rescalesCounts, that.rescalesCounts)
+                && Objects.equals(rescalesDurationStatsInMillis, 
that.rescalesDurationStatsInMillis)
+                && Objects.equals(
+                        completedRescalesDurationStatsInMillis,
+                        that.completedRescalesDurationStatsInMillis)
+                && Objects.equals(
+                        ignoredRescalesDurationStatsInMillis,
+                        that.ignoredRescalesDurationStatsInMillis)
+                && Objects.equals(
+                        failedRescalesDurationStatsInMillis,
+                        that.failedRescalesDurationStatsInMillis);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                rescalesCounts,
+                rescalesDurationStatsInMillis,
+                completedRescalesDurationStatsInMillis,
+                ignoredRescalesDurationStatsInMillis,
+                failedRescalesDurationStatsInMillis);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
new file mode 100644
index 00000000000..983e5348b4c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesSummaryHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobRescalesSummaryHandler}.
+ *
+ * <p>Describes a {@code GET} request against {@link #JOB_RESCALES_SUMMARY_PATH} that carries an
+ * {@link EmptyRequestBody} and is answered with a {@link JobRescalesSummary} and status {@code
+ * 200 OK}.
+ */
+public class JobRescalesSummaryHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JobRescalesSummary, JobMessageParameters> {
+
+    /** Shared instance, also available through {@link #getInstance()}. */
+    public static final JobRescalesSummaryHeaders INSTANCE = new JobRescalesSummaryHeaders();
+
+    /** REST URL template of the endpoint described by these headers. */
+    public static final String JOB_RESCALES_SUMMARY_PATH = "/jobs/:jobid/rescales/summary";
+
+    /** Returns the shared {@link #INSTANCE}. */
+    public static JobRescalesSummaryHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    // ------------------------------ request side ------------------------------
+
+    /** The endpoint is queried with HTTP {@code GET}. */
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    /** Returns {@link #JOB_RESCALES_SUMMARY_PATH}. */
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOB_RESCALES_SUMMARY_PATH;
+    }
+
+    /** Requests use {@link EmptyRequestBody} as their body type. */
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    /** Returns a fresh, unresolved {@link JobMessageParameters} instance on every call. */
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    // ------------------------------ response side -----------------------------
+
+    /** Responses are of type {@link JobRescalesSummary}. */
+    @Override
+    public Class<JobRescalesSummary> getResponseClass() {
+        return JobRescalesSummary.class;
+    }
+
+    /** Responses are sent with {@link HttpResponseStatus#OK}. */
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    /** Returns the human-readable description of the endpoint. */
+    @Override
+    public String getDescription() {
+        return "Return job rescales summary.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 2ef33671cb4..9e3f295e6e7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -100,6 +100,7 @@ import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandle
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesOverviewHandler;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesSummaryHandler;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -170,6 +171,7 @@ import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeade
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverviewHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1242,6 +1244,18 @@ public class WebMonitorEndpoint<T extends 
RestfulGateway> extends RestServerEndp
                         jobRescalesOverviewHandler.getMessageHeaders(),
                         jobRescalesOverviewHandler));
 
+        final JobRescalesSummaryHandler jobRescalesSummaryHandler =
+                new JobRescalesSummaryHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobRescalesSummaryHeaders.getInstance(),
+                        executionGraphCache,
+                        executor);
+        handlers.add(
+                Tuple2.of(
+                        jobRescalesSummaryHandler.getMessageHeaders(), 
jobRescalesSummaryHandler));
+
         final JobRescalesHistoryHandler jobRescalesHistoryHandler =
                 new JobRescalesHistoryHandler(
                         leaderRetriever,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java
new file mode 100644
index 00000000000..728a6661ef1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesSummaryHandlerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesOverview;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummary;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
+import org.apache.flink.runtime.rest.messages.util.stats.StatsSummaryDto;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link JobRescalesSummaryHandler}. */
+class JobRescalesSummaryHandlerTest {
+
+    /** Handler under test, wired with a direct executor and default test timeouts. */
+    private final JobRescalesSummaryHandler handler =
+            new JobRescalesSummaryHandler(
+                    CompletableFuture::new,
+                    TestingUtils.TIMEOUT,
+                    Map.of(),
+                    JobRescalesSummaryHeaders.getInstance(),
+                    new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, TestingUtils.TIMEOUT),
+                    Executors.directExecutor());
+
+    /**
+     * An {@link ExecutionGraphInfo} built with {@code null} as its last constructor argument must
+     * make the handler fail with a {@link RestHandlerException}.
+     */
+    @Test
+    void testSchedulerNotEnabledRescalesSummary() throws HandlerRequestException {
+        final ExecutionGraphInfo graphInfoWithoutRescalesStats =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(), List.of(), null);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(graphInfoWithoutRescalesStats.getJobId());
+
+        assertThatThrownBy(() -> handler.handleRequest(request, graphInfoWithoutRescalesStats))
+                .isInstanceOf(RestHandlerException.class);
+    }
+
+    /**
+     * A snapshot holding one terminated rescale (timestamps 1 to 100) must be turned into the
+     * expected {@link JobRescalesSummary}.
+     */
+    @Test
+    void testRequestNormalJobRescaleSummary()
+            throws HandlerRequestException, RestHandlerException {
+        final Rescale terminatedRescale =
+                new Rescale(new RescaleIdInfo(new RescaleIdInfo.ResourceRequirementsID(), 1L))
+                        .setStartTimestamp(1L)
+                        .setEndTimestamp(100L)
+                        .setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
+                        .setStringifiedException("mocked exception")
+                        .setTerminatedReason(TerminatedReason.SUCCEEDED);
+
+        final RescalesSummary summary = new RescalesSummary(2);
+        summary.addTerminated(terminatedRescale);
+        final RescalesSummarySnapshot summarySnapshot = summary.createSnapshot();
+
+        final RescalesStatsSnapshot statsSnapshot =
+                new RescalesStatsSnapshot(
+                        List.of(terminatedRescale),
+                        Map.of(terminatedRescale.getTerminalState(), terminatedRescale),
+                        summarySnapshot);
+
+        final ExecutionGraphInfo graphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        List.of(),
+                        JobManagerOptions.SchedulerType.Adaptive,
+                        null,
+                        statsSnapshot);
+        final HandlerRequest<EmptyRequestBody> request = createRequest(graphInfo.getJobId());
+
+        final JobRescalesSummary actual = handler.handleRequest(request, graphInfo);
+
+        // Two of the four duration statistics are expected to report 99 everywhere,
+        // the remaining two are expected to be empty.
+        final JobRescalesSummary expected =
+                new JobRescalesSummary(
+                        new JobRescalesOverview.RescalesCounts(0L, 0L, 1L, 0L),
+                        uniformStats(99L),
+                        uniformStats(99L),
+                        emptyStats(),
+                        emptyStats());
+        assertThat(actual).isEqualTo(expected);
+    }
+
+    /** Creates a new {@link StatsSummaryDto} whose eight values all equal {@code value}. */
+    private static StatsSummaryDto uniformStats(long value) {
+        return new StatsSummaryDto(value, value, value, value, value, value, value, value);
+    }
+
+    /**
+     * Creates a new {@link StatsSummaryDto} with zeros for its first three values and {@code NaN}
+     * for the remaining five.
+     */
+    private static StatsSummaryDto emptyStats() {
+        return new StatsSummaryDto(
+                0L, 0L, 0L, Double.NaN, Double.NaN, Double.NaN, Double.NaN, Double.NaN);
+    }
+
+    /** Builds a body-less handler request whose only path parameter is the given job id. */
+    private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+            throws HandlerRequestException {
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new JobMessageParameters(),
+                pathParameters,
+                new HashMap<>(),
+                List.of());
+    }
+}


Reply via email to