This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b1c858eae73 [FLINK-38895][runtime] Introduce the 
/jobs/:jobid/rescales/details/:rescaleuuid endpoint in the REST API
b1c858eae73 is described below

commit b1c858eae739dffb84a76e6c4c7a6c938bfc6d8c
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Mar 31 08:17:23 2026 +0800

    [FLINK-38895][runtime] Introduce the 
/jobs/:jobid/rescales/details/:rescaleuuid endpoint in the REST API
    
    Co-authored-by: XComp <[email protected]>
    Co-authored-by: WeiZhong94 <[email protected]>
    Co-authored-by: mateczagany <[email protected]>
    Co-authored-by: ferenc-csaky <[email protected]>
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 190 ++++++++
 docs/static/generated/rest_v1_dispatcher.yml       | 170 +++++++
 .../src/test/resources/rest_api_v1.snapshot        | 183 ++++++++
 .../job/rescales/JobRescaleDetailsHandler.java     | 127 ++++++
 .../job/rescales/JobIDRescaleIDParameters.java     |  41 ++
 .../messages/job/rescales/JobRescaleDetails.java   | 508 +++++++++++++++++++++
 .../job/rescales/JobRescaleDetailsHeaders.java     |  74 +++
 .../job/rescales/JobRescaleIDPathParameter.java    |  52 +++
 .../messages/job/rescales}/SchedulerStateSpan.java |  39 +-
 .../json/SlotSharingGroupIDKeyDeserializer.java    |  35 ++
 .../json/SlotSharingGroupIDKeySerializer.java      |  43 ++
 .../adaptive/timeline/DefaultRescaleTimeline.java  |   5 +-
 .../scheduler/adaptive/timeline/Rescale.java       |  26 +-
 .../scheduler/adaptive/timeline/RescaleIdInfo.java |  33 +-
 .../adaptive/timeline/RescalesStatsSnapshot.java   |  17 +
 .../timeline/VertexParallelismRescale.java         | 160 -------
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  13 +
 .../job/rescales/JobRescaleDetailsHandlerTest.java | 175 +++++++
 .../scheduler/adaptive/timeline/RescaleTest.java   |  59 ++-
 .../adaptive/timeline/RescaleTimelineITCase.java   |  12 +-
 .../adaptive/timeline/RescalesSummaryTest.java     |   4 +-
 21 files changed, 1743 insertions(+), 223 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 07ebf893e07..10fba048344 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3643,6 +3643,196 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/rescales/details/:rescaleuuid</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Return a job rescale details.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+<li><code>rescaleuuid</code> - String value that identifies a job rescale.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+  "properties" : {
+    "endTimestampInMillis" : {
+      "type" : "integer"
+    },
+    "rescaleAttemptId" : {
+      "type" : "integer"
+    },
+    "rescaleUuid" : {
+      "type" : "string"
+    },
+    "resourceRequirementsUuid" : {
+      "type" : "string"
+    },
+    "schedulerStates" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+        "properties" : {
+          "durationInMillis" : {
+            "type" : "integer"
+          },
+          "enterTimestampInMillis" : {
+            "type" : "integer"
+          },
+          "leaveTimestampInMillis" : {
+            "type" : "integer"
+          },
+          "state" : {
+            "type" : "string"
+          },
+          "stringifiedException" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "slots" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+        "properties" : {
+          "acquiredResourceProfile" : {
+            "type" : "object",
+            "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+          },
+          "desiredSlots" : {
+            "type" : "integer"
+          },
+          "minimalRequiredSlots" : {
+            "type" : "integer"
+          },
+          "postRescaleSlots" : {
+            "type" : "integer"
+          },
+          "preRescaleSlots" : {
+            "type" : "integer"
+          },
+          "requestResourceProfile" : {
+            "type" : "object",
+            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+            "properties" : {
+              "cpuCores" : {
+                "type" : "number"
+              },
+              "extendedResources" : {
+                "type" : "object",
+                "additionalProperties" : {
+                  "type" : "number"
+                }
+              },
+              "managedMemory" : {
+                "type" : "integer"
+              },
+              "networkMemory" : {
+                "type" : "integer"
+              },
+              "taskHeapMemory" : {
+                "type" : "integer"
+              },
+              "taskOffHeapMemory" : {
+                "type" : "integer"
+              }
+            }
+          },
+          "slotSharingGroupId" : {
+            "type" : "any"
+          },
+          "slotSharingGroupName" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "startTimestampInMillis" : {
+      "type" : "integer"
+    },
+    "terminalState" : {
+      "type" : "string",
+      "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+    },
+    "terminatedReason" : {
+      "type" : "string",
+      "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", 
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", 
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+    },
+    "triggerCause" : {
+      "type" : "string",
+      "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", 
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+    },
+    "vertices" : {
+      "type" : "object",
+      "additionalProperties" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+        "properties" : {
+          "desiredParallelism" : {
+            "type" : "integer"
+          },
+          "jobVertexId" : {
+            "type" : "any"
+          },
+          "jobVertexName" : {
+            "type" : "string"
+          },
+          "postRescaleParallelism" : {
+            "type" : "integer"
+          },
+          "preRescaleParallelism" : {
+            "type" : "integer"
+          },
+          "slotSharingGroupId" : {
+            "type" : "any"
+          },
+          "slotSharingGroupName" : {
+            "type" : "string"
+          },
+          "sufficientParallelism" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 16df36dc728..43fa4b3fe28 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -934,6 +934,30 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/JobRescaleConfigInfo"
+  /jobs/{jobid}/rescales/details/{rescaleuuid}:
+    get:
+      description: Return a job rescale details.
+      operationId: getJobRescaleDetails
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: "#/components/schemas/JobID"
+      - name: rescaleuuid
+        in: path
+        description: String value that identifies a job rescale.
+        required: true
+        schema:
+          $ref: "#/components/schemas/RescaleUUID"
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/JobRescaleDetails"
   /jobs/{jobid}/rescaling:
     patch:
       description: Triggers the rescaling of a job. This async operation would 
return
@@ -2768,6 +2792,40 @@ components:
         submissionResourceWaitTimeoutInMillis:
           type: integer
           format: int64
+    JobRescaleDetails:
+      type: object
+      properties:
+        endTimestampInMillis:
+          type: integer
+          format: int64
+        rescaleAttemptId:
+          type: integer
+          format: int64
+        rescaleUuid:
+          type: string
+        resourceRequirementsUuid:
+          type: string
+        schedulerStates:
+          type: array
+          items:
+            $ref: "#/components/schemas/SchedulerStateSpan"
+        slots:
+          type: object
+          additionalProperties:
+            $ref: "#/components/schemas/SlotSharingGroupRescaleInfo"
+        startTimestampInMillis:
+          type: integer
+          format: int64
+        terminalState:
+          $ref: "#/components/schemas/TerminalState"
+        terminatedReason:
+          $ref: "#/components/schemas/TerminatedReason"
+        triggerCause:
+          $ref: "#/components/schemas/TriggerCause"
+        vertices:
+          type: object
+          additionalProperties:
+            $ref: "#/components/schemas/VertexParallelismRescaleInfo"
     JobResourceRequirementsBody:
       type: object
       additionalProperties:
@@ -3064,6 +3122,18 @@ components:
       - CLAIM
       - NO_CLAIM
       - LEGACY
+    RescaleUUID:
+      type: object
+      properties:
+        bytes:
+          type: string
+          format: byte
+        lowerPart:
+          type: integer
+          format: int64
+        upperPart:
+          type: integer
+          format: int64
     ResourceID:
       pattern: "[0-9a-f]{32}"
       type: string
@@ -3166,6 +3236,22 @@ components:
       type: string
       enum:
       - REACTIVE
+    SchedulerStateSpan:
+      type: object
+      properties:
+        durationInMillis:
+          type: integer
+          format: int64
+        enterTimestampInMillis:
+          type: integer
+          format: int64
+        leaveTimestampInMillis:
+          type: integer
+          format: int64
+        state:
+          type: string
+        stringifiedException:
+          type: string
     SchedulerType:
       type: string
       enum:
@@ -3206,6 +3292,29 @@ components:
         upperPart:
           type: integer
           format: int64
+    SlotSharingGroupRescaleInfo:
+      type: object
+      properties:
+        acquiredResourceProfile:
+          $ref: "#/components/schemas/ResourceProfileInfo"
+        desiredSlots:
+          type: integer
+          format: int32
+        minimalRequiredSlots:
+          type: integer
+          format: int32
+        postRescaleSlots:
+          type: integer
+          format: int32
+        preRescaleSlots:
+          type: integer
+          format: int32
+        requestResourceProfile:
+          $ref: "#/components/schemas/ResourceProfileInfo"
+        slotSharingGroupId:
+          $ref: "#/components/schemas/SlotSharingGroupId"
+        slotSharingGroupName:
+          type: string
     StatsSummaryDto:
       type: object
       properties:
@@ -3660,6 +3769,23 @@ components:
           type: array
           items:
             $ref: "#/components/schemas/TaskManagerInfo"
+    TerminalState:
+      type: string
+      enum:
+      - COMPLETED
+      - FAILED
+      - IGNORED
+    TerminatedReason:
+      type: string
+      enum:
+      - SUCCEEDED
+      - EXCEPTION_OCCURRED
+      - RESOURCE_REQUIREMENTS_UPDATED
+      - NO_RESOURCES_OR_PARALLELISMS_CHANGE
+      - JOB_FINISHED
+      - JOB_FAILED
+      - JOB_CANCELED
+      - JOB_FAILOVER_RESTARTING
     TerminationMode:
       type: string
       enum:
@@ -3685,6 +3811,13 @@ components:
       - FULL
       - ON_CPU
       - OFF_CPU
+    TriggerCause:
+      type: string
+      enum:
+      - INITIAL_SCHEDULE
+      - UPDATE_REQUIREMENT
+      - NEW_RESOURCE_AVAILABLE
+      - RECOVERABLE_FAILOVER
     TriggerId:
       pattern: "[0-9a-f]{32}"
       type: string
@@ -3734,3 +3867,40 @@ components:
         endTimestamp:
           type: integer
           format: int64
+    VertexParallelismInformation:
+      type: object
+      properties:
+        maxParallelism:
+          type: integer
+          format: int32
+        minParallelism:
+          type: integer
+          format: int32
+        parallelism:
+          type: integer
+          format: int32
+    VertexParallelismRescaleInfo:
+      type: object
+      properties:
+        desiredParallelism:
+          type: integer
+          format: int32
+        jobVertexId:
+          $ref: "#/components/schemas/JobVertexID"
+        jobVertexName:
+          type: string
+        postRescaleParallelism:
+          type: integer
+          format: int32
+        preRescaleParallelism:
+          type: integer
+          format: int32
+        requiredParallelisms:
+          $ref: "#/components/schemas/VertexParallelismInformation"
+        slotSharingGroupId:
+          $ref: "#/components/schemas/SlotSharingGroupId"
+        slotSharingGroupName:
+          type: string
+        sufficientParallelism:
+          type: integer
+          format: int32
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index e065545a069..b4a16c9cd74 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4832,5 +4832,188 @@
         }
       }
     }
+  }, {
+    "url": "/jobs/:jobid/rescales/details/:rescaleuuid",
+    "method": "GET",
+    "status-code": "200 OK",
+    "file-upload": false,
+    "path-parameters": {
+      "pathParameters": [
+        {
+          "key": "jobid"
+        },
+        {
+          "key": "rescaleuuid"
+        }
+      ]
+    },
+    "query-parameters": {
+      "queryParameters": []
+    },
+    "request": {
+      "type": "object"
+    },
+    "response": {
+      "type": "object",
+      "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+      "properties": {
+        "endTimestampInMillis": {
+          "type": "integer"
+        },
+        "rescaleAttemptId": {
+          "type": "integer"
+        },
+        "rescaleUuid": {
+          "type": "string"
+        },
+        "resourceRequirementsUuid": {
+          "type": "string"
+        },
+        "schedulerStates": {
+          "type": "array",
+          "items": {
+            "type": "object",
+            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+            "properties": {
+              "durationInMillis": {
+                "type": "integer"
+              },
+              "enterTimestampInMillis": {
+                "type": "integer"
+              },
+              "leaveTimestampInMillis": {
+                "type": "integer"
+              },
+              "state": {
+                "type": "string"
+              },
+              "stringifiedException": {
+                "type": "string"
+              }
+            }
+          }
+        },
+        "slots": {
+          "type": "object",
+          "additionalProperties": {
+            "type": "object",
+            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+            "properties": {
+              "acquiredResourceProfile": {
+                "type": "object",
+                "$ref": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+              },
+              "desiredSlots": {
+                "type": "integer"
+              },
+              "minimalRequiredSlots": {
+                "type": "integer"
+              },
+              "postRescaleSlots": {
+                "type": "integer"
+              },
+              "preRescaleSlots": {
+                "type": "integer"
+              },
+              "requestResourceProfile": {
+                "type": "object",
+                "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+                "properties": {
+                  "cpuCores": {
+                    "type": "number"
+                  },
+                  "extendedResources": {
+                    "type": "object",
+                    "additionalProperties": {
+                      "type": "number"
+                    }
+                  },
+                  "managedMemory": {
+                    "type": "integer"
+                  },
+                  "networkMemory": {
+                    "type": "integer"
+                  },
+                  "taskHeapMemory": {
+                    "type": "integer"
+                  },
+                  "taskOffHeapMemory": {
+                    "type": "integer"
+                  }
+                }
+              },
+              "slotSharingGroupId": {
+                "type": "any"
+              },
+              "slotSharingGroupName": {
+                "type": "string"
+              }
+            }
+          }
+        },
+        "startTimestampInMillis": {
+          "type": "integer"
+        },
+        "terminalState": {
+          "type": "string",
+          "enum": ["COMPLETED", "FAILED", "IGNORED"]
+        },
+        "terminatedReason": {
+          "type": "string",
+          "enum": [
+            "SUCCEEDED",
+            "EXCEPTION_OCCURRED",
+            "RESOURCE_REQUIREMENTS_UPDATED",
+            "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
+            "JOB_FINISHED",
+            "JOB_FAILED",
+            "JOB_CANCELED",
+            "JOB_FAILOVER_RESTARTING"
+          ]
+        },
+        "triggerCause": {
+          "type": "string",
+          "enum": [
+            "INITIAL_SCHEDULE",
+            "UPDATE_REQUIREMENT",
+            "NEW_RESOURCE_AVAILABLE",
+            "RECOVERABLE_FAILOVER"
+          ]
+        },
+        "vertices": {
+          "type": "object",
+          "additionalProperties": {
+            "type": "object",
+            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+            "properties": {
+              "desiredParallelism": {
+                "type": "integer"
+              },
+              "jobVertexId": {
+                "type": "any"
+              },
+              "jobVertexName": {
+                "type": "string"
+              },
+              "postRescaleParallelism": {
+                "type": "integer"
+              },
+              "preRescaleParallelism": {
+                "type": "integer"
+              },
+              "slotSharingGroupId": {
+                "type": "any"
+              },
+              "slotSharingGroupName": {
+                "type": "string"
+              },
+              "sufficientParallelism": {
+                "type": "integer"
+              }
+            }
+          }
+        }
+      }
+    }
   } ]
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
new file mode 100644
index 00000000000..d3f94a32c24
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** Handler serving the job rescale. */
+public class JobRescaleDetailsHandler
+        extends AbstractExecutionGraphHandler<JobRescaleDetails, 
JobIDRescaleIDParameters>
+        implements JsonArchivist {
+
+    public JobRescaleDetailsHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobRescaleDetails, 
JobIDRescaleIDParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
+    }
+
+    @Override
+    protected JobRescaleDetails handleRequest(
+            HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo 
executionGraphInfo)
+            throws RestHandlerException {
+
+        RescalesStatsSnapshot rescalesStatsSnapshot = 
executionGraphInfo.getRescalesStatsSnapshot();
+        JobID jobId = executionGraphInfo.getJobId();
+
+        if (rescalesStatsSnapshot == null) {
+            throw new RestHandlerException(
+                    "AdaptiveScheduler rescales was not enabled for job " + 
jobId + '.',
+                    HttpResponseStatus.NOT_FOUND);
+        }
+
+        RescaleIdInfo.RescaleUUID rescaleUuid =
+                request.getPathParameter(JobRescaleIDPathParameter.class);
+        Rescale rescale = rescalesStatsSnapshot.getRescale(rescaleUuid);
+
+        if (rescale == null) {
+            throw new RestHandlerException(
+                    "Could not find rescale details of specified rescaleUuid " 
+ rescaleUuid + '.',
+                    HttpResponseStatus.NOT_FOUND);
+        }
+
+        return JobRescaleDetails.fromRescale(rescale, true);
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
+            throws IOException {
+        if (executionGraphInfo.getRescalesStatsSnapshot() == null
+                || 
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory() == null) {
+            return List.of();
+        }
+
+        List<ArchivedJson> archives = new ArrayList<>();
+        List<Rescale> rescales = 
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+        final String commonPathPrefix =
+                JobRescaleDetailsHeaders.getInstance()
+                        .getTargetRestEndpointURL()
+                        .replace(
+                                ':' + JobIDPathParameter.KEY,
+                                executionGraphInfo.getJobId().toString());
+        for (Rescale rescale : rescales) {
+            archives.add(
+                    new ArchivedJson(
+                            commonPathPrefix.replace(
+                                    ':' + JobRescaleIDPathParameter.KEY,
+                                    
rescale.getRescaleIdInfo().getRescaleUuid().toString()),
+                            JobRescaleDetails.fromRescale(rescale, true)));
+        }
+        return archives;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
new file mode 100644
index 00000000000..fdcf9fe6b18
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * {@link MessageParameters} for {@link
+ * 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler}.
+ */
+public class JobIDRescaleIDParameters extends JobMessageParameters {
+
+    private final JobRescaleIDPathParameter jobRescaleIDPathParameter =
+            new JobRescaleIDPathParameter();
+
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Arrays.asList(jobPathParameter, jobRescaleIDPathParameter);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
new file mode 100644
index 00000000000..fa9b7aeb195
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.rest.messages.ResourceProfileInfo;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import 
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDDeserializer;
+import 
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDKeyDeserializer;
+import 
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDKeySerializer;
+import 
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDSerializer;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Job rescales details class. */
+@Schema(name = "JobRescaleDetails")
+public class JobRescaleDetails implements ResponseBody, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_RESCALE_UUID = "rescaleUuid";
+    public static final String FIELD_NAME_RESOURCE_REQUIREMENTS_UUID = 
"resourceRequirementsUuid";
+    public static final String FIELD_NAME_RESCALE_ATTEMPT_ID = 
"rescaleAttemptId";
+    public static final String FIELD_NAME_VERTICES = "vertices";
+    public static final String FIELD_NAME_SLOTS = "slots";
+    public static final String FIELD_NAME_SCHEDULER_STATES = "schedulerStates";
+    public static final String FIELD_NAME_START_TIMESTAMP = 
"startTimestampInMillis";
+    public static final String FIELD_NAME_END_TIMESTAMP = 
"endTimestampInMillis";
+    public static final String FIELD_NAME_TERMINAL_STATE = "terminalState";
+    public static final String FIELD_NAME_TRIGGER_CAUSE = "triggerCause";
+    public static final String FIELD_NAME_TERMINATED_REASON = 
"terminatedReason";
+
+    @JsonProperty(FIELD_NAME_RESCALE_UUID)
+    private final String rescaleUuid;
+
+    @JsonProperty(FIELD_NAME_RESOURCE_REQUIREMENTS_UUID)
+    private final String resourceRequirementsUuid;
+
+    @JsonProperty(FIELD_NAME_RESCALE_ATTEMPT_ID)
+    private final long rescaleAttemptId;
+
+    @JsonProperty(FIELD_NAME_VERTICES)
+    @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+    @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
+    private final Map<JobVertexID, VertexParallelismRescaleInfo> vertices;
+
+    @JsonProperty(FIELD_NAME_SLOTS)
+    @JsonSerialize(keyUsing = SlotSharingGroupIDKeySerializer.class)
+    @JsonDeserialize(keyUsing = SlotSharingGroupIDKeyDeserializer.class)
+    private final Map<SlotSharingGroupId, SlotSharingGroupRescaleInfo> slots;
+
+    @JsonProperty(FIELD_NAME_SCHEDULER_STATES)
+    private final List<SchedulerStateSpan> schedulerStates;
+
+    @JsonProperty(FIELD_NAME_START_TIMESTAMP)
+    private final Long startTimestamp;
+
+    @Nullable
+    @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+    private final Long endTimestamp;
+
+    @JsonProperty(FIELD_NAME_TERMINAL_STATE)
+    private final TerminalState terminalState;
+
+    @JsonProperty(FIELD_NAME_TRIGGER_CAUSE)
+    private final TriggerCause triggerCause;
+
+    @JsonProperty(FIELD_NAME_TERMINATED_REASON)
+    private final TerminatedReason terminatedReason;
+
+    @JsonCreator
+    public JobRescaleDetails(
+            @JsonProperty(FIELD_NAME_RESCALE_UUID) String rescaleUuid,
+            @JsonProperty(FIELD_NAME_RESOURCE_REQUIREMENTS_UUID) String 
resourceRequirementsUuid,
+            @JsonProperty(FIELD_NAME_RESCALE_ATTEMPT_ID) long rescaleAttemptId,
+            @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+                    @JsonDeserialize(keyUsing = 
JobVertexIDKeyDeserializer.class)
+                    @JsonProperty(FIELD_NAME_VERTICES)
+                    Map<JobVertexID, VertexParallelismRescaleInfo> vertices,
+            @JsonSerialize(keyUsing = SlotSharingGroupIDKeySerializer.class)
+                    @JsonDeserialize(keyUsing = 
SlotSharingGroupIDKeyDeserializer.class)
+                    @JsonProperty(FIELD_NAME_SLOTS)
+                    Map<SlotSharingGroupId, SlotSharingGroupRescaleInfo> slots,
+            @JsonProperty(FIELD_NAME_SCHEDULER_STATES) 
List<SchedulerStateSpan> schedulerStates,
+            @JsonProperty(FIELD_NAME_START_TIMESTAMP) Long startTimestamp,
+            @Nullable @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long 
endTimestamp,
+            @JsonProperty(FIELD_NAME_TERMINAL_STATE) TerminalState 
terminalState,
+            @JsonProperty(FIELD_NAME_TRIGGER_CAUSE) TriggerCause triggerCause,
+            @JsonProperty(FIELD_NAME_TERMINATED_REASON) TerminatedReason 
terminatedReason) {
+        this.rescaleUuid = rescaleUuid;
+        this.resourceRequirementsUuid = resourceRequirementsUuid;
+        this.rescaleAttemptId = rescaleAttemptId;
+        this.vertices = vertices;
+        this.slots = slots;
+        this.schedulerStates = schedulerStates;
+        this.startTimestamp = startTimestamp;
+        this.endTimestamp = endTimestamp;
+        this.terminalState = terminalState;
+        this.triggerCause = triggerCause;
+        this.terminatedReason = terminatedReason;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        JobRescaleDetails that = (JobRescaleDetails) o;
+        return rescaleAttemptId == that.rescaleAttemptId
+                && Objects.equals(rescaleUuid, that.rescaleUuid)
+                && Objects.equals(resourceRequirementsUuid, 
that.resourceRequirementsUuid)
+                && Objects.equals(vertices, that.vertices)
+                && Objects.equals(slots, that.slots)
+                && Objects.equals(schedulerStates, that.schedulerStates)
+                && Objects.equals(startTimestamp, that.startTimestamp)
+                && Objects.equals(endTimestamp, that.endTimestamp)
+                && terminalState == that.terminalState
+                && triggerCause == that.triggerCause
+                && terminatedReason == that.terminatedReason;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                rescaleUuid,
+                resourceRequirementsUuid,
+                rescaleAttemptId,
+                vertices,
+                slots,
+                schedulerStates,
+                startTimestamp,
+                endTimestamp,
+                terminalState,
+                triggerCause,
+                terminatedReason);
+    }
+
+    public static JobRescaleDetails fromRescale(Rescale rescale, boolean 
includeSchedulerStates) {
+        return new JobRescaleDetails(
+                rescale.getRescaleIdInfo().getRescaleUuid().toString(),
+                
rescale.getRescaleIdInfo().getResourceRequirementsId().toString(),
+                rescale.getRescaleIdInfo().getRescaleAttemptId(),
+                rescale.getVertices(),
+                convertMapValues(
+                        rescale.getSlots(),
+                        
SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale),
+                includeSchedulerStates ? rescale.getSchedulerStates() : null,
+                rescale.getStartTimestamp(),
+                rescale.getEndTimestamp(),
+                rescale.getTerminalState(),
+                rescale.getTriggerCause(),
+                rescale.getTerminatedReason());
+    }
+
+    private static <K, NV, OV> Map<K, NV> convertMapValues(
+            Map<K, OV> rawMap, Function<OV, NV> valueMapper) {
+        return rawMap == null
+                ? new HashMap<>()
+                : rawMap.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        kovEntry -> 
valueMapper.apply(kovEntry.getValue())));
+    }
+
+    /** The rescale information of a {@link 
org.apache.flink.runtime.jobgraph.JobVertex}. */
+    public static final class VertexParallelismRescaleInfo implements 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public static final String FIELD_NAME_JOB_VERTEX_ID = "jobVertexId";
+        public static final String FIELD_NAME_VERTEX_NAME = "jobVertexName";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = 
"slotSharingGroupId";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = 
"slotSharingGroupName";
+        public static final String FIELD_NAME_DESIRED_PARALLELISM = 
"desiredParallelism";
+        public static final String FIELD_NAME_SUFFICIENT_PARALLELISM = 
"sufficientParallelism";
+        public static final String FIELD_NAME_PRE_RESCALE_PARALLELISM = 
"preRescaleParallelism";
+        public static final String FIELD_NAME_POST_RESCALE_PARALLELISM = 
"postRescaleParallelism";
+
+        @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+        @JsonSerialize(using = JobVertexIDSerializer.class)
+        @JsonDeserialize(using = JobVertexIDDeserializer.class)
+        private final JobVertexID jobVertexId;
+
+        @JsonProperty(FIELD_NAME_VERTEX_NAME)
+        private String jobVertexName;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+        @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+        private SlotSharingGroupId slotSharingGroupId;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
+        private String slotSharingGroupName;
+
+        @JsonProperty(FIELD_NAME_DESIRED_PARALLELISM)
+        private Integer desiredParallelism;
+
+        @JsonProperty(FIELD_NAME_SUFFICIENT_PARALLELISM)
+        private Integer sufficientParallelism;
+
+        @Nullable
+        @JsonProperty(FIELD_NAME_PRE_RESCALE_PARALLELISM)
+        private Integer preRescaleParallelism;
+
+        @Nullable
+        @JsonProperty(FIELD_NAME_POST_RESCALE_PARALLELISM)
+        private Integer postRescaleParallelism;
+
+        @JsonCreator
+        public VertexParallelismRescaleInfo(
+                @JsonSerialize(using = JobVertexIDSerializer.class)
+                        @JsonDeserialize(using = JobVertexIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+                        JobVertexID jobVertexId,
+                @JsonProperty(FIELD_NAME_VERTEX_NAME) String jobVertexName,
+                @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+                        @JsonDeserialize(using = 
SlotSharingGroupIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+                        SlotSharingGroupId slotSharingGroupId,
+                @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String 
slotSharingGroupName,
+                @JsonProperty(FIELD_NAME_DESIRED_PARALLELISM) Integer 
desiredParallelism,
+                @JsonProperty(FIELD_NAME_SUFFICIENT_PARALLELISM) Integer 
sufficientParallelism,
+                @JsonProperty(FIELD_NAME_PRE_RESCALE_PARALLELISM) Integer 
preRescaleParallelism,
+                @Nullable @JsonProperty(FIELD_NAME_POST_RESCALE_PARALLELISM)
+                        Integer postRescaleParallelism) {
+            this.jobVertexId = jobVertexId;
+            this.jobVertexName = jobVertexName;
+            this.slotSharingGroupId = slotSharingGroupId;
+            this.slotSharingGroupName = slotSharingGroupName;
+            this.desiredParallelism = desiredParallelism;
+            this.sufficientParallelism = sufficientParallelism;
+            this.preRescaleParallelism = preRescaleParallelism;
+            this.postRescaleParallelism = postRescaleParallelism;
+        }
+
+        @JsonIgnore
+        public VertexParallelismRescaleInfo(
+                JobVertexID jobVertexId, String jobVertexName, 
SlotSharingGroup slotSharingGroup) {
+            this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+            this.jobVertexName = jobVertexName;
+            this.slotSharingGroupName = 
slotSharingGroup.getSlotSharingGroupName();
+            this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+        }
+
+        public JobVertexID getJobVertexId() {
+            return jobVertexId;
+        }
+
+        public String getJobVertexName() {
+            return jobVertexName;
+        }
+
+        public void setJobVertexName(String jobVertexName) {
+            this.jobVertexName = jobVertexName;
+        }
+
+        public SlotSharingGroupId getSlotSharingGroupId() {
+            return slotSharingGroupId;
+        }
+
+        public String getSlotSharingGroupName() {
+            return slotSharingGroupName;
+        }
+
+        @Nullable
+        public Integer getPreRescaleParallelism() {
+            return preRescaleParallelism;
+        }
+
+        public void setPreRescaleParallelism(@Nullable Integer 
preRescaleParallelism) {
+            this.preRescaleParallelism = preRescaleParallelism;
+        }
+
+        public Integer getDesiredParallelism() {
+            return desiredParallelism;
+        }
+
+        public Integer getSufficientParallelism() {
+            return sufficientParallelism;
+        }
+
+        public void setRequiredParallelisms(
+                VertexParallelismInformation vertexParallelismInformation) {
+            this.sufficientParallelism = 
vertexParallelismInformation.getMinParallelism();
+            this.desiredParallelism = 
vertexParallelismInformation.getParallelism();
+        }
+
+        @Nullable
+        public Integer getPostRescaleParallelism() {
+            return postRescaleParallelism;
+        }
+
+        public void setPostRescaleParallelism(Integer postRescaleParallelism) {
+            this.postRescaleParallelism = postRescaleParallelism;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            VertexParallelismRescaleInfo that = (VertexParallelismRescaleInfo) 
o;
+            return Objects.equals(jobVertexId, that.jobVertexId)
+                    && Objects.equals(jobVertexName, that.jobVertexName)
+                    && Objects.equals(slotSharingGroupId, 
that.slotSharingGroupId)
+                    && Objects.equals(slotSharingGroupName, 
that.slotSharingGroupName)
+                    && Objects.equals(preRescaleParallelism, 
that.preRescaleParallelism)
+                    && Objects.equals(desiredParallelism, 
that.desiredParallelism)
+                    && Objects.equals(sufficientParallelism, 
that.sufficientParallelism)
+                    && Objects.equals(postRescaleParallelism, 
that.postRescaleParallelism);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    jobVertexId,
+                    jobVertexName,
+                    slotSharingGroupId,
+                    slotSharingGroupName,
+                    preRescaleParallelism,
+                    desiredParallelism,
+                    sufficientParallelism,
+                    postRescaleParallelism);
+        }
+
+        @Override
+        public String toString() {
+            return "VertexParallelismRescaleInfo{"
+                    + "jobVertexId="
+                    + jobVertexId
+                    + ", jobVertexName='"
+                    + jobVertexName
+                    + '\''
+                    + ", slotSharingGroupId="
+                    + slotSharingGroupId
+                    + ", slotSharingGroupName='"
+                    + slotSharingGroupName
+                    + '\''
+                    + ", desiredParallelism="
+                    + desiredParallelism
+                    + ", sufficientParallelism="
+                    + sufficientParallelism
+                    + ", preRescaleParallelism="
+                    + preRescaleParallelism
+                    + ", postRescaleParallelism="
+                    + postRescaleParallelism
+                    + '}';
+        }
+    }
+
+    public static final class SlotSharingGroupRescaleInfo implements 
Serializable {
+        private static final long serialVersionUID = 1L;
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = 
"slotSharingGroupId";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = 
"slotSharingGroupName";
+        public static final String FIELD_NAME_REQUEST_RESOURCE_PROFILE = 
"requestResourceProfile";
+        public static final String FIELD_NAME_DESIRED_SLOTS = "desiredSlots";
+        public static final String FIELD_NAME_MINIMAL_REQUIRED_SLOTS = 
"minimalRequiredSlots";
+        public static final String FIELD_NAME_PRE_RESCALE_SLOTS = 
"preRescaleSlots";
+        public static final String FIELD_NAME_POST_RESCALE_SLOTS = 
"postRescaleSlots";
+        public static final String FIELD_NAME_ACQUIRED_RESOURCE_PROFILE = 
"acquiredResourceProfile";
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+        @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+        private final SlotSharingGroupId slotSharingGroupId;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
+        private final String slotSharingGroupName;
+
+        @JsonProperty(FIELD_NAME_REQUEST_RESOURCE_PROFILE)
+        private final ResourceProfileInfo requiredResourceProfileInfo;
+
+        @JsonProperty(FIELD_NAME_DESIRED_SLOTS)
+        private final Integer desiredSlots;
+
+        @JsonProperty(FIELD_NAME_MINIMAL_REQUIRED_SLOTS)
+        private final Integer minimalRequiredSlots;
+
+        @JsonProperty(FIELD_NAME_PRE_RESCALE_SLOTS)
+        private final Integer preRescaleSlots;
+
+        @JsonProperty(FIELD_NAME_POST_RESCALE_SLOTS)
+        private final Integer postRescaleSlots;
+
+        @JsonProperty(FIELD_NAME_ACQUIRED_RESOURCE_PROFILE)
+        private final ResourceProfileInfo acquiredResourceProfileInfo;
+
+        @JsonCreator
+        public SlotSharingGroupRescaleInfo(
+                @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+                        @JsonDeserialize(using = 
SlotSharingGroupIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+                        SlotSharingGroupId slotSharingGroupId,
+                @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String 
slotSharingGroupName,
+                @JsonProperty(FIELD_NAME_REQUEST_RESOURCE_PROFILE)
+                        ResourceProfileInfo requiredResourceProfileInfo,
+                @JsonProperty(FIELD_NAME_DESIRED_SLOTS) Integer desiredSlots,
+                @JsonProperty(FIELD_NAME_MINIMAL_REQUIRED_SLOTS) Integer 
minimalRequiredSlots,
+                @JsonProperty(FIELD_NAME_PRE_RESCALE_SLOTS) Integer 
preRescaleSlots,
+                @JsonProperty(FIELD_NAME_POST_RESCALE_SLOTS) Integer 
postRescaleSlots,
+                @JsonProperty(FIELD_NAME_ACQUIRED_RESOURCE_PROFILE)
+                        ResourceProfileInfo acquiredResourceProfileInfo) {
+            this.slotSharingGroupId = slotSharingGroupId;
+            this.slotSharingGroupName = slotSharingGroupName;
+            this.requiredResourceProfileInfo = requiredResourceProfileInfo;
+            this.desiredSlots = desiredSlots;
+            this.minimalRequiredSlots = minimalRequiredSlots;
+            this.preRescaleSlots = preRescaleSlots;
+            this.postRescaleSlots = postRescaleSlots;
+            this.acquiredResourceProfileInfo = acquiredResourceProfileInfo;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            SlotSharingGroupRescaleInfo that = (SlotSharingGroupRescaleInfo) o;
+            return Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
+                    && Objects.equals(slotSharingGroupName, 
that.slotSharingGroupName)
+                    && Objects.equals(requiredResourceProfileInfo, 
that.requiredResourceProfileInfo)
+                    && Objects.equals(desiredSlots, that.desiredSlots)
+                    && Objects.equals(minimalRequiredSlots, 
that.minimalRequiredSlots)
+                    && Objects.equals(preRescaleSlots, that.preRescaleSlots)
+                    && Objects.equals(postRescaleSlots, that.postRescaleSlots)
+                    && Objects.equals(
+                            acquiredResourceProfileInfo, 
that.acquiredResourceProfileInfo);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    slotSharingGroupId,
+                    slotSharingGroupName,
+                    requiredResourceProfileInfo,
+                    desiredSlots,
+                    minimalRequiredSlots,
+                    preRescaleSlots,
+                    postRescaleSlots,
+                    acquiredResourceProfileInfo);
+        }
+
+        public static SlotSharingGroupRescaleInfo fromSlotSharingGroupRescale(
+                SlotSharingGroupRescale slotSharingGroupRescale) {
+            return new SlotSharingGroupRescaleInfo(
+                    slotSharingGroupRescale.getSlotSharingGroupId(),
+                    slotSharingGroupRescale.getSlotSharingGroupName(),
+                    ResourceProfileInfo.fromResourceProfile(
+                            
slotSharingGroupRescale.getRequiredResourceProfile()),
+                    slotSharingGroupRescale.getDesiredSlots(),
+                    slotSharingGroupRescale.getMinimalRequiredSlots(),
+                    slotSharingGroupRescale.getPreRescaleSlots(),
+                    slotSharingGroupRescale.getPostRescaleSlots(),
+                    ResourceProfileInfo.fromResourceProfile(
+                            Optional.ofNullable(
+                                            
slotSharingGroupRescale.getAcquiredResourceProfile())
+                                    .orElse(ResourceProfile.UNKNOWN)));
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
new file mode 100644
index 00000000000..a1fb71a4a31
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for job rescale details. */
+public class JobRescaleDetailsHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JobRescaleDetails, JobIDRescaleIDParameters> 
{
+
+    public static final JobRescaleDetailsHeaders INSTANCE = new 
JobRescaleDetailsHeaders();
+
+    public static final String JOB_RESCALES_PATH = 
"/jobs/:jobid/rescales/details/:rescaleuuid";
+
+    @Override
+    public Class<JobRescaleDetails> getResponseClass() {
+        return JobRescaleDetails.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Return a job rescale details.";
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public JobIDRescaleIDParameters getUnresolvedMessageParameters() {
+        return new JobIDRescaleIDParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOB_RESCALES_PATH;
+    }
+
+    public static JobRescaleDetailsHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
new file mode 100644
index 00000000000..bf15d0291d8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+
+/** The {@link MessagePathParameter} for {@link JobRescaleDetailsHeaders}. */
+public class JobRescaleIDPathParameter extends 
MessagePathParameter<RescaleIdInfo.RescaleUUID> {
+
+    public static final String KEY = "rescaleuuid";
+
+    protected JobRescaleIDPathParameter() {
+        super(KEY);
+    }
+
+    @Override
+    protected RescaleIdInfo.RescaleUUID convertFromString(String value) throws 
ConversionException {
+        try {
+            return new RescaleIdInfo.RescaleUUID(value);
+        } catch (NumberFormatException nfe) {
+            throw new ConversionException("Could not parse long from " + value 
+ '.', nfe);
+        }
+    }
+
+    @Override
+    protected String convertToString(RescaleIdInfo.RescaleUUID value) {
+        return value.toString();
+    }
+
+    @Override
+    public String getDescription() {
+        return "String value that identifies a job rescale.";
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
similarity index 70%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
index 1c7e1d02ec3..80b506f928e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
@@ -16,10 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.scheduler.adaptive.timeline;
+package org.apache.flink.runtime.rest.messages.job.rescales;
 
 import org.apache.flink.util.Preconditions;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -33,22 +36,38 @@ import java.util.Objects;
 public class SchedulerStateSpan implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    public static final String FIELD_NAME_STATE = "state";
+    public static final String FIELD_NAME_ENTER_TIMESTAMP = 
"enterTimestampInMillis";
+    public static final String FIELD_NAME_LEAVE_TIMESTAMP = 
"leaveTimestampInMillis";
+    public static final String FIELD_NAME_DURATION = "durationInMillis";
+    public static final String FIELD_NAME_EXCEPTION = "stringifiedException";
+
+    @JsonProperty(FIELD_NAME_STATE)
     private final String state;
 
-    @Nullable private final Long enterTimestamp;
+    @Nullable
+    @JsonProperty(FIELD_NAME_ENTER_TIMESTAMP)
+    private final Long enterTimestamp;
 
-    @Nullable private final Long leaveTimestamp;
+    @Nullable
+    @JsonProperty(FIELD_NAME_LEAVE_TIMESTAMP)
+    private final Long leaveTimestamp;
 
-    @Nullable private final Long duration;
+    @Nullable
+    @JsonProperty(FIELD_NAME_DURATION)
+    private final Long duration;
 
-    @Nullable private String stringifiedException;
+    @Nullable
+    @JsonProperty(FIELD_NAME_EXCEPTION)
+    private String stringifiedException;
 
+    @JsonCreator
     public SchedulerStateSpan(
-            String state,
-            Long logicEnterMillis,
-            Long logicLeaveMillis,
-            Long duration,
-            String stringifiedException) {
+            @JsonProperty(FIELD_NAME_STATE) String state,
+            @JsonProperty(FIELD_NAME_ENTER_TIMESTAMP) Long logicEnterMillis,
+            @JsonProperty(FIELD_NAME_LEAVE_TIMESTAMP) Long logicLeaveMillis,
+            @JsonProperty(FIELD_NAME_DURATION) Long duration,
+            @JsonProperty(FIELD_NAME_EXCEPTION) String stringifiedException) {
         this.state = Preconditions.checkNotNull(state);
         this.enterTimestamp = logicEnterMillis;
         this.leaveTimestamp = logicLeaveMillis;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
new file mode 100644
index 00000000000..f7371eebda9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/** Jackson deserializer for {@link SlotSharingGroupId}. */
+public class SlotSharingGroupIDKeyDeserializer extends KeyDeserializer {
+
+    @Override
+    public Object deserializeKey(String key, DeserializationContext ctxt) 
throws IOException {
+        return SlotSharingGroupId.fromHexString(key);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
new file mode 100644
index 00000000000..ec7660d6db1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** Jackson serializer for {@link SlotSharingGroupId}. */
+public class SlotSharingGroupIDKeySerializer extends 
StdSerializer<SlotSharingGroupId> {
+
+    private static final long serialVersionUID = 2970050507628933522L;
+
+    public SlotSharingGroupIDKeySerializer() {
+        super(SlotSharingGroupId.class);
+    }
+
+    @Override
+    public void serialize(SlotSharingGroupId value, JsonGenerator gen, 
SerializerProvider provider)
+            throws IOException {
+        gen.writeFieldName(value.toString());
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
index e37852db6d0..59d91ca5f4d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive.timeline;
 
 import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
 import org.apache.flink.runtime.util.BoundedFIFOQueue;
-import org.apache.flink.util.AbstractID;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@ public class DefaultRescaleTimeline implements 
RescaleTimeline {
     public DefaultRescaleTimeline(
             Supplier<JobInformation> jobInformationGetter, int maxHistorySize) 
{
         this.jobInformationGetter = jobInformationGetter;
-        this.rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 0L);
+        this.rescaleIdInfo = new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 0L);
         this.latestRescales = new 
ConcurrentHashMap<>(TerminalState.values().length);
         this.rescaleHistory = new BoundedFIFOQueue<>(maxHistorySize);
         this.rescalesSummary = new RescalesSummary(maxHistorySize);
@@ -133,7 +132,7 @@ public class DefaultRescaleTimeline implements 
RescaleTimeline {
 
     private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
         if (newRescaleEpoch) {
-            rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
+            rescaleIdInfo = new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L);
         } else {
             rescaleIdInfo =
                     new RescaleIdInfo(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
index 90e0412fd0d..2d9b6381a9a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
 import org.apache.flink.runtime.scheduler.adaptive.State;
@@ -122,7 +124,7 @@ public class Rescale implements Serializable {
 
     private final RescaleIdInfo rescaleIdInfo;
 
-    private final Map<JobVertexID, VertexParallelismRescale> vertices;
+    private final Map<JobVertexID, VertexParallelismRescaleInfo> vertices;
     private final Map<SlotSharingGroupId, SlotSharingGroupRescale> slots;
 
     private final List<SchedulerStateSpan> schedulerStates;
@@ -134,7 +136,7 @@ public class Rescale implements Serializable {
     @Nullable private TerminalState terminalState;
     @Nullable private TerminatedReason terminatedReason;
 
-    Rescale(RescaleIdInfo rescaleIdInfo) {
+    public Rescale(RescaleIdInfo rescaleIdInfo) {
         this.rescaleIdInfo = rescaleIdInfo;
         this.vertices = new HashMap<>();
         this.slots = new HashMap<>();
@@ -142,7 +144,7 @@ public class Rescale implements Serializable {
     }
 
     @VisibleForTesting
-    Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
+    public Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
         if (this.isTerminated()) {
             LOG.warn(
                     "Rescale is already terminated. The scheduler state {} 
will be ignored.",
@@ -293,11 +295,11 @@ public class Rescale implements Serializable {
             SlotSharingGroup slotSharingGroup =
                     
jobInformation.getVertexInformation(jvId).getSlotSharingGroup();
             String vertexName = 
jobInformation.getVertexInformation(jvId).getVertexName();
-            VertexParallelismRescale vertexParallelismRescale =
+            VertexParallelismRescaleInfo vertexParallelismRescale =
                     this.vertices.computeIfAbsent(
                             jvId,
                             jobVertexID ->
-                                    new VertexParallelismRescale(
+                                    new VertexParallelismRescaleInfo(
                                             jvId, vertexName, 
slotSharingGroup));
             vertexParallelismRescale.setRequiredParallelisms(entry.getValue());
         }
@@ -330,11 +332,11 @@ public class Rescale implements Serializable {
                     
lastCompletedRescale.vertices.get(jobVertexID).getPostRescaleParallelism();
             JobInformation.VertexInformation vertexInformation =
                     jobInformation.getVertexInformation(jobVertexID);
-            VertexParallelismRescale vertexParallelismRescale =
+            VertexParallelismRescaleInfo vertexParallelismRescale =
                     vertices.computeIfAbsent(
                             jobVertexID,
                             jobVertexId ->
-                                    new VertexParallelismRescale(
+                                    new VertexParallelismRescaleInfo(
                                             jobVertexId,
                                             vertexInformation.getVertexName(),
                                             
vertexInformation.getSlotSharingGroup()));
@@ -363,11 +365,11 @@ public class Rescale implements Serializable {
         for (JobVertexID vertexID : vertices) {
             JobInformation.VertexInformation vertexInformation =
                     jobInformation.getVertexInformation(vertexID);
-            VertexParallelismRescale vertexParallelismRescale =
+            VertexParallelismRescaleInfo vertexParallelismRescale =
                     this.vertices.computeIfAbsent(
                             vertexID,
                             jobVertexId ->
-                                    new VertexParallelismRescale(
+                                    new VertexParallelismRescaleInfo(
                                             jobVertexId,
                                             vertexInformation.getVertexName(),
                                             
vertexInformation.getSlotSharingGroup()));
@@ -421,7 +423,7 @@ public class Rescale implements Serializable {
         LOG.info("Updated rescale is: {}", this);
     }
 
-    public Map<JobVertexID, VertexParallelismRescale> getVertices() {
+    public Map<JobVertexID, VertexParallelismRescaleInfo> getVertices() {
         return Collections.unmodifiableMap(vertices);
     }
 
@@ -466,12 +468,12 @@ public class Rescale implements Serializable {
     }
 
     @VisibleForTesting
-    Map<JobVertexID, VertexParallelismRescale> getModifiableVertices() {
+    public Map<JobVertexID, VertexParallelismRescaleInfo> 
getModifiableVertices() {
         return vertices;
     }
 
     @VisibleForTesting
-    Map<SlotSharingGroupId, SlotSharingGroupRescale> getModifiableSlots() {
+    public Map<SlotSharingGroupId, SlotSharingGroupRescale> 
getModifiableSlots() {
         return slots;
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
index 94d36cc1393..1751ac9f497 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.adaptive.timeline;
 
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
 
 import java.io.Serializable;
 import java.util.Objects;
@@ -26,23 +27,43 @@ import java.util.Objects;
 /** The class to represent the rescale id description in one resource 
requirements rescale. */
 public class RescaleIdInfo implements Serializable {
 
+    /** Class for unique rescale ID. */
+    public static class RescaleUUID extends AbstractID {
+        private static final long serialVersionUID = 1L;
+
+        public RescaleUUID() {}
+
+        public RescaleUUID(byte[] bytes) {
+            super(bytes);
+        }
+
+        public RescaleUUID(String hexString) {
+            this(StringUtils.hexStringToByte(hexString));
+        }
+    }
+
+    /** Class for unique resource requirements ID. */
+    public static class ResourceRequirementsID extends AbstractID {
+        private static final long serialVersionUID = 1L;
+    }
+
     private static final long serialVersionUID = 1L;
 
-    private final AbstractID rescaleUuid;
-    private final AbstractID resourceRequirementsId;
+    private final RescaleUUID rescaleUuid;
+    private final ResourceRequirementsID resourceRequirementsId;
     private final long rescaleAttemptId;
 
-    public RescaleIdInfo(AbstractID resourceRequirementsId, Long 
rescaleAttemptId) {
+    public RescaleIdInfo(ResourceRequirementsID resourceRequirementsId, Long 
rescaleAttemptId) {
         this.resourceRequirementsId = resourceRequirementsId;
         this.rescaleAttemptId = rescaleAttemptId;
-        this.rescaleUuid = new AbstractID();
+        this.rescaleUuid = new RescaleUUID();
     }
 
-    public AbstractID getRescaleUuid() {
+    public RescaleUUID getRescaleUuid() {
         return rescaleUuid;
     }
 
-    public AbstractID getResourceRequirementsId() {
+    public ResourceRequirementsID getResourceRequirementsId() {
         return resourceRequirementsId;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
index 316d34ef634..f8697fe473f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
@@ -19,20 +19,33 @@
 package org.apache.flink.runtime.scheduler.adaptive.timeline;
 
 import org.apache.flink.runtime.util.stats.StatsSummarySnapshot;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class RescalesStatsSnapshot implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final List<Rescale> rescaleHistory;
+    private final Map<AbstractID, Rescale> idToRescaleMap;
     private final RescalesSummarySnapshot rescalesSummarySnapshot;
 
     public RescalesStatsSnapshot(
             List<Rescale> rescaleHistory, RescalesSummarySnapshot 
rescalesSummarySnapshot) {
         this.rescaleHistory = rescaleHistory;
+        this.idToRescaleMap =
+                rescaleHistory.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        r -> 
r.getRescaleIdInfo().getRescaleUuid(),
+                                        Function.identity()));
         this.rescalesSummarySnapshot = rescalesSummarySnapshot;
     }
 
@@ -40,6 +53,10 @@ public class RescalesStatsSnapshot implements Serializable {
         return rescaleHistory;
     }
 
+    public @Nullable Rescale getRescale(AbstractID rescaleId) {
+        return idToRescaleMap.get(rescaleId);
+    }
+
     public RescalesSummarySnapshot getRescalesSummarySnapshot() {
         return rescalesSummarySnapshot;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
deleted file mode 100644
index 3ea9476e79a..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive.timeline;
-
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** The rescale information of a {@link 
org.apache.flink.runtime.jobgraph.JobVertex}. */
-public class VertexParallelismRescale implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final JobVertexID jobVertexId;
-    private String jobVertexName;
-    private SlotSharingGroupId slotSharingGroupId;
-    private String slotSharingGroupName;
-    private Integer desiredParallelism;
-    private Integer sufficientParallelism;
-
-    @Nullable private Integer preRescaleParallelism;
-
-    @Nullable private Integer postRescaleParallelism;
-
-    public VertexParallelismRescale(
-            JobVertexID jobVertexId, String jobVertexName, SlotSharingGroup 
slotSharingGroup) {
-        this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
-        this.jobVertexName = jobVertexName;
-        this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
-        this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
-    }
-
-    public JobVertexID getJobVertexId() {
-        return jobVertexId;
-    }
-
-    public String getJobVertexName() {
-        return jobVertexName;
-    }
-
-    public void setJobVertexName(String jobVertexName) {
-        this.jobVertexName = jobVertexName;
-    }
-
-    public SlotSharingGroupId getSlotSharingGroupId() {
-        return slotSharingGroupId;
-    }
-
-    public String getSlotSharingGroupName() {
-        return slotSharingGroupName;
-    }
-
-    @Nullable
-    public Integer getPreRescaleParallelism() {
-        return preRescaleParallelism;
-    }
-
-    public void setPreRescaleParallelism(@Nullable Integer 
preRescaleParallelism) {
-        this.preRescaleParallelism = preRescaleParallelism;
-    }
-
-    public Integer getDesiredParallelism() {
-        return desiredParallelism;
-    }
-
-    public Integer getSufficientParallelism() {
-        return sufficientParallelism;
-    }
-
-    public void setRequiredParallelisms(VertexParallelismInformation 
vertexParallelismInformation) {
-        this.sufficientParallelism = 
vertexParallelismInformation.getMinParallelism();
-        this.desiredParallelism = 
vertexParallelismInformation.getParallelism();
-    }
-
-    @Nullable
-    public Integer getPostRescaleParallelism() {
-        return postRescaleParallelism;
-    }
-
-    public void setPostRescaleParallelism(Integer postRescaleParallelism) {
-        this.postRescaleParallelism = postRescaleParallelism;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        VertexParallelismRescale that = (VertexParallelismRescale) o;
-        return Objects.equals(jobVertexId, that.jobVertexId)
-                && Objects.equals(jobVertexName, that.jobVertexName)
-                && Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
-                && Objects.equals(slotSharingGroupName, 
that.slotSharingGroupName)
-                && Objects.equals(preRescaleParallelism, 
that.preRescaleParallelism)
-                && Objects.equals(desiredParallelism, that.desiredParallelism)
-                && Objects.equals(sufficientParallelism, 
that.sufficientParallelism)
-                && Objects.equals(postRescaleParallelism, 
that.postRescaleParallelism);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(
-                jobVertexId,
-                jobVertexName,
-                slotSharingGroupId,
-                slotSharingGroupName,
-                preRescaleParallelism,
-                desiredParallelism,
-                sufficientParallelism,
-                postRescaleParallelism);
-    }
-
-    @Override
-    public String toString() {
-        return "VertexParallelismRescale{"
-                + "jobVertexId="
-                + jobVertexId
-                + ", jobVertexName='"
-                + jobVertexName
-                + '\''
-                + ", slotSharingGroupId="
-                + slotSharingGroupId
-                + ", slotSharingGroupName='"
-                + slotSharingGroupName
-                + '\''
-                + ", desiredParallelism="
-                + desiredParallelism
-                + ", sufficientParallelism="
-                + sufficientParallelism
-                + ", preRescaleParallelism="
-                + preRescaleParallelism
-                + ", postRescaleParallelism="
-                + postRescaleParallelism
-                + '}';
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 211b8749425..8bf45671feb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -97,6 +97,7 @@ import 
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHand
 import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -164,6 +165,7 @@ import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumul
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1212,6 +1214,17 @@ public class WebMonitorEndpoint<T extends 
RestfulGateway> extends RestServerEndp
         handlers.add(
                 Tuple2.of(jobRescaleConfigHandler.getMessageHeaders(), 
jobRescaleConfigHandler));
 
+        final JobRescaleDetailsHandler jobRescaleDetailsHandler =
+                new JobRescaleDetailsHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobRescaleDetailsHeaders.getInstance(),
+                        executionGraphCache,
+                        executor);
+        handlers.add(
+                Tuple2.of(jobRescaleDetailsHandler.getMessageHeaders(), 
jobRescaleDetailsHandler));
+
         handlers.stream()
                 .map(tuple -> tuple.f1)
                 .filter(handler -> handler instanceof JsonArchivist)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
new file mode 100644
index 00000000000..3ecd4e46038
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.fromRescale;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescaleDetailsHandler}. */
+class JobRescaleDetailsHandlerTest {
+
+    private final JobRescaleDetailsHandler testInstance =
+            new JobRescaleDetailsHandler(
+                    CompletableFuture::new,
+                    TestingUtils.TIMEOUT,
+                    Map.of(),
+                    JobRescaleDetailsHeaders.getInstance(),
+                    new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, 
TestingUtils.TIMEOUT),
+                    Executors.directExecutor());
+
+    @Test
+    void testUnNormalCases() throws HandlerRequestException, 
RestHandlerException {
+        // Test for adaptive scheduler rescales was not enabled for job.
+        final ExecutionGraphInfo 
executionGraphInfoWithNullRescalesStatsSnapshot =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(), 
List.of(), null);
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(
+                        
executionGraphInfoWithNullRescalesStatsSnapshot.getJobId(),
+                        new RescaleIdInfo.RescaleUUID());
+        assertThatThrownBy(
+                        () ->
+                                testInstance.handleRequest(
+                                        request, 
executionGraphInfoWithNullRescalesStatsSnapshot))
+                .isInstanceOf(RestHandlerException.class);
+
+        // Test for that case could not find rescale details for the specified 
rescale uuid.
+        final ExecutionGraphInfo 
executionGraphInfoWithEmptyRescalesStatsSnapshot =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        List.of(),
+                        JobManagerOptions.SchedulerType.Adaptive,
+                        null,
+                        RescalesStatsSnapshot.emptySnapshot());
+        assertThatThrownBy(
+                        () ->
+                                testInstance.handleRequest(
+                                        request, 
executionGraphInfoWithEmptyRescalesStatsSnapshot))
+                .isInstanceOf(RestHandlerException.class);
+    }
+
+    @Test
+    void testRequestNormalJobRescaleStatisticsDetails()
+            throws HandlerRequestException, RestHandlerException {
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L))
+                        .setStartTimestamp(1L)
+                        .setEndTimestamp(100L)
+                        .setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
+                        .setStringifiedException("mocked exception")
+                        .addSchedulerState(new SchedulerStateSpan("Created", 
1L, 5L, 4L, null))
+                        .setTerminatedReason(TerminatedReason.SUCCEEDED);
+
+        JobVertexID jobVertexID = new JobVertexID();
+        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+
+        SlotSharingGroupRescale slotSharingGroupRescale =
+                new SlotSharingGroupRescale(slotSharingGroup);
+        slotSharingGroupRescale.setPostRescaleSlots(2);
+        slotSharingGroupRescale.setPreRescaleSlots(1);
+        slotSharingGroupRescale.setDesiredSlots(5);
+        slotSharingGroupRescale.setMinimalRequiredSlots(1);
+        
slotSharingGroupRescale.setAcquiredResourceProfile(ResourceProfile.ZERO);
+
+        VertexParallelismRescaleInfo vertexParallelismRescaleInfo =
+                new VertexParallelismRescaleInfo(
+                        jobVertexID,
+                        "jvName",
+                        slotSharingGroup.getSlotSharingGroupId(),
+                        "default",
+                        5,
+                        1,
+                        1,
+                        2);
+
+        rescale.getModifiableSlots()
+                .put(slotSharingGroup.getSlotSharingGroupId(), 
slotSharingGroupRescale);
+        rescale.getModifiableVertices().put(jobVertexID, 
vertexParallelismRescaleInfo);
+
+        RescalesSummary rescalesSummary = new RescalesSummary(2);
+        rescalesSummary.addTerminated(rescale);
+
+        final ExecutionGraphInfo executionGraphInfo =
+                new ExecutionGraphInfo(
+                        new ArchivedExecutionGraphBuilder().build(),
+                        List.of(),
+                        null,
+                        null,
+                        new RescalesStatsSnapshot(
+                                List.of(rescale), 
rescalesSummary.createSnapshot()));
+        final HandlerRequest<EmptyRequestBody> request =
+                createRequest(
+                        executionGraphInfo.getJobId(), 
rescale.getRescaleIdInfo().getRescaleUuid());
+        JobRescaleDetails jobRescaleDetails =
+                testInstance.handleRequest(request, executionGraphInfo);
+        assertThat(jobRescaleDetails).isEqualTo(fromRescale(rescale, true));
+    }
+
+    private static HandlerRequest<EmptyRequestBody> createRequest(
+            JobID jobId, AbstractID rescaleUuid) throws 
HandlerRequestException {
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+        pathParameters.put(JobRescaleIDPathParameter.KEY, 
rescaleUuid.toString());
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new JobIDRescaleIDParameters(),
+                pathParameters,
+                new HashMap<>(),
+                List.of());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
index 3d0775860b5..4664811cb57 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
 import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
 import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
@@ -32,7 +34,6 @@ import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
-import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -160,7 +161,8 @@ class RescaleTest {
 
     @Test
     void testAddSchedulerState() {
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
 
         // Test for add a state span into a terminated rescale.
         rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
@@ -168,21 +170,21 @@ class RescaleTest {
         assertThat(rescale.getSchedulerStates()).isEmpty();
 
         // Test for add a state span into a non-terminated rescale.
-        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale = new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null, 
null));
         assertThat(rescale.getSchedulerStates()).hasSize(1);
 
         // Test the correctness of throwable processing.
         String stringifiedException1 =
                 ExceptionUtils.stringifyException(new 
RuntimeException("exception1"));
-        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale = new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setStringifiedException(stringifiedException1);
         rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null, 
null));
         SchedulerStateSpan schedulerStateSpan = 
rescale.getSchedulerStates().get(0);
         
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
         assertThat(rescale.getStringifiedException()).isNull();
 
-        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale = new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.addSchedulerState(
                 new SchedulerStateSpan("", null, null, null, 
stringifiedException1));
         schedulerStateSpan = rescale.getSchedulerStates().get(0);
@@ -190,7 +192,7 @@ class RescaleTest {
 
         // Test the correctness of the end time of span auto-fulfill.
         State stateWithoutEndTimestamp = new TestingAdaptiveSchedulerState(2L, 
null);
-        rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        rescale = new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setStringifiedException(stringifiedException1);
         rescale.addSchedulerState(stateWithoutEndTimestamp, null);
         schedulerStateSpan = rescale.getSchedulerStates().get(0);
@@ -200,7 +202,8 @@ class RescaleTest {
 
     @Test
     void testSetDesiredSlots() {
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setDesiredSlots(jobInformation);
         Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
         assertThat(slots).hasSize(2);
@@ -223,28 +226,29 @@ class RescaleTest {
 
     @Test
     void testSetDesiredVertexParallelism() {
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setDesiredVertexParallelism(jobInformation);
-        Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+        Map<JobVertexID, VertexParallelismRescaleInfo> vertices = 
rescale.getVertices();
         assertThat(vertices).hasSize(4);
         assertThat(vertices.keySet())
                 .isEqualTo(
                         vertices.values().stream()
-                                .map(VertexParallelismRescale::getJobVertexId)
+                                
.map(VertexParallelismRescaleInfo::getJobVertexId)
                                 .collect(Collectors.toSet()))
                 .hasSameElementsAs(
                         
jobVertices.stream().map(JobVertex::getID).collect(Collectors.toSet()));
 
         assertThat(
                         vertices.values().stream()
-                                
.map(VertexParallelismRescale::getJobVertexName)
+                                
.map(VertexParallelismRescaleInfo::getJobVertexName)
                                 .collect(Collectors.toSet()))
                 .hasSameElementsAs(
                         
jobVertices.stream().map(JobVertex::getName).collect(Collectors.toSet()));
 
         assertThat(
                         vertices.values().stream()
-                                
.map(VertexParallelismRescale::getSlotSharingGroupId)
+                                
.map(VertexParallelismRescaleInfo::getSlotSharingGroupId)
                                 .collect(Collectors.toSet()))
                 .hasSameElementsAs(
                         jobVertices.stream()
@@ -253,7 +257,7 @@ class RescaleTest {
 
         assertThat(
                         vertices.values().stream()
-                                
.map(VertexParallelismRescale::getSlotSharingGroupName)
+                                
.map(VertexParallelismRescaleInfo::getSlotSharingGroupName)
                                 .collect(Collectors.toSet()))
                 .hasSameElementsAs(
                         jobVertices.stream()
@@ -262,19 +266,20 @@ class RescaleTest {
 
         assertThat(
                         vertices.values().stream()
-                                
.map(VertexParallelismRescale::getDesiredParallelism)
+                                
.map(VertexParallelismRescaleInfo::getDesiredParallelism)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(2, 3, 4, 5);
         assertThat(
                         vertices.values().stream()
-                                
.map(VertexParallelismRescale::getSufficientParallelism)
+                                
.map(VertexParallelismRescaleInfo::getSufficientParallelism)
                                 .collect(Collectors.toList()))
                 .containsExactlyInAnyOrder(1, 2, 3, 4);
     }
 
     @Test
     void testSetMinimalRequiredSlots() {
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setMinimalRequiredSlots(jobInformation);
         Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
         assertThat(slots).hasSize(2);
@@ -292,23 +297,25 @@ class RescaleTest {
     @Test
     void testSetPreRescaleSlotsAndParallelisms() {
         // Test for null last rescale.
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 2L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 2L));
         rescale.setPreRescaleSlotsAndParallelisms(jobInformation, null);
         assertThat(rescale.getSlots()).isEmpty();
         assertThat(rescale.getVertices()).isEmpty();
 
         // Test for non-null last rescale.
         // Prepare the last completed rescale.
-        Rescale lastCompletedRescale = new Rescale(new RescaleIdInfo(new 
AbstractID(), 1L));
-        Map<JobVertexID, VertexParallelismRescale> lastRescaleVertices =
+        Rescale lastCompletedRescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
+        Map<JobVertexID, VertexParallelismRescaleInfo> lastRescaleVertices =
                 lastCompletedRescale.getModifiableVertices();
         jobVertices.forEach(
                 jobVertex -> {
-                    VertexParallelismRescale vertexParallelismRescale =
+                    VertexParallelismRescaleInfo vertexParallelismRescale =
                             lastRescaleVertices.computeIfAbsent(
                                     jobVertex.getID(),
                                     ignored ->
-                                            new VertexParallelismRescale(
+                                            new VertexParallelismRescaleInfo(
                                                     jobVertex.getID(),
                                                     jobInformation
                                                             
.getVertexInformation(jobVertex.getID())
@@ -340,7 +347,7 @@ class RescaleTest {
 
         assertThat(
                         rescale.getVertices().values().stream()
-                                
.map(VertexParallelismRescale::getPreRescaleParallelism)
+                                
.map(VertexParallelismRescaleInfo::getPreRescaleParallelism)
                                 .collect(Collectors.toSet()))
                 .containsExactly(4);
     }
@@ -353,11 +360,12 @@ class RescaleTest {
                 vertex -> {
                     parallelismForVertices.put(vertex.getID(), 2);
                 });
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setPostRescaleVertexParallelism(jobInformation, 
postVertexParallelism);
         assertThat(
                         rescale.getVertices().values().stream()
-                                
.map(VertexParallelismRescale::getPostRescaleParallelism)
+                                
.map(VertexParallelismRescaleInfo::getPostRescaleParallelism)
                                 .collect(Collectors.toSet()))
                 .containsExactly(2);
     }
@@ -398,7 +406,8 @@ class RescaleTest {
                         new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
                                 slotSharingGroupB, null)));
 
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setPostRescaleSlots(slotAssignments);
         assertThat(
                         rescale.getSlots().values().stream()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
index 09d31c00d76..fc105d800dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.minicluster.MiniCluster;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -454,10 +456,10 @@ class RescaleTimelineITCase {
                                                 
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
                                                         
slotSharingGroupRescale);
                                             });
-                    Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+                    Map<JobVertexID, VertexParallelismRescaleInfo> vertices = 
rescale.getVertices();
                     assertThat(vertices.values())
                             .allSatisfy(
-                                    (Consumer<VertexParallelismRescale>)
+                                    (Consumer<VertexParallelismRescaleInfo>)
                                             vpr -> {
                                                 
assertThat(vpr.getPreRescaleParallelism()).isNull();
                                                 
assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
@@ -506,10 +508,10 @@ class RescaleTimelineITCase {
                                                 
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
                                                         
slotSharingGroupRescale);
                                             });
-                    Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+                    Map<JobVertexID, VertexParallelismRescaleInfo> vertices = 
rescale.getVertices();
                     assertThat(vertices.values())
                             .allSatisfy(
-                                    (Consumer<VertexParallelismRescale>)
+                                    (Consumer<VertexParallelismRescaleInfo>)
                                             vpr -> {
                                                 
assertThat(vpr.getPreRescaleParallelism())
                                                         .isNotNull();
@@ -634,7 +636,7 @@ class RescaleTimelineITCase {
     }
 
     private void assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
-            VertexParallelismRescale vpr) {
+            VertexParallelismRescaleInfo vpr) {
         assertThat(vpr.getDesiredParallelism()).isNotNull();
         assertThat(vpr.getPostRescaleParallelism()).isNotNull();
         assertThat(vpr.getSlotSharingGroupName()).isNotNull();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
index d773f0ac165..6be786d2ee2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.scheduler.adaptive.timeline;
 
 import org.apache.flink.runtime.util.stats.StatsSummary;
-import org.apache.flink.util.AbstractID;
 
 import org.junit.jupiter.api.Test;
 
@@ -29,7 +28,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 class RescalesSummaryTest {
 
     private Rescale getRescale() {
-        Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+        Rescale rescale =
+                new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L));
         rescale.setStartTimestamp(1L);
         rescale.setEndTimestamp(2L);
         return rescale;

Reply via email to