This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 301eb359c72f89382313e1cdb1f5d1c10a1c7834
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu Apr 9 21:54:11 2026 +0800

    [FLINK-38894][runtime/rest] Introduce the /jobs/:jobid/rescales/history 
endpoint in the REST API
    
    Co-authored-by: XComp <[email protected]>
    Co-authored-by: WeiZhong94 <[email protected]>
    Co-authored-by: mateczagany <[email protected]>
    Co-authored-by: ferenc-csaky <[email protected]>
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 192 ++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  24 +
 .../src/test/resources/rest_api_v1.snapshot        | 686 +++++++++++++--------
 .../job/rescales/JobRescalesHistoryHandler.java    | 115 ++++
 .../messages/job/rescales/JobRescaleDetails.java   |  15 +-
 .../messages/job/rescales/JobRescalesHistory.java  |  67 ++
 .../job/rescales/JobRescalesHistoryHeaders.java    |  72 +++
 .../adaptive/timeline/DefaultRescaleTimeline.java  |   5 +-
 .../adaptive/timeline/RescalesStatsSnapshot.java   |  15 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |  14 +
 .../job/rescales/JobRescaleDetailsHandlerTest.java |   7 +-
 ...est.java => JobRescalesHistoryHandlerTest.java} |  81 +--
 12 files changed, 967 insertions(+), 326 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 10fba048344..b0e0cf4d0a4 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3833,6 +3833,198 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/rescales/history</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Return job rescales history.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "array",
+  "items" : {
+    "type" : "object",
+    "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+    "properties" : {
+      "endTimestampInMillis" : {
+        "type" : "integer"
+      },
+      "rescaleAttemptId" : {
+        "type" : "integer"
+      },
+      "rescaleUuid" : {
+        "type" : "string"
+      },
+      "resourceRequirementsUuid" : {
+        "type" : "string"
+      },
+      "schedulerStates" : {
+        "type" : "array",
+        "items" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+          "properties" : {
+            "durationInMillis" : {
+              "type" : "integer"
+            },
+            "enterTimestampInMillis" : {
+              "type" : "integer"
+            },
+            "leaveTimestampInMillis" : {
+              "type" : "integer"
+            },
+            "state" : {
+              "type" : "string"
+            },
+            "stringifiedException" : {
+              "type" : "string"
+            }
+          }
+        }
+      },
+      "slots" : {
+        "type" : "object",
+        "additionalProperties" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+          "properties" : {
+            "acquiredResourceProfile" : {
+              "type" : "object",
+              "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+            },
+            "desiredSlots" : {
+              "type" : "integer"
+            },
+            "minimalRequiredSlots" : {
+              "type" : "integer"
+            },
+            "postRescaleSlots" : {
+              "type" : "integer"
+            },
+            "preRescaleSlots" : {
+              "type" : "integer"
+            },
+            "requestResourceProfile" : {
+              "type" : "object",
+              "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+              "properties" : {
+                "cpuCores" : {
+                  "type" : "number"
+                },
+                "extendedResources" : {
+                  "type" : "object",
+                  "additionalProperties" : {
+                    "type" : "number"
+                  }
+                },
+                "managedMemory" : {
+                  "type" : "integer"
+                },
+                "networkMemory" : {
+                  "type" : "integer"
+                },
+                "taskHeapMemory" : {
+                  "type" : "integer"
+                },
+                "taskOffHeapMemory" : {
+                  "type" : "integer"
+                }
+              }
+            },
+            "slotSharingGroupId" : {
+              "type" : "any"
+            },
+            "slotSharingGroupName" : {
+              "type" : "string"
+            }
+          }
+        }
+      },
+      "startTimestampInMillis" : {
+        "type" : "integer"
+      },
+      "terminalState" : {
+        "type" : "string",
+        "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+      },
+      "terminatedReason" : {
+        "type" : "string",
+        "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", 
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", 
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+      },
+      "triggerCause" : {
+        "type" : "string",
+        "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", 
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+      },
+      "vertices" : {
+        "type" : "object",
+        "additionalProperties" : {
+          "type" : "object",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+          "properties" : {
+            "desiredParallelism" : {
+              "type" : "integer"
+            },
+            "jobVertexId" : {
+              "type" : "any"
+            },
+            "jobVertexName" : {
+              "type" : "string"
+            },
+            "postRescaleParallelism" : {
+              "type" : "integer"
+            },
+            "preRescaleParallelism" : {
+              "type" : "integer"
+            },
+            "slotSharingGroupId" : {
+              "type" : "any"
+            },
+            "slotSharingGroupName" : {
+              "type" : "string"
+            },
+            "sufficientParallelism" : {
+              "type" : "integer"
+            }
+          }
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index 43fa4b3fe28..80fed9473fc 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -958,6 +958,26 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/JobRescaleDetails"
+  /jobs/{jobid}/rescales/history:
+    get:
+      description: Return job rescales history.
+      operationId: getJobRescalesHistory
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: "#/components/schemas/JobID"
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: "#/components/schemas/JobRescaleDetails"
   /jobs/{jobid}/rescaling:
     patch:
       description: Triggers the rescaling of a job. This async operation would 
return
@@ -2826,6 +2846,10 @@ components:
           type: object
           additionalProperties:
             $ref: "#/components/schemas/VertexParallelismRescaleInfo"
+    JobRescalesHistory:
+      type: array
+      items:
+        $ref: "#/components/schemas/JobRescaleDetails"
     JobResourceRequirementsBody:
       type: object
       additionalProperties:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index b4a16c9cd74..5a3d9bc9251 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -169,38 +169,6 @@
       "type" : "object",
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
     }
-  }, {
-    "url" : "/applications/:applicationid/jobmanager/config",
-    "method" : "GET",
-    "status-code" : "200 OK",
-    "file-upload" : false,
-    "path-parameters" : {
-      "pathParameters" : [ {
-        "key" : "applicationid"
-      } ]
-    },
-    "query-parameters" : {
-      "queryParameters" : [ ]
-    },
-    "request" : {
-      "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
-    },
-    "response" : {
-      "type" : "array",
-      "items" : {
-        "type" : "object",
-        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
-        "properties" : {
-          "key" : {
-            "type" : "string"
-          },
-          "value" : {
-            "type" : "string"
-          }
-        }
-      }
-    }
   }, {
     "url" : "/applications/:applicationid/exceptions",
     "method" : "GET",
@@ -251,6 +219,38 @@
         }
       }
     }
