This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b1c858eae73 [FLINK-38895][runtime] Introduce the
/jobs/:jobid/rescales/details/:rescaleuuid endpoint in the REST API
b1c858eae73 is described below
commit b1c858eae739dffb84a76e6c4c7a6c938bfc6d8c
Author: Yuepeng Pan <[email protected]>
AuthorDate: Tue Mar 31 08:17:23 2026 +0800
[FLINK-38895][runtime] Introduce the
/jobs/:jobid/rescales/details/:rescaleuuid endpoint in the REST API
Co-authored-by: XComp <[email protected]>
Co-authored-by: WeiZhong94 <[email protected]>
Co-authored-by: mateczagany <[email protected]>
Co-authored-by: ferenc-csaky <[email protected]>
---
.../shortcodes/generated/rest_v1_dispatcher.html | 190 ++++++++
docs/static/generated/rest_v1_dispatcher.yml | 170 +++++++
.../src/test/resources/rest_api_v1.snapshot | 183 ++++++++
.../job/rescales/JobRescaleDetailsHandler.java | 127 ++++++
.../job/rescales/JobIDRescaleIDParameters.java | 41 ++
.../messages/job/rescales/JobRescaleDetails.java | 508 +++++++++++++++++++++
.../job/rescales/JobRescaleDetailsHeaders.java | 74 +++
.../job/rescales/JobRescaleIDPathParameter.java | 52 +++
.../messages/job/rescales}/SchedulerStateSpan.java | 39 +-
.../json/SlotSharingGroupIDKeyDeserializer.java | 35 ++
.../json/SlotSharingGroupIDKeySerializer.java | 43 ++
.../adaptive/timeline/DefaultRescaleTimeline.java | 5 +-
.../scheduler/adaptive/timeline/Rescale.java | 26 +-
.../scheduler/adaptive/timeline/RescaleIdInfo.java | 33 +-
.../adaptive/timeline/RescalesStatsSnapshot.java | 17 +
.../timeline/VertexParallelismRescale.java | 160 -------
.../runtime/webmonitor/WebMonitorEndpoint.java | 13 +
.../job/rescales/JobRescaleDetailsHandlerTest.java | 175 +++++++
.../scheduler/adaptive/timeline/RescaleTest.java | 59 ++-
.../adaptive/timeline/RescaleTimelineITCase.java | 12 +-
.../adaptive/timeline/RescalesSummaryTest.java | 4 +-
21 files changed, 1743 insertions(+), 223 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 07ebf893e07..10fba048344 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3643,6 +3643,196 @@ Using 'curl' you can upload a jar via 'curl -X POST -H
"Expect:" -F "jarfile=@pa
</tr>
</tbody>
</table>
+<table class="rest-api table table-bordered">
+ <tbody>
+ <tr>
+ <td class="text-left"
colspan="2"><h5><strong>/jobs/:jobid/rescales/details/:rescaleuuid</strong></h5></td>
+ </tr>
+ <tr>
+ <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+ <td class="text-left">Response code: <code>200 OK</code></td>
+ </tr>
+ <tr>
+ <td colspan="2">Returns the details of a job rescale.</td>
+ </tr>
+ <tr>
+ <td colspan="2">Path parameters</td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies
a job.</li>
+<li><code>rescaleuuid</code> - String value that identifies a job rescale.</li>
+ </ul>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Request</summary>
+ <pre><code>{}</code></pre>
+ </label>
+ </td>
+ </tr>
+ <tr>
+ <td colspan="2">
+ <label>
+ <details>
+ <summary>Response</summary>
+ <pre><code>{
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+ "properties" : {
+ "endTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "rescaleAttemptId" : {
+ "type" : "integer"
+ },
+ "rescaleUuid" : {
+ "type" : "string"
+ },
+ "resourceRequirementsUuid" : {
+ "type" : "string"
+ },
+ "schedulerStates" : {
+ "type" : "array",
+ "items" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+ "properties" : {
+ "durationInMillis" : {
+ "type" : "integer"
+ },
+ "enterTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "leaveTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "state" : {
+ "type" : "string"
+ },
+ "stringifiedException" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "slots" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+ "properties" : {
+ "acquiredResourceProfile" : {
+ "type" : "object",
+ "$ref" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+ },
+ "desiredSlots" : {
+ "type" : "integer"
+ },
+ "minimalRequiredSlots" : {
+ "type" : "integer"
+ },
+ "postRescaleSlots" : {
+ "type" : "integer"
+ },
+ "preRescaleSlots" : {
+ "type" : "integer"
+ },
+ "requestResourceProfile" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+ "properties" : {
+ "cpuCores" : {
+ "type" : "number"
+ },
+ "extendedResources" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "number"
+ }
+ },
+ "managedMemory" : {
+ "type" : "integer"
+ },
+ "networkMemory" : {
+ "type" : "integer"
+ },
+ "taskHeapMemory" : {
+ "type" : "integer"
+ },
+ "taskOffHeapMemory" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "startTimestampInMillis" : {
+ "type" : "integer"
+ },
+ "terminalState" : {
+ "type" : "string",
+ "enum" : [ "COMPLETED", "FAILED", "IGNORED" ]
+ },
+ "terminatedReason" : {
+ "type" : "string",
+ "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED",
"RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
"JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ]
+ },
+ "triggerCause" : {
+ "type" : "string",
+ "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT",
"NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ]
+ },
+ "vertices" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+ "properties" : {
+ "desiredParallelism" : {
+ "type" : "integer"
+ },
+ "jobVertexId" : {
+ "type" : "any"
+ },
+ "jobVertexName" : {
+ "type" : "string"
+ },
+ "postRescaleParallelism" : {
+ "type" : "integer"
+ },
+ "preRescaleParallelism" : {
+ "type" : "integer"
+ },
+ "slotSharingGroupId" : {
+ "type" : "any"
+ },
+ "slotSharingGroupName" : {
+ "type" : "string"
+ },
+ "sufficientParallelism" : {
+ "type" : "integer"
+ }
+ }
+ }
+ }
+ }
+}</code></pre>
+ </label>
+ </td>
+ </tr>
+ </tbody>
+</table>
<table class="rest-api table table-bordered">
<tbody>
<tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml
b/docs/static/generated/rest_v1_dispatcher.yml
index 16df36dc728..43fa4b3fe28 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -934,6 +934,30 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/JobRescaleConfigInfo"
+ /jobs/{jobid}/rescales/details/{rescaleuuid}:
+ get:
+ description: Returns the details of a job rescale.
+ operationId: getJobRescaleDetails
+ parameters:
+ - name: jobid
+ in: path
+ description: 32-character hexadecimal string value that identifies a
job.
+ required: true
+ schema:
+ $ref: "#/components/schemas/JobID"
+ - name: rescaleuuid
+ in: path
+ description: String value that identifies a job rescale.
+ required: true
+ schema:
+ $ref: "#/components/schemas/RescaleUUID"
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: "#/components/schemas/JobRescaleDetails"
/jobs/{jobid}/rescaling:
patch:
description: Triggers the rescaling of a job. This async operation would
return
@@ -2768,6 +2792,40 @@ components:
submissionResourceWaitTimeoutInMillis:
type: integer
format: int64
+ JobRescaleDetails:
+ type: object
+ properties:
+ endTimestampInMillis:
+ type: integer
+ format: int64
+ rescaleAttemptId:
+ type: integer
+ format: int64
+ rescaleUuid:
+ type: string
+ resourceRequirementsUuid:
+ type: string
+ schedulerStates:
+ type: array
+ items:
+ $ref: "#/components/schemas/SchedulerStateSpan"
+ slots:
+ type: object
+ additionalProperties:
+ $ref: "#/components/schemas/SlotSharingGroupRescaleInfo"
+ startTimestampInMillis:
+ type: integer
+ format: int64
+ terminalState:
+ $ref: "#/components/schemas/TerminalState"
+ terminatedReason:
+ $ref: "#/components/schemas/TerminatedReason"
+ triggerCause:
+ $ref: "#/components/schemas/TriggerCause"
+ vertices:
+ type: object
+ additionalProperties:
+ $ref: "#/components/schemas/VertexParallelismRescaleInfo"
JobResourceRequirementsBody:
type: object
additionalProperties:
@@ -3064,6 +3122,18 @@ components:
- CLAIM
- NO_CLAIM
- LEGACY
+ RescaleUUID:
+ type: object
+ properties:
+ bytes:
+ type: string
+ format: byte
+ lowerPart:
+ type: integer
+ format: int64
+ upperPart:
+ type: integer
+ format: int64
ResourceID:
pattern: "[0-9a-f]{32}"
type: string
@@ -3166,6 +3236,22 @@ components:
type: string
enum:
- REACTIVE
+ SchedulerStateSpan:
+ type: object
+ properties:
+ durationInMillis:
+ type: integer
+ format: int64
+ enterTimestampInMillis:
+ type: integer
+ format: int64
+ leaveTimestampInMillis:
+ type: integer
+ format: int64
+ state:
+ type: string
+ stringifiedException:
+ type: string
SchedulerType:
type: string
enum:
@@ -3206,6 +3292,29 @@ components:
upperPart:
type: integer
format: int64
+ SlotSharingGroupRescaleInfo:
+ type: object
+ properties:
+ acquiredResourceProfile:
+ $ref: "#/components/schemas/ResourceProfileInfo"
+ desiredSlots:
+ type: integer
+ format: int32
+ minimalRequiredSlots:
+ type: integer
+ format: int32
+ postRescaleSlots:
+ type: integer
+ format: int32
+ preRescaleSlots:
+ type: integer
+ format: int32
+ requestResourceProfile:
+ $ref: "#/components/schemas/ResourceProfileInfo"
+ slotSharingGroupId:
+ $ref: "#/components/schemas/SlotSharingGroupId"
+ slotSharingGroupName:
+ type: string
StatsSummaryDto:
type: object
properties:
@@ -3660,6 +3769,23 @@ components:
type: array
items:
$ref: "#/components/schemas/TaskManagerInfo"
+ TerminalState:
+ type: string
+ enum:
+ - COMPLETED
+ - FAILED
+ - IGNORED
+ TerminatedReason:
+ type: string
+ enum:
+ - SUCCEEDED
+ - EXCEPTION_OCCURRED
+ - RESOURCE_REQUIREMENTS_UPDATED
+ - NO_RESOURCES_OR_PARALLELISMS_CHANGE
+ - JOB_FINISHED
+ - JOB_FAILED
+ - JOB_CANCELED
+ - JOB_FAILOVER_RESTARTING
TerminationMode:
type: string
enum:
@@ -3685,6 +3811,13 @@ components:
- FULL
- ON_CPU
- OFF_CPU
+ TriggerCause:
+ type: string
+ enum:
+ - INITIAL_SCHEDULE
+ - UPDATE_REQUIREMENT
+ - NEW_RESOURCE_AVAILABLE
+ - RECOVERABLE_FAILOVER
TriggerId:
pattern: "[0-9a-f]{32}"
type: string
@@ -3734,3 +3867,40 @@ components:
endTimestamp:
type: integer
format: int64
+ VertexParallelismInformation:
+ type: object
+ properties:
+ maxParallelism:
+ type: integer
+ format: int32
+ minParallelism:
+ type: integer
+ format: int32
+ parallelism:
+ type: integer
+ format: int32
+ VertexParallelismRescaleInfo:
+ type: object
+ properties:
+ desiredParallelism:
+ type: integer
+ format: int32
+ jobVertexId:
+ $ref: "#/components/schemas/JobVertexID"
+ jobVertexName:
+ type: string
+ postRescaleParallelism:
+ type: integer
+ format: int32
+ preRescaleParallelism:
+ type: integer
+ format: int32
+ requiredParallelisms:
+ $ref: "#/components/schemas/VertexParallelismInformation"
+ slotSharingGroupId:
+ $ref: "#/components/schemas/SlotSharingGroupId"
+ slotSharingGroupName:
+ type: string
+ sufficientParallelism:
+ type: integer
+ format: int32
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index e065545a069..b4a16c9cd74 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -4832,5 +4832,188 @@
}
}
}
+ }, {
+ "url": "/jobs/:jobid/rescales/details/:rescaleuuid",
+ "method": "GET",
+ "status-code": "200 OK",
+ "file-upload": false,
+ "path-parameters": {
+ "pathParameters": [
+ {
+ "key": "jobid"
+ },
+ {
+ "key": "rescaleuuid"
+ }
+ ]
+ },
+ "query-parameters": {
+ "queryParameters": []
+ },
+ "request": {
+ "type": "object"
+ },
+ "response": {
+ "type": "object",
+ "id":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails",
+ "properties": {
+ "endTimestampInMillis": {
+ "type": "integer"
+ },
+ "rescaleAttemptId": {
+ "type": "integer"
+ },
+ "rescaleUuid": {
+ "type": "string"
+ },
+ "resourceRequirementsUuid": {
+ "type": "string"
+ },
+ "schedulerStates": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "id":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan",
+ "properties": {
+ "durationInMillis": {
+ "type": "integer"
+ },
+ "enterTimestampInMillis": {
+ "type": "integer"
+ },
+ "leaveTimestampInMillis": {
+ "type": "integer"
+ },
+ "state": {
+ "type": "string"
+ },
+ "stringifiedException": {
+ "type": "string"
+ }
+ }
+ }
+ },
+ "slots": {
+ "type": "object",
+ "additionalProperties": {
+ "type": "object",
+ "id":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo",
+ "properties": {
+ "acquiredResourceProfile": {
+ "type": "object",
+ "$ref":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
+ },
+ "desiredSlots": {
+ "type": "integer"
+ },
+ "minimalRequiredSlots": {
+ "type": "integer"
+ },
+ "postRescaleSlots": {
+ "type": "integer"
+ },
+ "preRescaleSlots": {
+ "type": "integer"
+ },
+ "requestResourceProfile": {
+ "type": "object",
+ "id":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
+ "properties": {
+ "cpuCores": {
+ "type": "number"
+ },
+ "extendedResources": {
+ "type": "object",
+ "additionalProperties": {
+ "type": "number"
+ }
+ },
+ "managedMemory": {
+ "type": "integer"
+ },
+ "networkMemory": {
+ "type": "integer"
+ },
+ "taskHeapMemory": {
+ "type": "integer"
+ },
+ "taskOffHeapMemory": {
+ "type": "integer"
+ }
+ }
+ },
+ "slotSharingGroupId": {
+ "type": "any"
+ },
+ "slotSharingGroupName": {
+ "type": "string"
+ }
+ }
+ }
+ },
+ "startTimestampInMillis": {
+ "type": "integer"
+ },
+ "terminalState": {
+ "type": "string",
+ "enum": ["COMPLETED", "FAILED", "IGNORED"]
+ },
+ "terminatedReason": {
+ "type": "string",
+ "enum": [
+ "SUCCEEDED",
+ "EXCEPTION_OCCURRED",
+ "RESOURCE_REQUIREMENTS_UPDATED",
+ "NO_RESOURCES_OR_PARALLELISMS_CHANGE",
+ "JOB_FINISHED",
+ "JOB_FAILED",
+ "JOB_CANCELED",
+ "JOB_FAILOVER_RESTARTING"
+ ]
+ },
+ "triggerCause": {
+ "type": "string",
+ "enum": [
+ "INITIAL_SCHEDULE",
+ "UPDATE_REQUIREMENT",
+ "NEW_RESOURCE_AVAILABLE",
+ "RECOVERABLE_FAILOVER"
+ ]
+ },
+ "vertices": {
+ "type": "object",
+ "additionalProperties": {
+ "type": "object",
+ "id":
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo",
+ "properties": {
+ "desiredParallelism": {
+ "type": "integer"
+ },
+ "jobVertexId": {
+ "type": "any"
+ },
+ "jobVertexName": {
+ "type": "string"
+ },
+ "postRescaleParallelism": {
+ "type": "integer"
+ },
+ "preRescaleParallelism": {
+ "type": "integer"
+ },
+ "slotSharingGroupId": {
+ "type": "any"
+ },
+ "slotSharingGroupName": {
+ "type": "string"
+ },
+ "sufficientParallelism": {
+ "type": "integer"
+ }
+ }
+ }
+ }
+ }
+ }
} ]
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
new file mode 100644
index 00000000000..d3f94a32c24
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandler.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * REST handler that serves the details of a single job rescale, identified by the job ID and
+ * the rescale UUID taken from the request path.
+ *
+ * <p>The handler also acts as a {@link JsonArchivist}: it emits one archived JSON document per
+ * rescale found in the rescale history of the job.
+ */
+public class JobRescaleDetailsHandler
+        extends AbstractExecutionGraphHandler<JobRescaleDetails, JobIDRescaleIDParameters>
+        implements JsonArchivist {
+
+    public JobRescaleDetailsHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Duration timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobRescaleDetails, JobIDRescaleIDParameters>
+                    messageHeaders,
+            ExecutionGraphCache executionGraphCache,
+            Executor executor) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                messageHeaders,
+                executionGraphCache,
+                executor);
+    }
+
+    /**
+     * Looks up the rescale addressed by the request and converts it into its REST
+     * representation, including the scheduler states.
+     *
+     * @param request request carrying the rescale UUID as a path parameter
+     * @param executionGraphInfo execution graph information of the addressed job
+     * @return the details of the requested rescale
+     * @throws RestHandlerException with status {@code NOT_FOUND} if the job has no rescale
+     *     statistics snapshot, or if the snapshot holds no rescale for the requested UUID
+     */
+    @Override
+    protected JobRescaleDetails handleRequest(
+            HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo executionGraphInfo)
+            throws RestHandlerException {
+
+        final RescalesStatsSnapshot snapshot = executionGraphInfo.getRescalesStatsSnapshot();
+        final JobID jobId = executionGraphInfo.getJobId();
+
+        // Guard: without a snapshot there is nothing to look up.
+        if (snapshot == null) {
+            final String message =
+                    "AdaptiveScheduler rescales was not enabled for job " + jobId + '.';
+            throw new RestHandlerException(message, HttpResponseStatus.NOT_FOUND);
+        }
+
+        final RescaleIdInfo.RescaleUUID rescaleUuid =
+                request.getPathParameter(JobRescaleIDPathParameter.class);
+        final Rescale requested = snapshot.getRescale(rescaleUuid);
+
+        // Guard: the UUID from the path is unknown to the snapshot.
+        if (requested == null) {
+            final String message =
+                    "Could not find rescale details of specified rescaleUuid "
+                            + rescaleUuid
+                            + '.';
+            throw new RestHandlerException(message, HttpResponseStatus.NOT_FOUND);
+        }
+
+        return JobRescaleDetails.fromRescale(requested, true);
+    }
+
+    /**
+     * Produces one archive entry per rescale in the job's rescale history.
+     *
+     * <p>Each entry's path is the endpoint URL of {@link JobRescaleDetailsHeaders} with the job
+     * ID and rescale UUID placeholders substituted by the actual values.
+     *
+     * @param executionGraphInfo execution graph information of the job to archive
+     * @return the archive entries, or an empty list if the job has no rescale statistics
+     *     snapshot or the snapshot has no rescale history
+     * @throws IOException if creating an {@link ArchivedJson} fails
+     */
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        final RescalesStatsSnapshot snapshot = executionGraphInfo.getRescalesStatsSnapshot();
+        if (snapshot == null) {
+            return List.of();
+        }
+
+        final List<Rescale> history = snapshot.getRescaleHistory();
+        if (history == null) {
+            return List.of();
+        }
+
+        final String endpointUrl = JobRescaleDetailsHeaders.getInstance().getTargetRestEndpointURL();
+        final String jobScopedPath =
+                endpointUrl.replace(
+                        ':' + JobIDPathParameter.KEY, executionGraphInfo.getJobId().toString());
+        final String rescalePlaceholder = ':' + JobRescaleIDPathParameter.KEY;
+
+        final List<ArchivedJson> archives = new ArrayList<>();
+        for (Rescale entry : history) {
+            final String rescaleUuid = entry.getRescaleIdInfo().getRescaleUuid().toString();
+            final String path = jobScopedPath.replace(rescalePlaceholder, rescaleUuid);
+            archives.add(new ArchivedJson(path, JobRescaleDetails.fromRescale(entry, true)));
+        }
+        return archives;
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
new file mode 100644
index 00000000000..fdcf9fe6b18
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobIDRescaleIDParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * {@link MessageParameters} for {@link
+ * org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler}.
+ *
+ * <p>Extends {@link JobMessageParameters} with a second path parameter that identifies the
+ * rescale, so the endpoint is addressed by the job ID plus the rescale UUID.
+ */
+public class JobIDRescaleIDParameters extends JobMessageParameters {
+
+    /**
+     * Path parameter identifying the rescale.
+     *
+     * <p>Exposed as a public field so that code building a request can access and resolve it
+     * directly; as a private field it was reachable only by iterating over {@link
+     * #getPathParameters()}.
+     */
+    public final JobRescaleIDPathParameter jobRescaleIDPathParameter =
+            new JobRescaleIDPathParameter();
+
+    /**
+     * Returns the path parameters of the endpoint.
+     *
+     * @return the inherited job path parameter followed by the rescale path parameter
+     */
+    @Override
+    public Collection<MessagePathParameter<?>> getPathParameters() {
+        return Arrays.asList(jobPathParameter, jobRescaleIDPathParameter);
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
new file mode 100644
index 00000000000..fa9b7aeb195
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.rest.messages.ResourceProfileInfo;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+import
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDDeserializer;
+import
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDKeyDeserializer;
+import
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDKeySerializer;
+import
org.apache.flink.runtime.rest.messages.json.SlotSharingGroupIDSerializer;
+import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminalState;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.util.Preconditions;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Response body describing the details of a single job rescale. */
+@Schema(name = "JobRescaleDetails")
+public class JobRescaleDetails implements ResponseBody, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_NAME_RESCALE_UUID = "rescaleUuid";
+ public static final String FIELD_NAME_RESOURCE_REQUIREMENTS_UUID =
"resourceRequirementsUuid";
+ public static final String FIELD_NAME_RESCALE_ATTEMPT_ID =
"rescaleAttemptId";
+ public static final String FIELD_NAME_VERTICES = "vertices";
+ public static final String FIELD_NAME_SLOTS = "slots";
+ public static final String FIELD_NAME_SCHEDULER_STATES = "schedulerStates";
+ public static final String FIELD_NAME_START_TIMESTAMP =
"startTimestampInMillis";
+ public static final String FIELD_NAME_END_TIMESTAMP =
"endTimestampInMillis";
+ public static final String FIELD_NAME_TERMINAL_STATE = "terminalState";
+ public static final String FIELD_NAME_TRIGGER_CAUSE = "triggerCause";
+ public static final String FIELD_NAME_TERMINATED_REASON =
"terminatedReason";
+
+ @JsonProperty(FIELD_NAME_RESCALE_UUID)
+ private final String rescaleUuid;
+
+ @JsonProperty(FIELD_NAME_RESOURCE_REQUIREMENTS_UUID)
+ private final String resourceRequirementsUuid;
+
+ @JsonProperty(FIELD_NAME_RESCALE_ATTEMPT_ID)
+ private final long rescaleAttemptId;
+
+ @JsonProperty(FIELD_NAME_VERTICES)
+ @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+ @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
+ private final Map<JobVertexID, VertexParallelismRescaleInfo> vertices;
+
+ @JsonProperty(FIELD_NAME_SLOTS)
+ @JsonSerialize(keyUsing = SlotSharingGroupIDKeySerializer.class)
+ @JsonDeserialize(keyUsing = SlotSharingGroupIDKeyDeserializer.class)
+ private final Map<SlotSharingGroupId, SlotSharingGroupRescaleInfo> slots;
+
+ @JsonProperty(FIELD_NAME_SCHEDULER_STATES)
+ private final List<SchedulerStateSpan> schedulerStates;
+
+ @JsonProperty(FIELD_NAME_START_TIMESTAMP)
+ private final Long startTimestamp;
+
+ @Nullable
+ @JsonProperty(FIELD_NAME_END_TIMESTAMP)
+ private final Long endTimestamp;
+
+ @JsonProperty(FIELD_NAME_TERMINAL_STATE)
+ private final TerminalState terminalState;
+
+ @JsonProperty(FIELD_NAME_TRIGGER_CAUSE)
+ private final TriggerCause triggerCause;
+
+ @JsonProperty(FIELD_NAME_TERMINATED_REASON)
+ private final TerminatedReason terminatedReason;
+
+ @JsonCreator
+ public JobRescaleDetails(
+ @JsonProperty(FIELD_NAME_RESCALE_UUID) String rescaleUuid,
+ @JsonProperty(FIELD_NAME_RESOURCE_REQUIREMENTS_UUID) String
resourceRequirementsUuid,
+ @JsonProperty(FIELD_NAME_RESCALE_ATTEMPT_ID) long rescaleAttemptId,
+ @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+ @JsonDeserialize(keyUsing =
JobVertexIDKeyDeserializer.class)
+ @JsonProperty(FIELD_NAME_VERTICES)
+ Map<JobVertexID, VertexParallelismRescaleInfo> vertices,
+ @JsonSerialize(keyUsing = SlotSharingGroupIDKeySerializer.class)
+ @JsonDeserialize(keyUsing =
SlotSharingGroupIDKeyDeserializer.class)
+ @JsonProperty(FIELD_NAME_SLOTS)
+ Map<SlotSharingGroupId, SlotSharingGroupRescaleInfo> slots,
+ @JsonProperty(FIELD_NAME_SCHEDULER_STATES)
List<SchedulerStateSpan> schedulerStates,
+ @JsonProperty(FIELD_NAME_START_TIMESTAMP) Long startTimestamp,
+ @Nullable @JsonProperty(FIELD_NAME_END_TIMESTAMP) Long
endTimestamp,
+ @JsonProperty(FIELD_NAME_TERMINAL_STATE) TerminalState
terminalState,
+ @JsonProperty(FIELD_NAME_TRIGGER_CAUSE) TriggerCause triggerCause,
+ @JsonProperty(FIELD_NAME_TERMINATED_REASON) TerminatedReason
terminatedReason) {
+ this.rescaleUuid = rescaleUuid;
+ this.resourceRequirementsUuid = resourceRequirementsUuid;
+ this.rescaleAttemptId = rescaleAttemptId;
+ this.vertices = vertices;
+ this.slots = slots;
+ this.schedulerStates = schedulerStates;
+ this.startTimestamp = startTimestamp;
+ this.endTimestamp = endTimestamp;
+ this.terminalState = terminalState;
+ this.triggerCause = triggerCause;
+ this.terminatedReason = terminatedReason;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JobRescaleDetails that = (JobRescaleDetails) o;
+ return rescaleAttemptId == that.rescaleAttemptId
+ && Objects.equals(rescaleUuid, that.rescaleUuid)
+ && Objects.equals(resourceRequirementsUuid,
that.resourceRequirementsUuid)
+ && Objects.equals(vertices, that.vertices)
+ && Objects.equals(slots, that.slots)
+ && Objects.equals(schedulerStates, that.schedulerStates)
+ && Objects.equals(startTimestamp, that.startTimestamp)
+ && Objects.equals(endTimestamp, that.endTimestamp)
+ && terminalState == that.terminalState
+ && triggerCause == that.triggerCause
+ && terminatedReason == that.terminatedReason;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ rescaleUuid,
+ resourceRequirementsUuid,
+ rescaleAttemptId,
+ vertices,
+ slots,
+ schedulerStates,
+ startTimestamp,
+ endTimestamp,
+ terminalState,
+ triggerCause,
+ terminatedReason);
+ }
+
+ /**
+ * Builds the REST message from a scheduler-side {@link Rescale}.
+ *
+ * <p>The vertex map is handed over exactly as returned by {@code rescale.getVertices()}, whereas
+ * every slot entry is converted through
+ * {@link SlotSharingGroupRescaleInfo#fromSlotSharingGroupRescale}.
+ *
+ * @param rescale the rescale to convert
+ * @param includeSchedulerStates whether to include the scheduler states of the rescale; when
+ * {@code false}, {@code null} is passed for the scheduler states
+ * @return the details message describing the given rescale
+ */
+ public static JobRescaleDetails fromRescale(Rescale rescale, boolean
includeSchedulerStates) {
+ return new JobRescaleDetails(
+ rescale.getRescaleIdInfo().getRescaleUuid().toString(),
+
rescale.getRescaleIdInfo().getResourceRequirementsId().toString(),
+ rescale.getRescaleIdInfo().getRescaleAttemptId(),
+ rescale.getVertices(),
+ convertMapValues(
+ rescale.getSlots(),
+
SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale),
+ includeSchedulerStates ? rescale.getSchedulerStates() : null,
+ rescale.getStartTimestamp(),
+ rescale.getEndTimestamp(),
+ rescale.getTerminalState(),
+ rescale.getTriggerCause(),
+ rescale.getTerminatedReason());
+ }
+
+    /**
+     * Creates a new map with the same keys as {@code rawMap} and values transformed by {@code
+     * valueMapper}.
+     *
+     * @param rawMap the map to convert; may be {@code null}
+     * @param valueMapper the function applied to every value of {@code rawMap}
+     * @return the converted map, or a new empty {@link HashMap} if {@code rawMap} is {@code null}
+     */
+    private static <K, NV, OV> Map<K, NV> convertMapValues(
+            Map<K, OV> rawMap, Function<OV, NV> valueMapper) {
+        if (rawMap == null) {
+            return new HashMap<>();
+        }
+
+        return rawMap.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> valueMapper.apply(entry.getValue())));
+    }
+
+    /**
+     * The rescale information of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
+     *
+     * <p>Only the job vertex ID is final. All other properties can be populated after construction
+     * through the setters of this class.
+     */
+    public static final class VertexParallelismRescaleInfo implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public static final String FIELD_NAME_JOB_VERTEX_ID = "jobVertexId";
+        public static final String FIELD_NAME_VERTEX_NAME = "jobVertexName";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = "slotSharingGroupName";
+        public static final String FIELD_NAME_DESIRED_PARALLELISM = "desiredParallelism";
+        public static final String FIELD_NAME_SUFFICIENT_PARALLELISM = "sufficientParallelism";
+        public static final String FIELD_NAME_PRE_RESCALE_PARALLELISM = "preRescaleParallelism";
+        public static final String FIELD_NAME_POST_RESCALE_PARALLELISM = "postRescaleParallelism";
+
+        @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+        @JsonSerialize(using = JobVertexIDSerializer.class)
+        @JsonDeserialize(using = JobVertexIDDeserializer.class)
+        private final JobVertexID jobVertexId;
+
+        @JsonProperty(FIELD_NAME_VERTEX_NAME)
+        private String jobVertexName;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+        @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+        private SlotSharingGroupId slotSharingGroupId;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
+        private String slotSharingGroupName;
+
+        @JsonProperty(FIELD_NAME_DESIRED_PARALLELISM)
+        private Integer desiredParallelism;
+
+        @JsonProperty(FIELD_NAME_SUFFICIENT_PARALLELISM)
+        private Integer sufficientParallelism;
+
+        @Nullable
+        @JsonProperty(FIELD_NAME_PRE_RESCALE_PARALLELISM)
+        private Integer preRescaleParallelism;
+
+        @Nullable
+        @JsonProperty(FIELD_NAME_POST_RESCALE_PARALLELISM)
+        private Integer postRescaleParallelism;
+
+        /**
+         * Creates a fully populated instance; this is the constructor used for JSON
+         * deserialization.
+         *
+         * @param jobVertexId the ID of the job vertex
+         * @param jobVertexName the name of the job vertex
+         * @param slotSharingGroupId the ID of the slot sharing group of the vertex
+         * @param slotSharingGroupName the name of the slot sharing group of the vertex
+         * @param desiredParallelism the desired parallelism
+         * @param sufficientParallelism the sufficient parallelism
+         * @param preRescaleParallelism the parallelism before the rescale, may be {@code null}
+         * @param postRescaleParallelism the parallelism after the rescale, may be {@code null}
+         */
+        @JsonCreator
+        public VertexParallelismRescaleInfo(
+                @JsonSerialize(using = JobVertexIDSerializer.class)
+                        @JsonDeserialize(using = JobVertexIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_JOB_VERTEX_ID)
+                        JobVertexID jobVertexId,
+                @JsonProperty(FIELD_NAME_VERTEX_NAME) String jobVertexName,
+                @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+                        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+                        SlotSharingGroupId slotSharingGroupId,
+                @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String slotSharingGroupName,
+                @JsonProperty(FIELD_NAME_DESIRED_PARALLELISM) Integer desiredParallelism,
+                @JsonProperty(FIELD_NAME_SUFFICIENT_PARALLELISM) Integer sufficientParallelism,
+                // Annotated like the postRescaleParallelism parameter: the backing field and its
+                // getter are declared @Nullable as well.
+                @Nullable @JsonProperty(FIELD_NAME_PRE_RESCALE_PARALLELISM)
+                        Integer preRescaleParallelism,
+                @Nullable @JsonProperty(FIELD_NAME_POST_RESCALE_PARALLELISM)
+                        Integer postRescaleParallelism) {
+            this.jobVertexId = jobVertexId;
+            this.jobVertexName = jobVertexName;
+            this.slotSharingGroupId = slotSharingGroupId;
+            this.slotSharingGroupName = slotSharingGroupName;
+            this.desiredParallelism = desiredParallelism;
+            this.sufficientParallelism = sufficientParallelism;
+            this.preRescaleParallelism = preRescaleParallelism;
+            this.postRescaleParallelism = postRescaleParallelism;
+        }
+
+        /**
+         * Creates an instance that only carries the identity of the vertex and of its slot
+         * sharing group. All parallelism values stay {@code null} until they are set through
+         * {@link #setRequiredParallelisms}, {@link #setPreRescaleParallelism} and
+         * {@link #setPostRescaleParallelism}.
+         *
+         * @param jobVertexId the ID of the job vertex, must not be {@code null}
+         * @param jobVertexName the name of the job vertex
+         * @param slotSharingGroup the slot sharing group the ID and name are read from
+         */
+        @JsonIgnore
+        public VertexParallelismRescaleInfo(
+                JobVertexID jobVertexId, String jobVertexName, SlotSharingGroup slotSharingGroup) {
+            this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
+            this.jobVertexName = jobVertexName;
+            this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
+            this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
+        }
+
+        public JobVertexID getJobVertexId() {
+            return jobVertexId;
+        }
+
+        public String getJobVertexName() {
+            return jobVertexName;
+        }
+
+        public void setJobVertexName(String jobVertexName) {
+            this.jobVertexName = jobVertexName;
+        }
+
+        public SlotSharingGroupId getSlotSharingGroupId() {
+            return slotSharingGroupId;
+        }
+
+        public String getSlotSharingGroupName() {
+            return slotSharingGroupName;
+        }
+
+        @Nullable
+        public Integer getPreRescaleParallelism() {
+            return preRescaleParallelism;
+        }
+
+        public void setPreRescaleParallelism(@Nullable Integer preRescaleParallelism) {
+            this.preRescaleParallelism = preRescaleParallelism;
+        }
+
+        public Integer getDesiredParallelism() {
+            return desiredParallelism;
+        }
+
+        public Integer getSufficientParallelism() {
+            return sufficientParallelism;
+        }
+
+        /**
+         * Sets the sufficient parallelism to the min parallelism and the desired parallelism to the
+         * parallelism of the given {@link VertexParallelismInformation}.
+         *
+         * @param vertexParallelismInformation the source of both parallelism values
+         */
+        public void setRequiredParallelisms(
+                VertexParallelismInformation vertexParallelismInformation) {
+            this.sufficientParallelism = vertexParallelismInformation.getMinParallelism();
+            this.desiredParallelism = vertexParallelismInformation.getParallelism();
+        }
+
+        @Nullable
+        public Integer getPostRescaleParallelism() {
+            return postRescaleParallelism;
+        }
+
+        // @Nullable mirrors setPreRescaleParallelism and the @Nullable getter of this property.
+        public void setPostRescaleParallelism(@Nullable Integer postRescaleParallelism) {
+            this.postRescaleParallelism = postRescaleParallelism;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            // Identity fast path: skips the field-by-field comparison for the same instance.
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            VertexParallelismRescaleInfo that = (VertexParallelismRescaleInfo) o;
+            return Objects.equals(jobVertexId, that.jobVertexId)
+                    && Objects.equals(jobVertexName, that.jobVertexName)
+                    && Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
+                    && Objects.equals(slotSharingGroupName, that.slotSharingGroupName)
+                    && Objects.equals(preRescaleParallelism, that.preRescaleParallelism)
+                    && Objects.equals(desiredParallelism, that.desiredParallelism)
+                    && Objects.equals(sufficientParallelism, that.sufficientParallelism)
+                    && Objects.equals(postRescaleParallelism, that.postRescaleParallelism);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    jobVertexId,
+                    jobVertexName,
+                    slotSharingGroupId,
+                    slotSharingGroupName,
+                    preRescaleParallelism,
+                    desiredParallelism,
+                    sufficientParallelism,
+                    postRescaleParallelism);
+        }
+
+        @Override
+        public String toString() {
+            return "VertexParallelismRescaleInfo{"
+                    + "jobVertexId="
+                    + jobVertexId
+                    + ", jobVertexName='"
+                    + jobVertexName
+                    + '\''
+                    + ", slotSharingGroupId="
+                    + slotSharingGroupId
+                    + ", slotSharingGroupName='"
+                    + slotSharingGroupName
+                    + '\''
+                    + ", desiredParallelism="
+                    + desiredParallelism
+                    + ", sufficientParallelism="
+                    + sufficientParallelism
+                    + ", preRescaleParallelism="
+                    + preRescaleParallelism
+                    + ", postRescaleParallelism="
+                    + postRescaleParallelism
+                    + '}';
+        }
+    }
+
+    /**
+     * The rescale information of a slot sharing group as exposed in the REST response. Instances
+     * are immutable value holders.
+     */
+    public static final class SlotSharingGroupRescaleInfo implements Serializable {
+        private static final long serialVersionUID = 1L;
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";
+        public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = "slotSharingGroupName";
+        public static final String FIELD_NAME_REQUEST_RESOURCE_PROFILE = "requestResourceProfile";
+        public static final String FIELD_NAME_DESIRED_SLOTS = "desiredSlots";
+        public static final String FIELD_NAME_MINIMAL_REQUIRED_SLOTS = "minimalRequiredSlots";
+        public static final String FIELD_NAME_PRE_RESCALE_SLOTS = "preRescaleSlots";
+        public static final String FIELD_NAME_POST_RESCALE_SLOTS = "postRescaleSlots";
+        public static final String FIELD_NAME_ACQUIRED_RESOURCE_PROFILE = "acquiredResourceProfile";
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+        @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+        private final SlotSharingGroupId slotSharingGroupId;
+
+        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
+        private final String slotSharingGroupName;
+
+        @JsonProperty(FIELD_NAME_REQUEST_RESOURCE_PROFILE)
+        private final ResourceProfileInfo requiredResourceProfileInfo;
+
+        @JsonProperty(FIELD_NAME_DESIRED_SLOTS)
+        private final Integer desiredSlots;
+
+        @JsonProperty(FIELD_NAME_MINIMAL_REQUIRED_SLOTS)
+        private final Integer minimalRequiredSlots;
+
+        @JsonProperty(FIELD_NAME_PRE_RESCALE_SLOTS)
+        private final Integer preRescaleSlots;
+
+        @JsonProperty(FIELD_NAME_POST_RESCALE_SLOTS)
+        private final Integer postRescaleSlots;
+
+        @JsonProperty(FIELD_NAME_ACQUIRED_RESOURCE_PROFILE)
+        private final ResourceProfileInfo acquiredResourceProfileInfo;
+
+        /** Creates an instance from all of its properties; also used for JSON deserialization. */
+        @JsonCreator
+        public SlotSharingGroupRescaleInfo(
+                @JsonSerialize(using = SlotSharingGroupIDSerializer.class)
+                        @JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
+                        @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
+                        SlotSharingGroupId slotSharingGroupId,
+                @JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String slotSharingGroupName,
+                @JsonProperty(FIELD_NAME_REQUEST_RESOURCE_PROFILE)
+                        ResourceProfileInfo requiredResourceProfileInfo,
+                @JsonProperty(FIELD_NAME_DESIRED_SLOTS) Integer desiredSlots,
+                @JsonProperty(FIELD_NAME_MINIMAL_REQUIRED_SLOTS) Integer minimalRequiredSlots,
+                @JsonProperty(FIELD_NAME_PRE_RESCALE_SLOTS) Integer preRescaleSlots,
+                @JsonProperty(FIELD_NAME_POST_RESCALE_SLOTS) Integer postRescaleSlots,
+                @JsonProperty(FIELD_NAME_ACQUIRED_RESOURCE_PROFILE)
+                        ResourceProfileInfo acquiredResourceProfileInfo) {
+            this.slotSharingGroupId = slotSharingGroupId;
+            this.slotSharingGroupName = slotSharingGroupName;
+            this.requiredResourceProfileInfo = requiredResourceProfileInfo;
+            this.desiredSlots = desiredSlots;
+            this.minimalRequiredSlots = minimalRequiredSlots;
+            this.preRescaleSlots = preRescaleSlots;
+            this.postRescaleSlots = postRescaleSlots;
+            this.acquiredResourceProfileInfo = acquiredResourceProfileInfo;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            // The class is final, so an instanceof test is the same as comparing runtime classes.
+            if (!(o instanceof SlotSharingGroupRescaleInfo)) {
+                return false;
+            }
+            final SlotSharingGroupRescaleInfo other = (SlotSharingGroupRescaleInfo) o;
+
+            final boolean sameGroupAndRequest =
+                    Objects.equals(slotSharingGroupId, other.slotSharingGroupId)
+                            && Objects.equals(slotSharingGroupName, other.slotSharingGroupName)
+                            && Objects.equals(
+                                    requiredResourceProfileInfo, other.requiredResourceProfileInfo);
+            if (!sameGroupAndRequest) {
+                return false;
+            }
+
+            final boolean sameSlotCounts =
+                    Objects.equals(desiredSlots, other.desiredSlots)
+                            && Objects.equals(minimalRequiredSlots, other.minimalRequiredSlots)
+                            && Objects.equals(preRescaleSlots, other.preRescaleSlots)
+                            && Objects.equals(postRescaleSlots, other.postRescaleSlots);
+            if (!sameSlotCounts) {
+                return false;
+            }
+
+            return Objects.equals(acquiredResourceProfileInfo, other.acquiredResourceProfileInfo);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                    slotSharingGroupId,
+                    slotSharingGroupName,
+                    requiredResourceProfileInfo,
+                    desiredSlots,
+                    minimalRequiredSlots,
+                    preRescaleSlots,
+                    postRescaleSlots,
+                    acquiredResourceProfileInfo);
+        }
+
+        /**
+         * Converts a scheduler-side {@link SlotSharingGroupRescale} into its REST representation.
+         *
+         * @param slotSharingGroupRescale the slot sharing group rescale to convert
+         * @return the converted info; a missing acquired resource profile is reported as {@link
+         *     ResourceProfile#UNKNOWN}
+         */
+        public static SlotSharingGroupRescaleInfo fromSlotSharingGroupRescale(
+                SlotSharingGroupRescale slotSharingGroupRescale) {
+            final ResourceProfileInfo requiredInfo =
+                    ResourceProfileInfo.fromResourceProfile(
+                            slotSharingGroupRescale.getRequiredResourceProfile());
+
+            // Fall back to UNKNOWN when no acquired profile is available.
+            final ResourceProfile acquiredOrUnknown =
+                    Optional.ofNullable(slotSharingGroupRescale.getAcquiredResourceProfile())
+                            .orElse(ResourceProfile.UNKNOWN);
+            final ResourceProfileInfo acquiredInfo =
+                    ResourceProfileInfo.fromResourceProfile(acquiredOrUnknown);
+
+            return new SlotSharingGroupRescaleInfo(
+                    slotSharingGroupRescale.getSlotSharingGroupId(),
+                    slotSharingGroupRescale.getSlotSharingGroupName(),
+                    requiredInfo,
+                    slotSharingGroupRescale.getDesiredSlots(),
+                    slotSharingGroupRescale.getMinimalRequiredSlots(),
+                    slotSharingGroupRescale.getPreRescaleSlots(),
+                    slotSharingGroupRescale.getPostRescaleSlots(),
+                    acquiredInfo);
+        }
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
new file mode 100644
index 00000000000..a1fb71a4a31
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetailsHeaders.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Headers for job rescale details.
+ *
+ * <p>Describes a {@code GET} endpoint at {@link #JOB_RESCALES_PATH} that takes an {@link
+ * EmptyRequestBody}, is parameterized by {@link JobIDRescaleIDParameters} and responds with {@link
+ * JobRescaleDetails} and status {@code OK}.
+ */
+public class JobRescaleDetailsHeaders
+ implements RuntimeMessageHeaders<
+ EmptyRequestBody, JobRescaleDetails, JobIDRescaleIDParameters>
{
+
+ /** Shared instance, also returned by {@link #getInstance()}. */
+ public static final JobRescaleDetailsHeaders INSTANCE = new
JobRescaleDetailsHeaders();
+
+ /** URL pattern of the endpoint, with the job ID and the rescale UUID as path parameters. */
+ public static final String JOB_RESCALES_PATH =
"/jobs/:jobid/rescales/details/:rescaleuuid";
+
+ @Override
+ public Class<JobRescaleDetails> getResponseClass() {
+ return JobRescaleDetails.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Return a job rescale details.";
+ }
+
+ @Override
+ public Class<EmptyRequestBody> getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ /** Returns a fresh, unresolved parameter set on every call. */
+ @Override
+ public JobIDRescaleIDParameters getUnresolvedMessageParameters() {
+ return new JobIDRescaleIDParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return JOB_RESCALES_PATH;
+ }
+
+ /** Returns the shared {@link #INSTANCE}. */
+ public static JobRescaleDetailsHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
new file mode 100644
index 00000000000..bf15d0291d8
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleIDPathParameter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.rescales;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+
+/**
+ * The {@link MessagePathParameter} for {@link JobRescaleDetailsHeaders}.
+ *
+ * <p>It carries the UUID of a rescale as a hex string under the path key {@link #KEY}.
+ */
+public class JobRescaleIDPathParameter extends MessagePathParameter<RescaleIdInfo.RescaleUUID> {
+
+    public static final String KEY = "rescaleuuid";
+
+    protected JobRescaleIDPathParameter() {
+        super(KEY);
+    }
+
+    /**
+     * Parses the hex string representation of a rescale UUID.
+     *
+     * @param value the hex string taken from the request path
+     * @return the parsed rescale UUID
+     * @throws ConversionException if {@code value} is not a valid rescale UUID
+     */
+    @Override
+    protected RescaleIdInfo.RescaleUUID convertFromString(String value) throws ConversionException {
+        try {
+            return new RescaleIdInfo.RescaleUUID(value);
+        } catch (IllegalArgumentException iae) {
+            // IllegalArgumentException is the supertype of NumberFormatException, so non-hex
+            // characters are still covered. NOTE(review): it is caught as well because AbstractID
+            // presumably rejects byte arrays of unexpected length with it (e.g. a hex string
+            // that is too short or too long) - confirm against AbstractID(byte[]).
+            throw new ConversionException(
+                    "Could not parse rescale UUID from hex string " + value + '.', iae);
+        }
+    }
+
+    @Override
+    protected String convertToString(RescaleIdInfo.RescaleUUID value) {
+        return value.toString();
+    }
+
+    @Override
+    public String getDescription() {
+        return "String value that identifies a job rescale.";
+    }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
similarity index 70%
rename from
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
index 1c7e1d02ec3..80b506f928e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/SchedulerStateSpan.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/SchedulerStateSpan.java
@@ -16,10 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.runtime.scheduler.adaptive.timeline;
+package org.apache.flink.runtime.rest.messages.job.rescales;
import org.apache.flink.util.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import javax.annotation.Nullable;
import java.io.Serializable;
@@ -33,22 +36,38 @@ import java.util.Objects;
public class SchedulerStateSpan implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final String FIELD_NAME_STATE = "state";
+ public static final String FIELD_NAME_ENTER_TIMESTAMP =
"enterTimestampInMillis";
+ public static final String FIELD_NAME_LEAVE_TIMESTAMP =
"leaveTimestampInMillis";
+ public static final String FIELD_NAME_DURATION = "durationInMillis";
+ public static final String FIELD_NAME_EXCEPTION = "stringifiedException";
+
+ @JsonProperty(FIELD_NAME_STATE)
private final String state;
- @Nullable private final Long enterTimestamp;
+ @Nullable
+ @JsonProperty(FIELD_NAME_ENTER_TIMESTAMP)
+ private final Long enterTimestamp;
- @Nullable private final Long leaveTimestamp;
+ @Nullable
+ @JsonProperty(FIELD_NAME_LEAVE_TIMESTAMP)
+ private final Long leaveTimestamp;
- @Nullable private final Long duration;
+ @Nullable
+ @JsonProperty(FIELD_NAME_DURATION)
+ private final Long duration;
- @Nullable private String stringifiedException;
+ @Nullable
+ @JsonProperty(FIELD_NAME_EXCEPTION)
+ private String stringifiedException;
+ @JsonCreator
public SchedulerStateSpan(
- String state,
- Long logicEnterMillis,
- Long logicLeaveMillis,
- Long duration,
- String stringifiedException) {
+ @JsonProperty(FIELD_NAME_STATE) String state,
+ @JsonProperty(FIELD_NAME_ENTER_TIMESTAMP) Long logicEnterMillis,
+ @JsonProperty(FIELD_NAME_LEAVE_TIMESTAMP) Long logicLeaveMillis,
+ @JsonProperty(FIELD_NAME_DURATION) Long duration,
+ @JsonProperty(FIELD_NAME_EXCEPTION) String stringifiedException) {
this.state = Preconditions.checkNotNull(state);
this.enterTimestamp = logicEnterMillis;
this.leaveTimestamp = logicLeaveMillis;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
new file mode 100644
index 00000000000..f7371eebda9
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeyDeserializer.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.KeyDeserializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson key deserializer for {@link SlotSharingGroupId}, i.e. for the case where the ID is used
+ * as the key of a JSON object (map) rather than as a value.
+ */
+public class SlotSharingGroupIDKeyDeserializer extends KeyDeserializer {
+
+ /** Parses the key from its hex string form via {@link SlotSharingGroupId#fromHexString}. */
+ @Override
+ public Object deserializeKey(String key, DeserializationContext ctxt)
throws IOException {
+ return SlotSharingGroupId.fromHexString(key);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
new file mode 100644
index 00000000000..ec7660d6db1
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SlotSharingGroupIDKeySerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.json;
+
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/**
+ * Jackson key serializer for {@link SlotSharingGroupId}: it emits the ID as a JSON field name
+ * instead of a value.
+ */
+public class SlotSharingGroupIDKeySerializer extends
StdSerializer<SlotSharingGroupId> {
+
+ private static final long serialVersionUID = 2970050507628933522L;
+
+ public SlotSharingGroupIDKeySerializer() {
+ super(SlotSharingGroupId.class);
+ }
+
+ /** Writes {@code value.toString()} as the field name of the current JSON object. */
+ @Override
+ public void serialize(SlotSharingGroupId value, JsonGenerator gen,
SerializerProvider provider)
+ throws IOException {
+ gen.writeFieldName(value.toString());
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
index e37852db6d0..59d91ca5f4d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptive.timeline;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.util.BoundedFIFOQueue;
-import org.apache.flink.util.AbstractID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@ public class DefaultRescaleTimeline implements
RescaleTimeline {
public DefaultRescaleTimeline(
Supplier<JobInformation> jobInformationGetter, int maxHistorySize)
{
this.jobInformationGetter = jobInformationGetter;
- this.rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 0L);
+ this.rescaleIdInfo = new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 0L);
this.latestRescales = new
ConcurrentHashMap<>(TerminalState.values().length);
this.rescaleHistory = new BoundedFIFOQueue<>(maxHistorySize);
this.rescalesSummary = new RescalesSummary(maxHistorySize);
@@ -133,7 +132,7 @@ public class DefaultRescaleTimeline implements
RescaleTimeline {
private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
if (newRescaleEpoch) {
- rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
+ rescaleIdInfo = new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L);
} else {
rescaleIdInfo =
new RescaleIdInfo(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
index 90e0412fd0d..2d9b6381a9a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/Rescale.java
@@ -23,6 +23,8 @@ import
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.State;
@@ -122,7 +124,7 @@ public class Rescale implements Serializable {
private final RescaleIdInfo rescaleIdInfo;
- private final Map<JobVertexID, VertexParallelismRescale> vertices;
+ private final Map<JobVertexID, VertexParallelismRescaleInfo> vertices;
private final Map<SlotSharingGroupId, SlotSharingGroupRescale> slots;
private final List<SchedulerStateSpan> schedulerStates;
@@ -134,7 +136,7 @@ public class Rescale implements Serializable {
@Nullable private TerminalState terminalState;
@Nullable private TerminatedReason terminatedReason;
- Rescale(RescaleIdInfo rescaleIdInfo) {
+ public Rescale(RescaleIdInfo rescaleIdInfo) {
this.rescaleIdInfo = rescaleIdInfo;
this.vertices = new HashMap<>();
this.slots = new HashMap<>();
@@ -142,7 +144,7 @@ public class Rescale implements Serializable {
}
@VisibleForTesting
- Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
+ public Rescale addSchedulerState(SchedulerStateSpan schedulerStateSpan) {
if (this.isTerminated()) {
LOG.warn(
"Rescale is already terminated. The scheduler state {}
will be ignored.",
@@ -293,11 +295,11 @@ public class Rescale implements Serializable {
SlotSharingGroup slotSharingGroup =
jobInformation.getVertexInformation(jvId).getSlotSharingGroup();
String vertexName =
jobInformation.getVertexInformation(jvId).getVertexName();
- VertexParallelismRescale vertexParallelismRescale =
+ VertexParallelismRescaleInfo vertexParallelismRescale =
this.vertices.computeIfAbsent(
jvId,
jobVertexID ->
- new VertexParallelismRescale(
+ new VertexParallelismRescaleInfo(
jvId, vertexName,
slotSharingGroup));
vertexParallelismRescale.setRequiredParallelisms(entry.getValue());
}
@@ -330,11 +332,11 @@ public class Rescale implements Serializable {
lastCompletedRescale.vertices.get(jobVertexID).getPostRescaleParallelism();
JobInformation.VertexInformation vertexInformation =
jobInformation.getVertexInformation(jobVertexID);
- VertexParallelismRescale vertexParallelismRescale =
+ VertexParallelismRescaleInfo vertexParallelismRescale =
vertices.computeIfAbsent(
jobVertexID,
jobVertexId ->
- new VertexParallelismRescale(
+ new VertexParallelismRescaleInfo(
jobVertexId,
vertexInformation.getVertexName(),
vertexInformation.getSlotSharingGroup()));
@@ -363,11 +365,11 @@ public class Rescale implements Serializable {
for (JobVertexID vertexID : vertices) {
JobInformation.VertexInformation vertexInformation =
jobInformation.getVertexInformation(vertexID);
- VertexParallelismRescale vertexParallelismRescale =
+ VertexParallelismRescaleInfo vertexParallelismRescale =
this.vertices.computeIfAbsent(
vertexID,
jobVertexId ->
- new VertexParallelismRescale(
+ new VertexParallelismRescaleInfo(
jobVertexId,
vertexInformation.getVertexName(),
vertexInformation.getSlotSharingGroup()));
@@ -421,7 +423,7 @@ public class Rescale implements Serializable {
LOG.info("Updated rescale is: {}", this);
}
- public Map<JobVertexID, VertexParallelismRescale> getVertices() {
+ public Map<JobVertexID, VertexParallelismRescaleInfo> getVertices() {
return Collections.unmodifiableMap(vertices);
}
@@ -466,12 +468,12 @@ public class Rescale implements Serializable {
}
@VisibleForTesting
- Map<JobVertexID, VertexParallelismRescale> getModifiableVertices() {
+ public Map<JobVertexID, VertexParallelismRescaleInfo>
getModifiableVertices() {
return vertices;
}
@VisibleForTesting
- Map<SlotSharingGroupId, SlotSharingGroupRescale> getModifiableSlots() {
+ public Map<SlotSharingGroupId, SlotSharingGroupRescale>
getModifiableSlots() {
return slots;
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
index 94d36cc1393..1751ac9f497 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleIdInfo.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.adaptive.timeline;
import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.StringUtils;
import java.io.Serializable;
import java.util.Objects;
@@ -26,23 +27,43 @@ import java.util.Objects;
/** The class to represent the rescale id description in one resource
requirements rescale. */
public class RescaleIdInfo implements Serializable {
+ /** Class for unique rescale ID. */
+ public static class RescaleUUID extends AbstractID {
+ private static final long serialVersionUID = 1L;
+
+ /** Creates a new ID through the no-argument constructor of {@link AbstractID}. */
+ public RescaleUUID() {}
+
+ /** Creates an ID from the given bytes, which are passed on to {@link AbstractID}. */
+ public RescaleUUID(byte[] bytes) {
+ super(bytes);
+ }
+
+ /**
+ * Creates an ID from its hex string form; the string is decoded with {@link
+ * StringUtils#hexStringToByte(String)} and handed to {@link #RescaleUUID(byte[])}.
+ */
+ public RescaleUUID(String hexString) {
+ this(StringUtils.hexStringToByte(hexString));
+ }
+ }
+
+ /**
+ * Class for unique resource requirements ID. It is a dedicated {@link AbstractID} subtype so
+ * that it cannot be mixed up with a {@link RescaleUUID}.
+ */
+ public static class ResourceRequirementsID extends AbstractID {
+ private static final long serialVersionUID = 1L;
+ }
+
private static final long serialVersionUID = 1L;
- private final AbstractID rescaleUuid;
- private final AbstractID resourceRequirementsId;
+ private final RescaleUUID rescaleUuid;
+ private final ResourceRequirementsID resourceRequirementsId;
private final long rescaleAttemptId;
- public RescaleIdInfo(AbstractID resourceRequirementsId, Long
rescaleAttemptId) {
+ public RescaleIdInfo(ResourceRequirementsID resourceRequirementsId, Long
rescaleAttemptId) {
this.resourceRequirementsId = resourceRequirementsId;
this.rescaleAttemptId = rescaleAttemptId;
- this.rescaleUuid = new AbstractID();
+ this.rescaleUuid = new RescaleUUID();
}
- public AbstractID getRescaleUuid() {
+ public RescaleUUID getRescaleUuid() {
return rescaleUuid;
}
- public AbstractID getResourceRequirementsId() {
+ public ResourceRequirementsID getResourceRequirementsId() {
return resourceRequirementsId;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
index 316d34ef634..f8697fe473f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
@@ -19,20 +19,33 @@
package org.apache.flink.runtime.scheduler.adaptive.timeline;
import org.apache.flink.runtime.util.stats.StatsSummarySnapshot;
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
public class RescalesStatsSnapshot implements Serializable {
private static final long serialVersionUID = 1L;
private final List<Rescale> rescaleHistory;
+ private final Map<AbstractID, Rescale> idToRescaleMap;
private final RescalesSummarySnapshot rescalesSummarySnapshot;
public RescalesStatsSnapshot(
List<Rescale> rescaleHistory, RescalesSummarySnapshot
rescalesSummarySnapshot) {
this.rescaleHistory = rescaleHistory;
+ this.idToRescaleMap =
+ rescaleHistory.stream()
+ .collect(
+ Collectors.toMap(
+ r ->
r.getRescaleIdInfo().getRescaleUuid(),
+ Function.identity()));
this.rescalesSummarySnapshot = rescalesSummarySnapshot;
}
@@ -40,6 +53,10 @@ public class RescalesStatsSnapshot implements Serializable {
return rescaleHistory;
}
+ public @Nullable Rescale getRescale(AbstractID rescaleId) {
+ return idToRescaleMap.get(rescaleId);
+ }
+
public RescalesSummarySnapshot getRescalesSummarySnapshot() {
return rescalesSummarySnapshot;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
deleted file mode 100644
index 3ea9476e79a..00000000000
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/VertexParallelismRescale.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.scheduler.adaptive.timeline;
-
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** The rescale information of a {@link
org.apache.flink.runtime.jobgraph.JobVertex}. */
-public class VertexParallelismRescale implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final JobVertexID jobVertexId;
- private String jobVertexName;
- private SlotSharingGroupId slotSharingGroupId;
- private String slotSharingGroupName;
- private Integer desiredParallelism;
- private Integer sufficientParallelism;
-
- @Nullable private Integer preRescaleParallelism;
-
- @Nullable private Integer postRescaleParallelism;
-
- public VertexParallelismRescale(
- JobVertexID jobVertexId, String jobVertexName, SlotSharingGroup
slotSharingGroup) {
- this.jobVertexId = Preconditions.checkNotNull(jobVertexId);
- this.jobVertexName = jobVertexName;
- this.slotSharingGroupName = slotSharingGroup.getSlotSharingGroupName();
- this.slotSharingGroupId = slotSharingGroup.getSlotSharingGroupId();
- }
-
- public JobVertexID getJobVertexId() {
- return jobVertexId;
- }
-
- public String getJobVertexName() {
- return jobVertexName;
- }
-
- public void setJobVertexName(String jobVertexName) {
- this.jobVertexName = jobVertexName;
- }
-
- public SlotSharingGroupId getSlotSharingGroupId() {
- return slotSharingGroupId;
- }
-
- public String getSlotSharingGroupName() {
- return slotSharingGroupName;
- }
-
- @Nullable
- public Integer getPreRescaleParallelism() {
- return preRescaleParallelism;
- }
-
- public void setPreRescaleParallelism(@Nullable Integer
preRescaleParallelism) {
- this.preRescaleParallelism = preRescaleParallelism;
- }
-
- public Integer getDesiredParallelism() {
- return desiredParallelism;
- }
-
- public Integer getSufficientParallelism() {
- return sufficientParallelism;
- }
-
- public void setRequiredParallelisms(VertexParallelismInformation
vertexParallelismInformation) {
- this.sufficientParallelism =
vertexParallelismInformation.getMinParallelism();
- this.desiredParallelism =
vertexParallelismInformation.getParallelism();
- }
-
- @Nullable
- public Integer getPostRescaleParallelism() {
- return postRescaleParallelism;
- }
-
- public void setPostRescaleParallelism(Integer postRescaleParallelism) {
- this.postRescaleParallelism = postRescaleParallelism;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- VertexParallelismRescale that = (VertexParallelismRescale) o;
- return Objects.equals(jobVertexId, that.jobVertexId)
- && Objects.equals(jobVertexName, that.jobVertexName)
- && Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
- && Objects.equals(slotSharingGroupName,
that.slotSharingGroupName)
- && Objects.equals(preRescaleParallelism,
that.preRescaleParallelism)
- && Objects.equals(desiredParallelism, that.desiredParallelism)
- && Objects.equals(sufficientParallelism,
that.sufficientParallelism)
- && Objects.equals(postRescaleParallelism,
that.postRescaleParallelism);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(
- jobVertexId,
- jobVertexName,
- slotSharingGroupId,
- slotSharingGroupName,
- preRescaleParallelism,
- desiredParallelism,
- sufficientParallelism,
- postRescaleParallelism);
- }
-
- @Override
- public String toString() {
- return "VertexParallelismRescale{"
- + "jobVertexId="
- + jobVertexId
- + ", jobVertexName='"
- + jobVertexName
- + '\''
- + ", slotSharingGroupId="
- + slotSharingGroupId
- + ", slotSharingGroupName='"
- + slotSharingGroupName
- + '\''
- + ", desiredParallelism="
- + desiredParallelism
- + ", sufficientParallelism="
- + sufficientParallelism
- + ", preRescaleParallelism="
- + preRescaleParallelism
- + ", postRescaleParallelism="
- + postRescaleParallelism
- + '}';
- }
-}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 211b8749425..8bf45671feb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -97,6 +97,7 @@ import
org.apache.flink.runtime.rest.handler.job.metrics.JobVertexWatermarksHand
import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
import
org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler;
import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler;
+import
org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler;
import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers;
import
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers;
import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers;
@@ -164,6 +165,7 @@ import
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumul
import
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders;
import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
@@ -1212,6 +1214,17 @@ public class WebMonitorEndpoint<T extends
RestfulGateway> extends RestServerEndp
handlers.add(
Tuple2.of(jobRescaleConfigHandler.getMessageHeaders(),
jobRescaleConfigHandler));
+ final JobRescaleDetailsHandler jobRescaleDetailsHandler =
+ new JobRescaleDetailsHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ JobRescaleDetailsHeaders.getInstance(),
+ executionGraphCache,
+ executor);
+ handlers.add(
+ Tuple2.of(jobRescaleDetailsHandler.getMessageHeaders(),
jobRescaleDetailsHandler));
+
handlers.stream()
.map(tuple -> tuple.f1)
.filter(handler -> handler instanceof JsonArchivist)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
new file mode 100644
index 00000000000..3ecd4e46038
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.rescales;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters;
+import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary;
+import
org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.fromRescale;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link JobRescaleDetailsHandler}. */
+class JobRescaleDetailsHandlerTest {
+
+ private final JobRescaleDetailsHandler testInstance =
+ new JobRescaleDetailsHandler(
+ CompletableFuture::new,
+ TestingUtils.TIMEOUT,
+ Map.of(),
+ JobRescaleDetailsHeaders.getInstance(),
+ new DefaultExecutionGraphCache(TestingUtils.TIMEOUT,
TestingUtils.TIMEOUT),
+ Executors.directExecutor());
+
+ @Test
+ void testUnNormalCases() throws HandlerRequestException,
RestHandlerException {
+ // Test the case where adaptive scheduler rescales were not enabled for the job.
+ final ExecutionGraphInfo
executionGraphInfoWithNullRescalesStatsSnapshot =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
List.of(), null);
+ final HandlerRequest<EmptyRequestBody> request =
+ createRequest(
+
executionGraphInfoWithNullRescalesStatsSnapshot.getJobId(),
+ new RescaleIdInfo.RescaleUUID());
+ assertThatThrownBy(
+ () ->
+ testInstance.handleRequest(
+ request,
executionGraphInfoWithNullRescalesStatsSnapshot))
+ .isInstanceOf(RestHandlerException.class);
+
+ // Test the case where no rescale details can be found for the specified
rescale uuid.
+ final ExecutionGraphInfo
executionGraphInfoWithEmptyRescalesStatsSnapshot =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
+ List.of(),
+ JobManagerOptions.SchedulerType.Adaptive,
+ null,
+ RescalesStatsSnapshot.emptySnapshot());
+ assertThatThrownBy(
+ () ->
+ testInstance.handleRequest(
+ request,
executionGraphInfoWithEmptyRescalesStatsSnapshot))
+ .isInstanceOf(RestHandlerException.class);
+ }
+
+ @Test
+ void testRequestNormalJobRescaleStatisticsDetails()
+ throws HandlerRequestException, RestHandlerException {
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L))
+ .setStartTimestamp(1L)
+ .setEndTimestamp(100L)
+ .setTriggerCause(TriggerCause.INITIAL_SCHEDULE)
+ .setStringifiedException("mocked exception")
+ .addSchedulerState(new SchedulerStateSpan("Created",
1L, 5L, 4L, null))
+ .setTerminatedReason(TerminatedReason.SUCCEEDED);
+
+ JobVertexID jobVertexID = new JobVertexID();
+ SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
+
+ SlotSharingGroupRescale slotSharingGroupRescale =
+ new SlotSharingGroupRescale(slotSharingGroup);
+ slotSharingGroupRescale.setPostRescaleSlots(2);
+ slotSharingGroupRescale.setPreRescaleSlots(1);
+ slotSharingGroupRescale.setDesiredSlots(5);
+ slotSharingGroupRescale.setMinimalRequiredSlots(1);
+
slotSharingGroupRescale.setAcquiredResourceProfile(ResourceProfile.ZERO);
+
+ VertexParallelismRescaleInfo vertexParallelismRescaleInfo =
+ new VertexParallelismRescaleInfo(
+ jobVertexID,
+ "jvName",
+ slotSharingGroup.getSlotSharingGroupId(),
+ "default",
+ 5,
+ 1,
+ 1,
+ 2);
+
+ rescale.getModifiableSlots()
+ .put(slotSharingGroup.getSlotSharingGroupId(),
slotSharingGroupRescale);
+ rescale.getModifiableVertices().put(jobVertexID,
vertexParallelismRescaleInfo);
+
+ RescalesSummary rescalesSummary = new RescalesSummary(2);
+ rescalesSummary.addTerminated(rescale);
+
+ final ExecutionGraphInfo executionGraphInfo =
+ new ExecutionGraphInfo(
+ new ArchivedExecutionGraphBuilder().build(),
+ List.of(),
+ null,
+ null,
+ new RescalesStatsSnapshot(
+ List.of(rescale),
rescalesSummary.createSnapshot()));
+ final HandlerRequest<EmptyRequestBody> request =
+ createRequest(
+ executionGraphInfo.getJobId(),
rescale.getRescaleIdInfo().getRescaleUuid());
+ JobRescaleDetails jobRescaleDetails =
+ testInstance.handleRequest(request, executionGraphInfo);
+ assertThat(jobRescaleDetails).isEqualTo(fromRescale(rescale, true));
+ }
+
+ private static HandlerRequest<EmptyRequestBody> createRequest(
+ JobID jobId, AbstractID rescaleUuid) throws
HandlerRequestException {
+ final Map<String, String> pathParameters = new HashMap<>();
+ pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+ pathParameters.put(JobRescaleIDPathParameter.KEY,
rescaleUuid.toString());
+
+ return HandlerRequest.resolveParametersAndCreate(
+ EmptyRequestBody.getInstance(),
+ new JobIDRescaleIDParameters(),
+ pathParameters,
+ new HashMap<>(),
+ List.of());
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
index 3d0775860b5..4664811cb57 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
@@ -32,7 +34,6 @@ import
org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
-import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -160,7 +161,8 @@ class RescaleTest {
@Test
void testAddSchedulerState() {
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
// Test for add a state span into a terminated rescale.
rescale.setTerminatedReason(TerminatedReason.SUCCEEDED);
@@ -168,21 +170,21 @@ class RescaleTest {
assertThat(rescale.getSchedulerStates()).isEmpty();
// Test for add a state span into a non-terminated rescale.
- rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale = new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null,
null));
assertThat(rescale.getSchedulerStates()).hasSize(1);
// Test the correctness of throwable processing.
String stringifiedException1 =
ExceptionUtils.stringifyException(new
RuntimeException("exception1"));
- rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale = new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setStringifiedException(stringifiedException1);
rescale.addSchedulerState(new SchedulerStateSpan("", null, null, null,
null));
SchedulerStateSpan schedulerStateSpan =
rescale.getSchedulerStates().get(0);
assertThat(schedulerStateSpan.getStringifiedException()).isEqualTo(stringifiedException1);
assertThat(rescale.getStringifiedException()).isNull();
- rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale = new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.addSchedulerState(
new SchedulerStateSpan("", null, null, null,
stringifiedException1));
schedulerStateSpan = rescale.getSchedulerStates().get(0);
@@ -190,7 +192,7 @@ class RescaleTest {
// Test the correctness of the end time of span auto-fulfill.
State stateWithoutEndTimestamp = new TestingAdaptiveSchedulerState(2L,
null);
- rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ rescale = new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setStringifiedException(stringifiedException1);
rescale.addSchedulerState(stateWithoutEndTimestamp, null);
schedulerStateSpan = rescale.getSchedulerStates().get(0);
@@ -200,7 +202,8 @@ class RescaleTest {
@Test
void testSetDesiredSlots() {
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setDesiredSlots(jobInformation);
Map<SlotSharingGroupId, SlotSharingGroupRescale> slots =
rescale.getSlots();
assertThat(slots).hasSize(2);
@@ -223,28 +226,29 @@ class RescaleTest {
@Test
void testSetDesiredVertexParallelism() {
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setDesiredVertexParallelism(jobInformation);
- Map<JobVertexID, VertexParallelismRescale> vertices =
rescale.getVertices();
+ Map<JobVertexID, VertexParallelismRescaleInfo> vertices =
rescale.getVertices();
assertThat(vertices).hasSize(4);
assertThat(vertices.keySet())
.isEqualTo(
vertices.values().stream()
- .map(VertexParallelismRescale::getJobVertexId)
+
.map(VertexParallelismRescaleInfo::getJobVertexId)
.collect(Collectors.toSet()))
.hasSameElementsAs(
jobVertices.stream().map(JobVertex::getID).collect(Collectors.toSet()));
assertThat(
vertices.values().stream()
-
.map(VertexParallelismRescale::getJobVertexName)
+
.map(VertexParallelismRescaleInfo::getJobVertexName)
.collect(Collectors.toSet()))
.hasSameElementsAs(
jobVertices.stream().map(JobVertex::getName).collect(Collectors.toSet()));
assertThat(
vertices.values().stream()
-
.map(VertexParallelismRescale::getSlotSharingGroupId)
+
.map(VertexParallelismRescaleInfo::getSlotSharingGroupId)
.collect(Collectors.toSet()))
.hasSameElementsAs(
jobVertices.stream()
@@ -253,7 +257,7 @@ class RescaleTest {
assertThat(
vertices.values().stream()
-
.map(VertexParallelismRescale::getSlotSharingGroupName)
+
.map(VertexParallelismRescaleInfo::getSlotSharingGroupName)
.collect(Collectors.toSet()))
.hasSameElementsAs(
jobVertices.stream()
@@ -262,19 +266,20 @@ class RescaleTest {
assertThat(
vertices.values().stream()
-
.map(VertexParallelismRescale::getDesiredParallelism)
+
.map(VertexParallelismRescaleInfo::getDesiredParallelism)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder(2, 3, 4, 5);
assertThat(
vertices.values().stream()
-
.map(VertexParallelismRescale::getSufficientParallelism)
+
.map(VertexParallelismRescaleInfo::getSufficientParallelism)
.collect(Collectors.toList()))
.containsExactlyInAnyOrder(1, 2, 3, 4);
}
@Test
void testSetMinimalRequiredSlots() {
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setMinimalRequiredSlots(jobInformation);
Map<SlotSharingGroupId, SlotSharingGroupRescale> slots =
rescale.getSlots();
assertThat(slots).hasSize(2);
@@ -292,23 +297,25 @@ class RescaleTest {
@Test
void testSetPreRescaleSlotsAndParallelisms() {
// Test for null last rescale.
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 2L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 2L));
rescale.setPreRescaleSlotsAndParallelisms(jobInformation, null);
assertThat(rescale.getSlots()).isEmpty();
assertThat(rescale.getVertices()).isEmpty();
// Test for non-null last rescale.
// Prepare the last completed rescale.
- Rescale lastCompletedRescale = new Rescale(new RescaleIdInfo(new
AbstractID(), 1L));
- Map<JobVertexID, VertexParallelismRescale> lastRescaleVertices =
+ Rescale lastCompletedRescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
+ Map<JobVertexID, VertexParallelismRescaleInfo> lastRescaleVertices =
lastCompletedRescale.getModifiableVertices();
jobVertices.forEach(
jobVertex -> {
- VertexParallelismRescale vertexParallelismRescale =
+ VertexParallelismRescaleInfo vertexParallelismRescale =
lastRescaleVertices.computeIfAbsent(
jobVertex.getID(),
ignored ->
- new VertexParallelismRescale(
+ new VertexParallelismRescaleInfo(
jobVertex.getID(),
jobInformation
.getVertexInformation(jobVertex.getID())
@@ -340,7 +347,7 @@ class RescaleTest {
assertThat(
rescale.getVertices().values().stream()
-
.map(VertexParallelismRescale::getPreRescaleParallelism)
+
.map(VertexParallelismRescaleInfo::getPreRescaleParallelism)
.collect(Collectors.toSet()))
.containsExactly(4);
}
@@ -353,11 +360,12 @@ class RescaleTest {
vertex -> {
parallelismForVertices.put(vertex.getID(), 2);
});
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setPostRescaleVertexParallelism(jobInformation,
postVertexParallelism);
assertThat(
rescale.getVertices().values().stream()
-
.map(VertexParallelismRescale::getPostRescaleParallelism)
+
.map(VertexParallelismRescaleInfo::getPostRescaleParallelism)
.collect(Collectors.toSet()))
.containsExactly(2);
}
@@ -398,7 +406,8 @@ class RescaleTest {
new SlotSharingSlotAllocator.ExecutionSlotSharingGroup(
slotSharingGroupB, null)));
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setPostRescaleSlots(slotAssignments);
assertThat(
rescale.getSlots().values().stream()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
index 09d31c00d76..fc105d800dc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
@@ -32,6 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
+import
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo;
+import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
@@ -454,10 +456,10 @@ class RescaleTimelineITCase {
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
slotSharingGroupRescale);
});
- Map<JobVertexID, VertexParallelismRescale> vertices =
rescale.getVertices();
+ Map<JobVertexID, VertexParallelismRescaleInfo> vertices =
rescale.getVertices();
assertThat(vertices.values())
.allSatisfy(
- (Consumer<VertexParallelismRescale>)
+ (Consumer<VertexParallelismRescaleInfo>)
vpr -> {
assertThat(vpr.getPreRescaleParallelism()).isNull();
assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
@@ -506,10 +508,10 @@ class RescaleTimelineITCase {
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
slotSharingGroupRescale);
});
- Map<JobVertexID, VertexParallelismRescale> vertices =
rescale.getVertices();
+ Map<JobVertexID, VertexParallelismRescaleInfo> vertices =
rescale.getVertices();
assertThat(vertices.values())
.allSatisfy(
- (Consumer<VertexParallelismRescale>)
+ (Consumer<VertexParallelismRescaleInfo>)
vpr -> {
assertThat(vpr.getPreRescaleParallelism())
.isNotNull();
@@ -634,7 +636,7 @@ class RescaleTimelineITCase {
}
private void assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
- VertexParallelismRescale vpr) {
+ VertexParallelismRescaleInfo vpr) {
assertThat(vpr.getDesiredParallelism()).isNotNull();
assertThat(vpr.getPostRescaleParallelism()).isNotNull();
assertThat(vpr.getSlotSharingGroupName()).isNotNull();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
index d773f0ac165..6be786d2ee2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.scheduler.adaptive.timeline;
import org.apache.flink.runtime.util.stats.StatsSummary;
-import org.apache.flink.util.AbstractID;
import org.junit.jupiter.api.Test;
@@ -29,7 +28,8 @@ import static org.assertj.core.api.Assertions.assertThat;
class RescalesSummaryTest {
private Rescale getRescale() {
- Rescale rescale = new Rescale(new RescaleIdInfo(new AbstractID(), 1L));
+ Rescale rescale =
+ new Rescale(new RescaleIdInfo(new
RescaleIdInfo.ResourceRequirementsID(), 1L));
rescale.setStartTimestamp(1L);
rescale.setEndTimestamp(2L);
return rescale;