+  }, {
+    "url" : "/applications/:applicationid/jobmanager/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "applicationid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/cluster",
     "method" : "DELETE",
@@ -2834,6 +2834,392 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/rescales/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
+      "properties" : {
+        "rescaleHistoryMax" : {
+          "type" : "integer"
+        },
+        "schedulerExecutionMode" : {
+          "type" : "string",
+          "enum" : [ "REACTIVE" ]
+        },
+        "submissionResourceWaitTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "submissionResourceStabilizationTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "slotIdleTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "executingCooldownTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "executingResourceStabilizationTimeoutInMillis" : {
+          "type" : "integer"
+        },
+        "maximumDelayForTriggeringRescaleInMillis" : {
+          "type" : "integer"
+        },
+        "rescaleOnFailedCheckpointCount" : {
+          "type" : "integer"
+        }
+      }
+    }
+  }, {
+    "url" : "/jobs/:jobid/rescales/details/:rescaleuuid",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      }, {
+        "key" : "rescaleuuid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+      "properties" : {
+        "rescaleUuid" : {
+          "type" : "string"
+        },
+        "resourceRequirementsUuid" : {
+          "type" : "string"
+        },
+        "rescaleAttemptId" : {
+          "type" : "integer"
+        },
+        "vertices" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "object",
+            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+            "properties" : {
+              "jobVertexId" : {
+                "type" : "any"
+              },
+              "jobVertexName" : {
+                "type" : "string"
+              },
+              "slotSharingGroupId" : {
+                "type" : "any"
+              },
+              "slotSharingGroupName" : {
+                "type" : "string"
+              },
+              "desiredParallelism" : {
+                "type" : "integer"
+              },
+              "sufficientParallelism" : {
+                "type" : "integer"
+              },
+              "preRescaleParallelism" : {
+                "type" : "integer"
+              },
+              "postRescaleParallelism" : {
+                "type" : "integer"
+              }
+            }
+          }
+        },
+        "slots" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "object",
+            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+            "properties" : {
+              "slotSharingGroupId" : {
+                "type" : "any"
+              },
+              "slotSharingGroupName" : {
+                "type" : "string"
+              },
+              "requestResourceProfile" : {
+                "type" : "object",
+                "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+                "properties" : {
+                  "cpuCores" : {
+                    "type" : "number"
+                  },
+                  "taskHeapMemory" : {
+                    "type" : "integer"
+                  },
+                  "taskOffHeapMemory" : {
+                    "type" : "integer"
+                  },
+                  "managedMemory" : {
+                    "type" : "integer"
+                  },
+                  "networkMemory" : {
+                    "type" : "integer"
+                  },
+                  "extendedResources" : {
+                    "type" : "object",
+                    "additionalProperties" : {
+                      "type" : "number"
+                    }
+                  }
+                }
+              },
+              "desiredSlots" : {
+                "type" : "integer"
+              },
+              "minimalRequiredSlots" : {
+                "type" : "integer"
+              },
+              "preRescaleSlots" : {
+                "type" : "integer"
+              },
+              "postRescaleSlots" : {
+                "type" : "integer"
+              },
+              "acquiredResourceProfile" : {
+                "type" : "object",
+                "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+              }
+            }
+          }
+        },
+        "schedulerStates" : {
+          "type" : "array",
+          "items" : {
+            "type" : "object",
+            "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+            "properties" : {
+              "state" : {
+                "type" : "string"
+              },
+              "enterTimestampInMillis" : {
+                "type" : "integer"
+              },
+              "leaveTimestampInMillis" : {
+                "type" : "integer"
+              },
+              "durationInMillis" : {
+                "type" : "integer"
+              },
+              "stringifiedException" : {
+                "type" : "string"
+              }
+            }
+          }
+        },
+        "startTimestampInMillis" : {
+          "type" : "integer"
+        },
+        "endTimestampInMillis" : {
+          "type" : "integer"
+        },
+        "terminalState" : {
+          "type" : "string",
+          "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+        },
+        "triggerCause" : {
+          "type" : "string",
+          "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", 
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+        },
+        "terminatedReason" : {
+          "type" : "string",
+          "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", 
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", 
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+        }
+      }
+    }
+  }, {
+    "url" : "/jobs/:jobid/rescales/history",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+        "properties" : {
+          "rescaleUuid" : {
+            "type" : "string"
+          },
+          "resourceRequirementsUuid" : {
+            "type" : "string"
+          },
+          "rescaleAttemptId" : {
+            "type" : "integer"
+          },
+          "vertices" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "object",
+              "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+              "properties" : {
+                "jobVertexId" : {
+                  "type" : "any"
+                },
+                "jobVertexName" : {
+                  "type" : "string"
+                },
+                "slotSharingGroupId" : {
+                  "type" : "any"
+                },
+                "slotSharingGroupName" : {
+                  "type" : "string"
+                },
+                "desiredParallelism" : {
+                  "type" : "integer"
+                },
+                "sufficientParallelism" : {
+                  "type" : "integer"
+                },
+                "preRescaleParallelism" : {
+                  "type" : "integer"
+                },
+                "postRescaleParallelism" : {
+                  "type" : "integer"
+                }
+              }
+            }
+          },
+          "slots" : {
+            "type" : "object",
+            "additionalProperties" : {
+              "type" : "object",
+              "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+              "properties" : {
+                "slotSharingGroupId" : {
+                  "type" : "any"
+                },
+                "slotSharingGroupName" : {
+                  "type" : "string"
+                },
+                "requestResourceProfile" : {
+                  "type" : "object",
+                  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+                  "properties" : {
+                    "cpuCores" : {
+                      "type" : "number"
+                    },
+                    "taskHeapMemory" : {
+                      "type" : "integer"
+                    },
+                    "taskOffHeapMemory" : {
+                      "type" : "integer"
+                    },
+                    "managedMemory" : {
+                      "type" : "integer"
+                    },
+                    "networkMemory" : {
+                      "type" : "integer"
+                    },
+                    "extendedResources" : {
+                      "type" : "object",
+                      "additionalProperties" : {
+                        "type" : "number"
+                      }
+                    }
+                  }
+                },
+                "desiredSlots" : {
+                  "type" : "integer"
+                },
+                "minimalRequiredSlots" : {
+                  "type" : "integer"
+                },
+                "preRescaleSlots" : {
+                  "type" : "integer"
+                },
+                "postRescaleSlots" : {
+                  "type" : "integer"
+                },
+                "acquiredResourceProfile" : {
+                  "type" : "object",
+                  "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+                }
+              }
+            }
+          },
+          "schedulerStates" : {
+            "type" : "array",
+            "items" : {
+              "type" : "object",
+              "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+              "properties" : {
+                "state" : {
+                  "type" : "string"
+                },
+                "enterTimestampInMillis" : {
+                  "type" : "integer"
+                },
+                "leaveTimestampInMillis" : {
+                  "type" : "integer"
+                },
+                "durationInMillis" : {
+                  "type" : "integer"
+                },
+                "stringifiedException" : {
+                  "type" : "string"
+                }
+              }
+            }
+          },
+          "startTimestampInMillis" : {
+            "type" : "integer"
+          },
+          "endTimestampInMillis" : {
+            "type" : "integer"
+          },
+          "terminalState" : {
+            "type" : "string",
+            "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+          },
+          "triggerCause" : {
+            "type" : "string",
+            "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", 
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+          },
+          "terminatedReason" : {
+            "type" : "string",
+            "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", 
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", 
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+          }
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/rescaling",
     "method" : "PATCH",
@@ -4781,239 +5167,5 @@
         }
       }
     }
-  }, {
-    "url" : "/jobs/:jobid/rescales/config",
-    "method" : "GET",
-    "status-code" : "200 OK",
-    "file-upload" : false,
-    "path-parameters" : {
-      "pathParameters" : [ {
-        "key" : "jobid"
-      } ]
-    },
-    "query-parameters" : {
-      "queryParameters" : [ ]
-    },
-    "request" : {
-      "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
-    },
-    "response" : {
-      "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo",
-      "properties" : {
-        "executingCooldownTimeoutInMillis" : {
-          "type" : "integer"
-        },
-        "executingResourceStabilizationTimeoutInMillis" : {
-          "type" : "integer"
-        },
-        "maximumDelayForTriggeringRescaleInMillis" : {
-          "type" : "integer"
-        },
-        "rescaleHistoryMax" : {
-          "type" : "integer"
-        },
-        "rescaleOnFailedCheckpointCount" : {
-          "type" : "integer"
-        },
-        "schedulerExecutionMode" : {
-          "type" : "string",
-          "enum" : [ "REACTIVE" ]
-        },
-        "slotIdleTimeoutInMillis" : {
-          "type" : "integer"
-        },
-        "submissionResourceStabilizationTimeoutInMillis" : {
-          "type" : "integer"
-        },
-        "submissionResourceWaitTimeoutInMillis" : {
-          "type" : "integer"
-        }
-      }
-    }
-  }, {
-    "url": "/jobs/:jobid/rescales/details/:rescaleuuid",
-    "method": "GET",
-    "status-code": "200 OK",
-    "file-upload": false,
-    "path-parameters": {
-      "pathParameters": [
-        {
-          "key": "jobid"
-        },
-        {
-          "key": "rescaleuuid"
-        }
-      ]
-    },
-    "query-parameters": {
-      "queryParameters": []
-    },
-    "request": {
-      "type": "object"
-    },
-    "response": {
-      "type": "object",
-      "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
-      "properties": {
-        "endTimestampInMillis": {
-          "type": "integer"
-        },
-        "rescaleAttemptId": {
-          "type": "integer"
-        },
-        "rescaleUuid": {
-          "type": "string"
-        },
-        "resourceRequirementsUuid": {
-          "type": "string"
-        },
-        "schedulerStates": {
-          "type": "array",
-          "items": {
-            "type": "object",
-            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
-            "properties": {
-              "durationInMillis": {
-                "type": "integer"
-              },
-              "enterTimestampInMillis": {
-                "type": "integer"
-              },
-              "leaveTimestampInMillis": {
-                "type": "integer"
-              },
-              "state": {
-                "type": "string"
-              },
-              "stringifiedException": {
-                "type": "string"
-              }
-            }
-          }
-        },
-        "slots": {
-          "type": "object",
-          "additionalProperties": {
-            "type": "object",
-            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
-            "properties": {
-              "acquiredResourceProfile": {
-                "type": "object",
-                "$ref": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
-              },
-              "desiredSlots": {
-                "type": "integer"
-              },
-              "minimalRequiredSlots": {
-                "type": "integer"
-              },
-              "postRescaleSlots": {
-                "type": "integer"
-              },
-              "preRescaleSlots": {
-                "type": "integer"
-              },
-              "requestResourceProfile": {
-                "type": "object",
-                "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
-                "properties": {
-                  "cpuCores": {
-                    "type": "number"
-                  },
-                  "extendedResources": {
-                    "type": "object",
-                    "additionalProperties": {
-                      "type": "number"
-                    }
-                  },
-                  "managedMemory": {
-                    "type": "integer"
-                  },
-                  "networkMemory": {
-                    "type": "integer"
-                  },
-                  "taskHeapMemory": {
-                    "type": "integer"
-                  },
-                  "taskOffHeapMemory": {
-                    "type": "integer"
-                  }
-                }
-              },
-              "slotSharingGroupId": {
-                "type": "any"
-              },
-              "slotSharingGroupName": {
-                "type": "string"
-              }
-            }
-          }
-        },
-        "startTimestampInMillis": {
-          "type": "integer"
-        },
-        "terminalState": {
-          "type": "string",
-          "enum": ["COMPLETED", "FAILED", "IGNORED"]
-        },
-        "terminatedReason": {
-          "type": "string",
-          "enum": [
-            "SUCCEEDED",
-            "EXCEPTION_OCCURRED",
-            "RESOURCE_REQUIREMENTS_UPDATED",
-            "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
-            "JOB_FINISHED",
-            "JOB_FAILED",
-            "JOB_CANCELED",
-            "JOB_FAILOVER_RESTARTING"
-          ]
-        },
-        "triggerCause": {
-          "type": "string",
-          "enum": [
-            "INITIAL_SCHEDULE",
-            "UPDATE_REQUIREMENT",
-            "NEW_RESOURCE_AVAILABLE",
-            "RECOVERABLE_FAILOVER"
-          ]
-        },
-        "vertices": {
-          "type": "object",
-          "additionalProperties": {
-            "type": "object",
-            "id": 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
-            "properties": {
-              "desiredParallelism": {
-                "type": "integer"
-              },
-              "jobVertexId": {
-                "type": "any"
-              },
-              "jobVertexName": {
-                "type": "string"
-              },
-              "postRescaleParallelism": {
-                "type": "integer"
-              },
-              "preRescaleParallelism": {
-                "type": "integer"
-              },
-              "slotSharingGroupId": {
-                "type": "any"
-              },
-              "slotSharingGroupName": {
-                "type": "string"
-              },
-              "sufficientParallelism": {
-                "type": "integer"
-              }
-            }
-          }
-        }
-      }
-    }
   } ]
-}
+}
\ No newline at end of file
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
new file mode 100644
index 00000000000..6e7541f87e3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistory;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** Handler to response job rescales history. */
+public class JobRescalesHistoryHandler
+        extends AbstractExecutionGraphHandler<JobRescalesHistory, 
JobMessageParameters>
+        implements JsonArchivist {
+
+    public JobRescalesHistoryHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobRescalesHistory, 
JobMessageParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
+    }
+
+    @Override
+    protected JobRescalesHistory handleRequest(
+            HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo 
executionGraphInfo)
+            throws RestHandlerException {
+        return getJobRescalesHistory(executionGraphInfo);
+    }
+
+    private JobRescalesHistory getJobRescalesHistory(ExecutionGraphInfo 
executionGraphInfo)
+            throws RestHandlerException {
+        if (executionGraphInfo.getRescalesStatsSnapshot() == null
+                || 
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory() == null) {
+            throw new RestHandlerException(
+                    String.format(
+                            "The job `%s` has not enabled the `%s` scheduler, 
or it has been enabled but the value of configuration option `%s` has not been 
set to greater than `0`.",
+                            executionGraphInfo.getJobId(),
+                            JobManagerOptions.SchedulerType.Adaptive,
+                            
WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()),
+                    HttpResponseStatus.NOT_FOUND,
+                    RestHandlerException.LoggingBehavior.IGNORE);
+        }
+        return JobRescalesHistory.fromRescalesStatsSnapshot(
+                executionGraphInfo.getRescalesStatsSnapshot());
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo 
executionGraphInfo)
+            throws IOException {
+
+        ResponseBody response;
+        try {
+            response = getJobRescalesHistory(executionGraphInfo);
+        } catch (RestHandlerException rhe) {
+            response = new ErrorResponseBody(rhe.getMessage());
+        }
+        return Collections.singletonList(
+                new ArchivedJson(
+                        JobRescalesHistoryHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace(
+                                        ':' + JobIDPathParameter.KEY,
+                                        
executionGraphInfo.getJobId().toString()),
+                        response));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
index 82cdd35fac3..674bc275e77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
@@ -51,6 +51,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -182,16 +183,18 @@ public class JobRescaleDetails implements ResponseBody, 
Serializable {
                 terminatedReason);
     }
 
-    public static JobRescaleDetails fromRescale(Rescale rescale, boolean 
includeSchedulerStates) {
+    public static JobRescaleDetails fromRescale(Rescale rescale, boolean 
includeDetailedInfo) {
         return new JobRescaleDetails(
                 rescale.getRescaleIdInfo().getRescaleUuid().toString(),
                 
rescale.getRescaleIdInfo().getResourceRequirementsId().toString(),
                 rescale.getRescaleIdInfo().getRescaleAttemptId(),
-                rescale.getVertices(),
-                convertMapValues(
-                        rescale.getSlots(),
-                        
SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale),
-                includeSchedulerStates ? rescale.getSchedulerStates() : null,
+                includeDetailedInfo ? rescale.getVertices() : 
Collections.emptyMap(),
+                includeDetailedInfo
+                        ? convertMapValues(
+                                rescale.getSlots(),
+                                
SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale)
+                        : Collections.emptyMap(),
+                includeDetailedInfo ? rescale.getSchedulerStates() : 
Collections.emptyList(),
                 rescale.getStartTimestamp(),
                 rescale.getEndTimestamp(),
                 rescale.getTerminalState(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java
new file mode 100644
index 00000000000..9cfc69d9388
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Response body for {@link JobRescalesHistoryHandler}. */
+@Schema(name = "JobRescalesHistory")
+public class JobRescalesHistory extends ArrayList<JobRescaleDetails>
+        implements ResponseBody, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // a default constructor is required for collection type marshalling
+    public JobRescalesHistory() {}
+
+    public JobRescalesHistory(int initialElements) {
+        super(initialElements);
+    }
+
+    public static JobRescalesHistory from(List<JobRescaleDetails> 
rescalesDetails) {
+        JobRescalesHistory history = new 
JobRescalesHistory(rescalesDetails.size());
+        history.addAll(rescalesDetails);
+        return history;
+    }
+
+    public static JobRescalesHistory 
fromRescalesStatsSnapshot(RescalesStatsSnapshot snapshot) {
+        return from(
+                snapshot.getRescaleHistory().stream()
+                        .map((Rescale rescale) -> 
JobRescaleDetails.fromRescale(rescale, false))
+                        .collect(Collectors.toList()));
+    }
+
+    @Override
+    @JsonIgnore
+    public boolean isEmpty() {
+        return super.isEmpty();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java
new file mode 100644
index 00000000000..810864c8ab1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for {@link JobRescalesHistoryHandler}. */
+public class JobRescalesHistoryHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JobRescalesHistory, JobMessageParameters> {
+    public static final JobRescalesHistoryHeaders INSTANCE = new 
JobRescalesHistoryHeaders();
+    public static final String JOB_RESCALES_DETAILS_HISTORY_PATH = 
"/jobs/:jobid/rescales/history";
+
+    @Override
+    public Class<JobRescalesHistory> getResponseClass() {
+        return JobRescalesHistory.class;
+    }
+
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    public String getDescription() {
+        return "Return job rescales history.";
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOB_RESCALES_DETAILS_HISTORY_PATH;
+    }
+
+    public static JobRescalesHistoryHeaders getInstance() {
+        return INSTANCE;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
index 59d91ca5f4d..42e98a499c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -127,7 +127,10 @@ public class DefaultRescaleTimeline implements 
RescaleTimeline {
     public RescalesStatsSnapshot createSnapshot() {
         List<Rescale> rescales = rescaleHistory.toArrayList();
         Collections.reverse(rescales);
-        return new RescalesStatsSnapshot(List.copyOf(rescales), 
rescalesSummary.createSnapshot());
+        return new RescalesStatsSnapshot(
+                List.copyOf(rescales),
+                Map.copyOf(latestRescales),
+                rescalesSummary.createSnapshot());
     }
 
     private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
index f8697fe473f..0dfd3fed380 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -35,17 +36,21 @@ public class RescalesStatsSnapshot implements Serializable {
 
     private final List<Rescale> rescaleHistory;
     private final Map<AbstractID, Rescale> idToRescaleMap;
+    private final Map<TerminalState, Rescale> latestRescales;
     private final RescalesSummarySnapshot rescalesSummarySnapshot;
 
     public RescalesStatsSnapshot(
-            List<Rescale> rescaleHistory, RescalesSummarySnapshot 
rescalesSummarySnapshot) {
-        this.rescaleHistory = rescaleHistory;
+            List<Rescale> rescaleHistory,
+            Map<TerminalState, Rescale> latestRescales,
+            RescalesSummarySnapshot rescalesSummarySnapshot) {
+        this.rescaleHistory = List.copyOf(rescaleHistory);
         this.idToRescaleMap =
                 rescaleHistory.stream()
                         .collect(
                                 Collectors.toMap(
                                         r -> 
r.getRescaleIdInfo().getRescaleUuid(),
                                         Function.identity()));
+        this.latestRescales = Map.copyOf(latestRescales);
         this.rescalesSummarySnapshot = rescalesSummarySnapshot;
     }
 
@@ -57,6 +62,11 @@ public class RescalesStatsSnapshot implements Serializable {
         return idToRescaleMap.get(rescaleId);
     }
 
+    @Nullable
+    public Rescale getLatestRescale(TerminalState terminalState) {
+        return latestRescales.get(terminalState);
+    }
+
     public RescalesSummarySnapshot getRescalesSummarySnapshot() {
         return rescalesSummarySnapshot;
     }
@@ -64,6 +74,7 @@ public class RescalesStatsSnapshot implements Serializable {
     public static RescalesStatsSnapshot emptySnapshot() {
         return new RescalesStatsSnapshot(
                 new ArrayList<>(),
+                new HashMap<>(),
                 new RescalesSummarySnapshot(
                         StatsSummarySnapshot.empty(),
                         StatsSummarySnapshot.empty(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 8bf45671feb..5324e3d5446 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -98,6 +98,7 @@ import 
org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
 import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler;
 import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
 import 
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
 import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -166,6 +167,7 @@ import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails
 import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1225,6 +1227,18 @@ public class WebMonitorEndpoint<T extends 
RestfulGateway> extends RestServerEndp
         handlers.add(
                 Tuple2.of(jobRescaleDetailsHandler.getMessageHeaders(), 
jobRescaleDetailsHandler));
 
+        final JobRescalesHistoryHandler jobRescalesHistoryHandler =
+                new JobRescalesHistoryHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobRescalesHistoryHeaders.getInstance(),
+                        executionGraphCache,
+                        executor);
+        handlers.add(
+                Tuple2.of(
+                        jobRescalesHistoryHandler.getMessageHeaders(), 
jobRescalesHistoryHandler));
+
         handlers.stream()
                 .map(tuple -> tuple.f1)
                 .filter(handler -> handler instanceof JsonArchivist)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
index 3ecd4e46038..be0275a42b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupResc
 import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.Test;
@@ -150,7 +149,9 @@ class JobRescaleDetailsHandlerTest {
                         null,
                         null,
                         new RescalesStatsSnapshot(
-                                List.of(rescale), 
rescalesSummary.createSnapshot()));
+                                List.of(rescale),
+                                Map.of(rescale.getTerminalState(), rescale),
+                                rescalesSummary.createSnapshot()));
         final HandlerRequest<EmptyRequestBody> request =
                 createRequest(
                         executionGraphInfo.getJobId(), 
rescale.getRescaleIdInfo().getRescaleUuid());
@@ -160,7 +161,7 @@ class JobRescaleDetailsHandlerTest {
     }
 
     private static HandlerRequest<EmptyRequestBody> createRequest(
-            JobID jobId, AbstractID rescaleUuid) throws 
HandlerRequestException {
+            JobID jobId, RescaleIdInfo.RescaleUUID rescaleUuid) throws 
HandlerRequestException {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
         pathParameters.put(JobRescaleIDPathParameter.KEY, 
rescaleUuid.toString());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java
similarity index 73%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
copy to 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java
index 3ecd4e46038..5cff2477c79 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java
@@ -30,28 +30,27 @@ import 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
 import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
-import 
org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters;
-import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
-import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
-import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistory;
+import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders;
 import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
 import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot;
 import 
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
 import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -59,52 +58,34 @@ import static 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDeta
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Test for {@link JobRescaleDetailsHandler}. */
-class JobRescaleDetailsHandlerTest {
-
-    private final JobRescaleDetailsHandler testInstance =
-            new JobRescaleDetailsHandler(
+/** Test for {@link JobRescalesHistoryHandler}. */
+class JobRescalesHistoryHandlerTest {
+    private final JobRescalesHistoryHandler testInstance =
+            new JobRescalesHistoryHandler(
                     CompletableFuture::new,
                     TestingUtils.TIMEOUT,
-                    Map.of(),
-                    JobRescaleDetailsHeaders.getInstance(),
+                    Collections.emptyMap(),
+                    JobRescalesHistoryHeaders.getInstance(),
                     new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, 
TestingUtils.TIMEOUT),
                     Executors.directExecutor());
 
     @Test
-    void testUnNormalCases() throws HandlerRequestException, 
RestHandlerException {
+    void testSchedulerNotEnabledRescalesHistory() throws 
HandlerRequestException {
         // Test for adaptive scheduler rescales was not enabled for job.
         final ExecutionGraphInfo 
executionGraphInfoWithNullRescalesStatsSnapshot =
                 new ExecutionGraphInfo(
-                        new ArchivedExecutionGraphBuilder().build(), 
List.of(), null);
+                        new ArchivedExecutionGraphBuilder().build(), 
Collections.emptyList(), null);
         final HandlerRequest<EmptyRequestBody> request =
-                createRequest(
-                        
executionGraphInfoWithNullRescalesStatsSnapshot.getJobId(),
-                        new RescaleIdInfo.RescaleUUID());
+                
createRequest(executionGraphInfoWithNullRescalesStatsSnapshot.getJobId());
         assertThatThrownBy(
                         () ->
                                 testInstance.handleRequest(
                                         request, 
executionGraphInfoWithNullRescalesStatsSnapshot))
                 .isInstanceOf(RestHandlerException.class);
-
-        // Test for that case could not find rescale details for the specified 
rescale uuid.
-        final ExecutionGraphInfo 
executionGraphInfoWithEmptyRescalesStatsSnapshot =
-                new ExecutionGraphInfo(
-                        new ArchivedExecutionGraphBuilder().build(),
-                        List.of(),
-                        JobManagerOptions.SchedulerType.Adaptive,
-                        null,
-                        RescalesStatsSnapshot.emptySnapshot());
-        assertThatThrownBy(
-                        () ->
-                                testInstance.handleRequest(
-                                        request, 
executionGraphInfoWithEmptyRescalesStatsSnapshot))
-                .isInstanceOf(RestHandlerException.class);
     }
 
     @Test
-    void testRequestNormalJobRescaleStatisticsDetails()
-            throws HandlerRequestException, RestHandlerException {
+    void testRequestNormalJobRescaleHistory() throws HandlerRequestException, 
RestHandlerException {
         Rescale rescale =
                 new Rescale(new RescaleIdInfo(new 
RescaleIdInfo.ResourceRequirementsID(), 1L))
                         .setStartTimestamp(1L)
@@ -142,34 +123,40 @@ class JobRescaleDetailsHandlerTest {
 
         RescalesSummary rescalesSummary = new RescalesSummary(2);
         rescalesSummary.addTerminated(rescale);
+        RescalesSummarySnapshot rescalesSummarySnapshot = 
rescalesSummary.createSnapshot();
+        RescalesStatsSnapshot rescalesStatsSnapshot =
+                new RescalesStatsSnapshot(
+                        Collections.singletonList(rescale),
+                        Collections.singletonMap(rescale.getTerminalState(), 
rescale),
+                        rescalesSummarySnapshot);
 
         final ExecutionGraphInfo executionGraphInfo =
                 new ExecutionGraphInfo(
                         new ArchivedExecutionGraphBuilder().build(),
-                        List.of(),
-                        null,
+                        Collections.emptyList(),
+                        JobManagerOptions.SchedulerType.Adaptive,
                         null,
-                        new RescalesStatsSnapshot(
-                                List.of(rescale), 
rescalesSummary.createSnapshot()));
+                        rescalesStatsSnapshot);
         final HandlerRequest<EmptyRequestBody> request =
-                createRequest(
-                        executionGraphInfo.getJobId(), 
rescale.getRescaleIdInfo().getRescaleUuid());
-        JobRescaleDetails jobRescaleDetails =
+                createRequest(executionGraphInfo.getJobId());
+        JobRescalesHistory actualJobRescalesHistory =
                 testInstance.handleRequest(request, executionGraphInfo);
-        assertThat(jobRescaleDetails).isEqualTo(fromRescale(rescale, true));
+
+        JobRescalesHistory expectedJobRescalesHistory =
+                
JobRescalesHistory.from(Collections.singletonList(fromRescale(rescale, false)));
+        
assertThat(actualJobRescalesHistory).isEqualTo(expectedJobRescalesHistory);
     }
 
-    private static HandlerRequest<EmptyRequestBody> createRequest(
-            JobID jobId, AbstractID rescaleUuid) throws 
HandlerRequestException {
+    private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+            throws HandlerRequestException {
         final Map<String, String> pathParameters = new HashMap<>();
         pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
-        pathParameters.put(JobRescaleIDPathParameter.KEY, 
rescaleUuid.toString());
 
         return HandlerRequest.resolveParametersAndCreate(
                 EmptyRequestBody.getInstance(),
-                new JobIDRescaleIDParameters(),
+                new JobMessageParameters(),
                 pathParameters,
                 new HashMap<>(),
-                List.of());
+                Collections.emptyList());
     }
 }

Reply via email to