This is an automated email from the ASF dual-hosted git repository. panyuepeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 301eb359c72f89382313e1cdb1f5d1c10a1c7834 Author: Yuepeng Pan <[email protected]> AuthorDate: Thu Apr 9 21:54:11 2026 +0800 [FLINK-38894][runtime/rest] Introduce the /jobs/:jobid/rescales/history endpoint in the REST API Co-authored-by: XComp <[email protected]> Co-authored-by: WeiZhong94 <[email protected]> Co-authored-by: mateczagany <[email protected]> Co-authored-by: ferenc-csaky <[email protected]> --- .../shortcodes/generated/rest_v1_dispatcher.html | 192 ++++++ docs/static/generated/rest_v1_dispatcher.yml | 24 + .../src/test/resources/rest_api_v1.snapshot | 686 +++++++++++++-------- .../job/rescales/JobRescalesHistoryHandler.java | 115 ++++ .../messages/job/rescales/JobRescaleDetails.java | 15 +- .../messages/job/rescales/JobRescalesHistory.java | 67 ++ .../job/rescales/JobRescalesHistoryHeaders.java | 72 +++ .../adaptive/timeline/DefaultRescaleTimeline.java | 5 +- .../adaptive/timeline/RescalesStatsSnapshot.java | 15 +- .../runtime/webmonitor/WebMonitorEndpoint.java | 14 + .../job/rescales/JobRescaleDetailsHandlerTest.java | 7 +- ...est.java => JobRescalesHistoryHandlerTest.java} | 81 +-- 12 files changed, 967 insertions(+), 326 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 10fba048344..b0e0cf4d0a4 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3833,6 +3833,198 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa </tr> </tbody> </table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/rescales/history</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Return job rescales history.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Request</summary> + <pre><code>{}</code></pre> + </label> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Response</summary> + <pre><code>{ + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails", + "properties" : { + "endTimestampInMillis" : { + "type" : "integer" + }, + "rescaleAttemptId" : { + "type" : "integer" + }, + "rescaleUuid" : { + "type" : "string" + }, + "resourceRequirementsUuid" : { + "type" : "string" + }, + "schedulerStates" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan", + "properties" : { + "durationInMillis" : { + "type" : "integer" + }, + "enterTimestampInMillis" : { + "type" : "integer" + }, + "leaveTimestampInMillis" : { + "type" : "integer" + }, + "state" : { + "type" : "string" + }, + "stringifiedException" : { + "type" : "string" + } + } + } + }, + "slots" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo", + "properties" : { + "acquiredResourceProfile" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo" + }, + "desiredSlots" : { + "type" : "integer" + }, + "minimalRequiredSlots" : { + "type" : "integer" + }, + "postRescaleSlots" : { + "type" : "integer" + }, + "preRescaleSlots" : { + "type" : "integer" + }, + "requestResourceProfile" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo", + "properties" : { + "cpuCores" : { + "type" : "number" + }, + "extendedResources" : { + "type" : "object", + "additionalProperties" : { + "type" : "number" + } + }, + "managedMemory" : { + "type" : "integer" + }, + "networkMemory" : { + "type" : "integer" + }, + "taskHeapMemory" : { + "type" : "integer" + }, + "taskOffHeapMemory" : { + "type" : "integer" + } + } + }, + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + } + } + } + }, + "startTimestampInMillis" : { + "type" : "integer" + }, + "terminalState" : { + "type" : "string", + "enum" : [ "COMPLETED", "FAILED", "IGNORED" ] + }, + "terminatedReason" : { + "type" : "string", + "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", "RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", "JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ] + }, + "triggerCause" : { + "type" : "string", + "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", "NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ] + }, + "vertices" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo", + "properties" : { + "desiredParallelism" : { + "type" : "integer" + }, + "jobVertexId" : { + "type" : "any" + }, + "jobVertexName" : { + "type" : "string" + }, + "postRescaleParallelism" : { + "type" : "integer" + }, + "preRescaleParallelism" : { + "type" : "integer" + }, + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + }, + "sufficientParallelism" : { + "type" : "integer" + } + } + } + } + } + } +}</code></pre> + </label> + </td> + </tr> + </tbody> +</table> <table class="rest-api table table-bordered"> <tbody> <tr> diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 43fa4b3fe28..80fed9473fc 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -958,6 +958,26 @@ paths: application/json: schema: $ref: "#/components/schemas/JobRescaleDetails" + /jobs/{jobid}/rescales/history: + get: + description: Return job rescales history. + operationId: getJobRescalesHistory + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: "#/components/schemas/JobID" + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/JobRescaleDetails" /jobs/{jobid}/rescaling: patch: description: Triggers the rescaling of a job. This async operation would return @@ -2826,6 +2846,10 @@ components: type: object additionalProperties: $ref: "#/components/schemas/VertexParallelismRescaleInfo" + JobRescalesHistory: + type: array + items: + $ref: "#/components/schemas/JobRescaleDetails" JobResourceRequirementsBody: type: object additionalProperties: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index b4a16c9cd74..5a3d9bc9251 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -169,38 +169,6 @@ "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody" } - }, { - "url" : "/applications/:applicationid/jobmanager/config", - "method" : "GET", - "status-code" : "200 OK", - "file-upload" : false, - "path-parameters" : { - "pathParameters" : [ { - "key" : "applicationid" - } ] - }, - "query-parameters" : { - "queryParameters" : [ ] - }, - "request" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" - }, - "response" : { - "type" : "array", - "items" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", - "properties" : { - "key" : { - "type" : "string" - }, - "value" : { - "type" : "string" - } - } - } - } }, { "url" : "/applications/:applicationid/exceptions", "method" : "GET", @@ -251,6 +219,38 @@ } } } + }, { + "url" : "/applications/:applicationid/jobmanager/config", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "applicationid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", + "properties" : { + "key" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } + } }, { "url" : "/cluster", "method" : "DELETE", @@ -2834,6 +2834,392 @@ } } } + }, { + "url" : "/jobs/:jobid/rescales/config", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo", + "properties" : { + "rescaleHistoryMax" : { + "type" : "integer" + }, + "schedulerExecutionMode" : { + "type" : "string", + "enum" : [ "REACTIVE" ] + }, + "submissionResourceWaitTimeoutInMillis" : { + "type" : "integer" + }, + "submissionResourceStabilizationTimeoutInMillis" : { + "type" : "integer" + }, + "slotIdleTimeoutInMillis" : { + "type" : "integer" + }, + "executingCooldownTimeoutInMillis" : { + "type" : "integer" + }, + "executingResourceStabilizationTimeoutInMillis" : { + "type" : "integer" + }, + "maximumDelayForTriggeringRescaleInMillis" : { + "type" : "integer" + }, + "rescaleOnFailedCheckpointCount" : { + "type" : "integer" + } + } + } + }, { + "url" : "/jobs/:jobid/rescales/details/:rescaleuuid", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + }, { + "key" : "rescaleuuid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails", + "properties" : { + "rescaleUuid" : { + "type" : "string" + }, + "resourceRequirementsUuid" : { + "type" : "string" + }, + "rescaleAttemptId" : { + "type" : "integer" + }, + "vertices" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo", + "properties" : { + "jobVertexId" : { + "type" : "any" + }, + "jobVertexName" : { + "type" : "string" + }, + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + }, + "desiredParallelism" : { + "type" : "integer" + }, + "sufficientParallelism" : { + "type" : "integer" + }, + "preRescaleParallelism" : { + "type" : "integer" + }, + "postRescaleParallelism" : { + "type" : "integer" + } + } + } + }, + "slots" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo", + "properties" : { + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + }, + "requestResourceProfile" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo", + "properties" : { + "cpuCores" : { + "type" : "number" + }, + "taskHeapMemory" : { + "type" : "integer" + }, + "taskOffHeapMemory" : { + "type" : "integer" + }, + "managedMemory" : { + "type" : "integer" + }, + "networkMemory" : { + "type" : "integer" + }, + "extendedResources" : { + "type" : "object", + "additionalProperties" : { + "type" : "number" + } + } + } + }, + "desiredSlots" : { + "type" : "integer" + }, + "minimalRequiredSlots" : { + "type" : "integer" + }, + "preRescaleSlots" : { + "type" : "integer" + }, + "postRescaleSlots" : { + "type" : "integer" + }, + "acquiredResourceProfile" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo" + } + } + } + }, + "schedulerStates" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan", + "properties" : { + "state" : { + "type" : "string" + }, + "enterTimestampInMillis" : { + "type" : "integer" + }, + "leaveTimestampInMillis" : { + "type" : "integer" + }, + "durationInMillis" : { + "type" : "integer" + }, + "stringifiedException" : { + "type" : "string" + } + } + } + }, + "startTimestampInMillis" : { + "type" : "integer" + }, + "endTimestampInMillis" : { + "type" : "integer" + }, + "terminalState" : { + "type" : "string", + "enum" : [ "COMPLETED", "FAILED", "IGNORED" ] + }, + "triggerCause" : { + "type" : "string", + "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", "NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ] + }, + "terminatedReason" : { + "type" : "string", + "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", "RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", "JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ] + } + } + } + }, { + "url" : "/jobs/:jobid/rescales/history", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails", + "properties" : { + "rescaleUuid" : { + "type" : "string" + }, + "resourceRequirementsUuid" : { + "type" : "string" + }, + "rescaleAttemptId" : { + "type" : "integer" + }, + "vertices" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo", + "properties" : { + "jobVertexId" : { + "type" : "any" + }, + "jobVertexName" : { + "type" : "string" + }, + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + }, + "desiredParallelism" : { + "type" : "integer" + }, + "sufficientParallelism" : { + "type" : "integer" + }, + "preRescaleParallelism" : { + "type" : "integer" + }, + "postRescaleParallelism" : { + "type" : "integer" + } + } + } + }, + "slots" : { + "type" : "object", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo", + "properties" : { + "slotSharingGroupId" : { + "type" : "any" + }, + "slotSharingGroupName" : { + "type" : "string" + }, + "requestResourceProfile" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo", + "properties" : { + "cpuCores" : { + "type" : "number" + }, + "taskHeapMemory" : { + "type" : "integer" + }, + "taskOffHeapMemory" : { + "type" : "integer" + }, + "managedMemory" : { + "type" : "integer" + }, + "networkMemory" : { + "type" : "integer" + }, + "extendedResources" : { + "type" : "object", + "additionalProperties" : { + "type" : "number" + } + } + } + }, + "desiredSlots" : { + "type" : "integer" + }, + "minimalRequiredSlots" : { + "type" : "integer" + }, + "preRescaleSlots" : { + "type" : "integer" + }, + "postRescaleSlots" : { + "type" : "integer" + }, + "acquiredResourceProfile" : { + "type" : "object", + "$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo" + } + } + } + }, + "schedulerStates" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan", + "properties" : { + "state" : { + "type" : "string" + }, + "enterTimestampInMillis" : { + "type" : "integer" + }, + "leaveTimestampInMillis" : { + "type" : "integer" + }, + "durationInMillis" : { + "type" : "integer" + }, + "stringifiedException" : { + "type" : "string" + } + } + } + }, + "startTimestampInMillis" : { + "type" : "integer" + }, + "endTimestampInMillis" : { + "type" : "integer" + }, + "terminalState" : { + "type" : "string", + "enum" : [ "COMPLETED", "FAILED", "IGNORED" ] + }, + "triggerCause" : { + "type" : "string", + "enum" : [ "INITIAL_SCHEDULE", "UPDATE_REQUIREMENT", "NEW_RESOURCE_AVAILABLE", "RECOVERABLE_FAILOVER" ] + }, + "terminatedReason" : { + "type" : "string", + "enum" : [ "SUCCEEDED", "EXCEPTION_OCCURRED", "RESOURCE_REQUIREMENTS_UPDATED", "NO_RESOURCES_OR_PARALLELISMS_CHANGE", "JOB_FINISHED", "JOB_FAILED", "JOB_CANCELED", "JOB_FAILOVER_RESTARTING" ] + } + } + } + } }, { "url" : "/jobs/:jobid/rescaling", "method" : "PATCH", @@ -4781,239 +5167,5 @@ } } } - }, { - "url" : "/jobs/:jobid/rescales/config", - "method" : "GET", - "status-code" : "200 OK", - "file-upload" : false, - "path-parameters" : { - "pathParameters" : [ { - "key" : "jobid" - } ] - }, - "query-parameters" : { - "queryParameters" : [ ] - }, - "request" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" - }, - "response" : { - "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleConfigInfo", - "properties" : { - "executingCooldownTimeoutInMillis" : { - "type" : "integer" - }, - "executingResourceStabilizationTimeoutInMillis" : { - "type" : "integer" - }, - "maximumDelayForTriggeringRescaleInMillis" : { - "type" : "integer" - }, - "rescaleHistoryMax" : { - "type" : "integer" - }, - "rescaleOnFailedCheckpointCount" : { - "type" : "integer" - }, - "schedulerExecutionMode" : { - "type" : "string", - "enum" : [ "REACTIVE" ] - }, - "slotIdleTimeoutInMillis" : { - "type" : "integer" - }, - "submissionResourceStabilizationTimeoutInMillis" : { - "type" : "integer" - }, - "submissionResourceWaitTimeoutInMillis" : { - "type" : "integer" - } - } - } - }, { - "url": "/jobs/:jobid/rescales/details/:rescaleuuid", - "method": "GET", - "status-code": "200 OK", - "file-upload": false, - "path-parameters": { - "pathParameters": [ - { - "key": "jobid" - }, - { - "key": "rescaleuuid" - } - ] - }, - "query-parameters": { - "queryParameters": [] - }, - "request": { - "type": "object" - }, - "response": { - "type": "object", - "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails", - "properties": { - "endTimestampInMillis": { - "type": "integer" - }, - "rescaleAttemptId": { - "type": "integer" - }, - "rescaleUuid": { - "type": "string" - }, - "resourceRequirementsUuid": { - "type": "string" - }, - "schedulerStates": { - "type": "array", - "items": { - "type": "object", - "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:SchedulerStateSpan", - "properties": { - "durationInMillis": { - "type": "integer" - }, - "enterTimestampInMillis": { - "type": "integer" - }, - "leaveTimestampInMillis": { - "type": "integer" - }, - "state": { - "type": "string" - }, - "stringifiedException": { - "type": "string" - } - } - } - }, - "slots": { - "type": "object", - "additionalProperties": { - "type": "object", - "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:SlotSharingGroupRescaleInfo", - "properties": { - "acquiredResourceProfile": { - "type": "object", - "$ref": "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo" - }, - "desiredSlots": { - "type": "integer" - }, - "minimalRequiredSlots": { - "type": "integer" - }, - "postRescaleSlots": { - "type": "integer" - }, - "preRescaleSlots": { - "type": "integer" - }, - "requestResourceProfile": { - "type": "object", - "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo", - "properties": { - "cpuCores": { - "type": "number" - }, - "extendedResources": { - "type": "object", - "additionalProperties": { - "type": "number" - } - }, - "managedMemory": { - "type": "integer" - }, - "networkMemory": { - "type": "integer" - }, - "taskHeapMemory": { - "type": "integer" - }, - "taskOffHeapMemory": { - "type": "integer" - } - } - }, - "slotSharingGroupId": { - "type": "any" - }, - "slotSharingGroupName": { - "type": "string" - } - } - } - }, - "startTimestampInMillis": { - "type": "integer" - }, - "terminalState": { - "type": "string", - "enum": ["COMPLETED", "FAILED", "IGNORED"] - }, - "terminatedReason": { - "type": "string", - "enum": [ - "SUCCEEDED", - "EXCEPTION_OCCURRED", - "RESOURCE_REQUIREMENTS_UPDATED", - "NO_RESOURCES_OR_PARALLELISMS_CHANGE", - "JOB_FINISHED", - "JOB_FAILED", - "JOB_CANCELED", - "JOB_FAILOVER_RESTARTING" - ] - }, - "triggerCause": { - "type": "string", - "enum": [ - "INITIAL_SCHEDULE", - "UPDATE_REQUIREMENT", - "NEW_RESOURCE_AVAILABLE", - "RECOVERABLE_FAILOVER" - ] - }, - "vertices": { - "type": "object", - "additionalProperties": { - "type": "object", - "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescaleDetails:VertexParallelismRescaleInfo", - "properties": { - "desiredParallelism": { - "type": "integer" - }, - "jobVertexId": { - "type": "any" - }, - "jobVertexName": { - "type": "string" - }, - "postRescaleParallelism": { - "type": "integer" - }, - "preRescaleParallelism": { - "type": "integer" - }, - "slotSharingGroupId": { - "type": "any" - }, - "slotSharingGroupName": { - "type": "string" - }, - "sufficientParallelism": { - "type": "integer" - } - } - } - } - } - } } ] -} +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java new file mode 100644 index 00000000000..6e7541f87e3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.rescales; + +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistory; +import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.Executor; + +/** Handler to response job rescales history. */ +public class JobRescalesHistoryHandler + extends AbstractExecutionGraphHandler<JobRescalesHistory, JobMessageParameters> + implements JsonArchivist { + + public JobRescalesHistoryHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Duration timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, JobRescalesHistory, JobMessageParameters> + messageHeaders, + ExecutionGraphCache executionGraphCache, + Executor executor) { + super( + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + executionGraphCache, + executor); + } + + @Override + protected JobRescalesHistory handleRequest( + HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo executionGraphInfo) + throws RestHandlerException { + return getJobRescalesHistory(executionGraphInfo); + } + + private JobRescalesHistory getJobRescalesHistory(ExecutionGraphInfo executionGraphInfo) + throws RestHandlerException { + if (executionGraphInfo.getRescalesStatsSnapshot() == null + || executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory() == null) { + throw new RestHandlerException( + String.format( + "The job `%s` has not enabled the `%s` scheduler, or it has been enabled but the value of configuration option `%s` has not been set to greater than `0`.", + executionGraphInfo.getJobId(), + JobManagerOptions.SchedulerType.Adaptive, + WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE.key()), + HttpResponseStatus.NOT_FOUND, + RestHandlerException.LoggingBehavior.IGNORE); + } + return JobRescalesHistory.fromRescalesStatsSnapshot( + executionGraphInfo.getRescalesStatsSnapshot()); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) + throws IOException { + + ResponseBody response; + try { + response = getJobRescalesHistory(executionGraphInfo); + } catch (RestHandlerException rhe) { + response = new ErrorResponseBody(rhe.getMessage()); + } + return Collections.singletonList( + new ArchivedJson( + JobRescalesHistoryHeaders.getInstance() + .getTargetRestEndpointURL() + .replace( + ':' + JobIDPathParameter.KEY, + executionGraphInfo.getJobId().toString()), + response)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java index 82cdd35fac3..674bc275e77 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescaleDetails.java @@ -51,6 +51,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -182,16 +183,18 @@ public class JobRescaleDetails implements ResponseBody, Serializable { terminatedReason); } - public static JobRescaleDetails fromRescale(Rescale rescale, boolean includeSchedulerStates) { + public static JobRescaleDetails fromRescale(Rescale rescale, boolean includeDetailedInfo) { return new JobRescaleDetails( rescale.getRescaleIdInfo().getRescaleUuid().toString(), rescale.getRescaleIdInfo().getResourceRequirementsId().toString(), rescale.getRescaleIdInfo().getRescaleAttemptId(), - rescale.getVertices(), - convertMapValues( - rescale.getSlots(), - SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale), - includeSchedulerStates ? rescale.getSchedulerStates() : null, + includeDetailedInfo ? rescale.getVertices() : Collections.emptyMap(), + includeDetailedInfo + ? convertMapValues( + rescale.getSlots(), + SlotSharingGroupRescaleInfo::fromSlotSharingGroupRescale) + : Collections.emptyMap(), + includeDetailedInfo ? rescale.getSchedulerStates() : Collections.emptyList(), rescale.getStartTimestamp(), rescale.getEndTimestamp(), rescale.getTerminalState(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java new file mode 100644 index 00000000000..9cfc69d9388 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.rescales; + +import org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale; +import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; + +import io.swagger.v3.oas.annotations.media.Schema; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** Response body for {@link JobRescalesHistoryHandler}. */ +@Schema(name = "JobRescalesHistory") +public class JobRescalesHistory extends ArrayList<JobRescaleDetails> + implements ResponseBody, Serializable { + + private static final long serialVersionUID = 1L; + + // a default constructor is required for collection type marshalling + public JobRescalesHistory() {} + + public JobRescalesHistory(int initialElements) { + super(initialElements); + } + + public static JobRescalesHistory from(List<JobRescaleDetails> rescalesDetails) { + JobRescalesHistory history = new JobRescalesHistory(rescalesDetails.size()); + history.addAll(rescalesDetails); + return history; + } + + public static JobRescalesHistory fromRescalesStatsSnapshot(RescalesStatsSnapshot snapshot) { + return from( + snapshot.getRescaleHistory().stream() + .map((Rescale rescale) -> JobRescaleDetails.fromRescale(rescale, false)) + .collect(Collectors.toList())); + } + + @Override + @JsonIgnore + public boolean isEmpty() { + return super.isEmpty(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java new file mode 100644 index 00000000000..810864c8ab1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/rescales/JobRescalesHistoryHeaders.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.rescales; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for {@link JobRescalesHistoryHandler}. */ +public class JobRescalesHistoryHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, JobRescalesHistory, JobMessageParameters> { + public static final JobRescalesHistoryHeaders INSTANCE = new JobRescalesHistoryHeaders(); + public static final String JOB_RESCALES_DETAILS_HISTORY_PATH = "/jobs/:jobid/rescales/history"; + + @Override + public Class<JobRescalesHistory> getResponseClass() { + return JobRescalesHistory.class; + } + + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + public String getDescription() { + return "Return job rescales history."; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return JOB_RESCALES_DETAILS_HISTORY_PATH; + } + + public static JobRescalesHistoryHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java index 59d91ca5f4d..42e98a499c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java @@ -127,7 +127,10 @@ public class DefaultRescaleTimeline implements RescaleTimeline { public RescalesStatsSnapshot createSnapshot() { List<Rescale> rescales = rescaleHistory.toArrayList(); Collections.reverse(rescales); - return new RescalesStatsSnapshot(List.copyOf(rescales), rescalesSummary.createSnapshot()); + return new RescalesStatsSnapshot( + List.copyOf(rescales), + Map.copyOf(latestRescales), + rescalesSummary.createSnapshot()); } private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java index f8697fe473f..0dfd3fed380 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -35,17 +36,21 @@ public class RescalesStatsSnapshot implements Serializable { private final List<Rescale> rescaleHistory; private final Map<AbstractID, Rescale> idToRescaleMap; + private final Map<TerminalState, Rescale> latestRescales; private final RescalesSummarySnapshot rescalesSummarySnapshot; public RescalesStatsSnapshot( - List<Rescale> rescaleHistory, RescalesSummarySnapshot rescalesSummarySnapshot) { - this.rescaleHistory = rescaleHistory; + List<Rescale> rescaleHistory, + Map<TerminalState, Rescale> latestRescales, + RescalesSummarySnapshot rescalesSummarySnapshot) { + this.rescaleHistory = List.copyOf(rescaleHistory); this.idToRescaleMap = rescaleHistory.stream() .collect( Collectors.toMap( r -> r.getRescaleIdInfo().getRescaleUuid(), Function.identity())); + this.latestRescales = Map.copyOf(latestRescales); this.rescalesSummarySnapshot = rescalesSummarySnapshot; } @@ -57,6 +62,11 @@ public class RescalesStatsSnapshot implements Serializable { return idToRescaleMap.get(rescaleId); } + @Nullable + public Rescale getLatestRescale(TerminalState terminalState) { + return latestRescales.get(terminalState); + } + public RescalesSummarySnapshot getRescalesSummarySnapshot() { return rescalesSummarySnapshot; } @@ -64,6 +74,7 @@ public class RescalesStatsSnapshot implements Serializable { public static RescalesStatsSnapshot emptySnapshot() { return new RescalesStatsSnapshot( new ArrayList<>(), + new HashMap<>(), new RescalesSummarySnapshot( StatsSummarySnapshot.empty(), StatsSummarySnapshot.empty(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 8bf45671feb..5324e3d5446 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -98,6 +98,7 @@ import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler; import org.apache.flink.runtime.rest.handler.job.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleConfigHandler; import org.apache.flink.runtime.rest.handler.job.rescales.JobRescaleDetailsHandler; +import org.apache.flink.runtime.rest.handler.job.rescales.JobRescalesHistoryHandler; import org.apache.flink.runtime.rest.handler.job.rescaling.RescalingHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointDisposalHandlers; import org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers; @@ -166,6 +167,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails import org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationHeaders; import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigHeaders; import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerCustomLogHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; @@ -1225,6 +1227,18 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp handlers.add( Tuple2.of(jobRescaleDetailsHandler.getMessageHeaders(), jobRescaleDetailsHandler)); + final JobRescalesHistoryHandler jobRescalesHistoryHandler = + new JobRescalesHistoryHandler( + leaderRetriever, + timeout, + responseHeaders, + JobRescalesHistoryHeaders.getInstance(), + executionGraphCache, + executor); + handlers.add( + Tuple2.of( + jobRescalesHistoryHandler.getMessageHeaders(), jobRescalesHistoryHandler)); + handlers.stream() .map(tuple -> tuple.f1) .filter(handler -> handler instanceof JsonArchivist) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java index 3ecd4e46038..be0275a42b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupResc import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason; import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.util.AbstractID; import org.apache.flink.util.concurrent.Executors; import org.junit.jupiter.api.Test; @@ -150,7 +149,9 @@ class JobRescaleDetailsHandlerTest { null, null, new RescalesStatsSnapshot( - List.of(rescale), rescalesSummary.createSnapshot())); + List.of(rescale), + Map.of(rescale.getTerminalState(), rescale), + rescalesSummary.createSnapshot())); final HandlerRequest<EmptyRequestBody> request = createRequest( executionGraphInfo.getJobId(), rescale.getRescaleIdInfo().getRescaleUuid()); @@ -160,7 +161,7 @@ class JobRescaleDetailsHandlerTest { } private static HandlerRequest<EmptyRequestBody> createRequest( - JobID jobId, AbstractID rescaleUuid) throws HandlerRequestException { + JobID jobId, RescaleIdInfo.RescaleUUID rescaleUuid) throws HandlerRequestException { final Map<String, String> pathParameters = new HashMap<>(); pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); pathParameters.put(JobRescaleIDPathParameter.KEY, rescaleUuid.toString()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java similarity index 73% copy from flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java copy to flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java index 3ecd4e46038..5cff2477c79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescalesHistoryHandlerTest.java @@ -30,28 +30,27 @@ import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; -import org.apache.flink.runtime.rest.messages.job.rescales.JobIDRescaleIDParameters; -import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetails.VertexParallelismRescaleInfo; -import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDetailsHeaders; -import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleIDPathParameter; +import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistory; +import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesHistoryHeaders; import org.apache.flink.runtime.rest.messages.job.rescales.SchedulerStateSpan; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; import org.apache.flink.runtime.scheduler.adaptive.timeline.Rescale; import org.apache.flink.runtime.scheduler.adaptive.timeline.RescaleIdInfo; import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot; import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummary; +import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesSummarySnapshot; import org.apache.flink.runtime.scheduler.adaptive.timeline.SlotSharingGroupRescale; import org.apache.flink.runtime.scheduler.adaptive.timeline.TerminatedReason; import org.apache.flink.runtime.scheduler.adaptive.timeline.TriggerCause; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.util.AbstractID; import org.apache.flink.util.concurrent.Executors; import org.junit.jupiter.api.Test; +import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -59,52 +58,34 @@ import static org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleDeta import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** Test for {@link JobRescaleDetailsHandler}. */ -class JobRescaleDetailsHandlerTest { - - private final JobRescaleDetailsHandler testInstance = - new JobRescaleDetailsHandler( +/** Test for {@link JobRescalesHistoryHandler}. */ +class JobRescalesHistoryHandlerTest { + private final JobRescalesHistoryHandler testInstance = + new JobRescalesHistoryHandler( CompletableFuture::new, TestingUtils.TIMEOUT, - Map.of(), - JobRescaleDetailsHeaders.getInstance(), + Collections.emptyMap(), + JobRescalesHistoryHeaders.getInstance(), new DefaultExecutionGraphCache(TestingUtils.TIMEOUT, TestingUtils.TIMEOUT), Executors.directExecutor()); @Test - void testUnNormalCases() throws HandlerRequestException, RestHandlerException { + void testSchedulerNotEnabledRescalesHistory() throws HandlerRequestException { // Test for adaptive scheduler rescales was not enabled for job. final ExecutionGraphInfo executionGraphInfoWithNullRescalesStatsSnapshot = new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder().build(), List.of(), null); + new ArchivedExecutionGraphBuilder().build(), Collections.emptyList(), null); final HandlerRequest<EmptyRequestBody> request = - createRequest( - executionGraphInfoWithNullRescalesStatsSnapshot.getJobId(), - new RescaleIdInfo.RescaleUUID()); + createRequest(executionGraphInfoWithNullRescalesStatsSnapshot.getJobId()); assertThatThrownBy( () -> testInstance.handleRequest( request, executionGraphInfoWithNullRescalesStatsSnapshot)) .isInstanceOf(RestHandlerException.class); - - // Test for that case could not find rescale details for the specified rescale uuid. - final ExecutionGraphInfo executionGraphInfoWithEmptyRescalesStatsSnapshot = - new ExecutionGraphInfo( - new ArchivedExecutionGraphBuilder().build(), - List.of(), - JobManagerOptions.SchedulerType.Adaptive, - null, - RescalesStatsSnapshot.emptySnapshot()); - assertThatThrownBy( - () -> - testInstance.handleRequest( - request, executionGraphInfoWithEmptyRescalesStatsSnapshot)) - .isInstanceOf(RestHandlerException.class); } @Test - void testRequestNormalJobRescaleStatisticsDetails() - throws HandlerRequestException, RestHandlerException { + void testRequestNormalJobRescaleHistory() throws HandlerRequestException, RestHandlerException { Rescale rescale = new Rescale(new RescaleIdInfo(new RescaleIdInfo.ResourceRequirementsID(), 1L)) .setStartTimestamp(1L) @@ -142,34 +123,40 @@ class JobRescaleDetailsHandlerTest { RescalesSummary rescalesSummary = new RescalesSummary(2); rescalesSummary.addTerminated(rescale); + RescalesSummarySnapshot rescalesSummarySnapshot = rescalesSummary.createSnapshot(); + RescalesStatsSnapshot rescalesStatsSnapshot = + new RescalesStatsSnapshot( + Collections.singletonList(rescale), + Collections.singletonMap(rescale.getTerminalState(), rescale), + rescalesSummarySnapshot); final ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo( new ArchivedExecutionGraphBuilder().build(), - List.of(), - null, + Collections.emptyList(), + JobManagerOptions.SchedulerType.Adaptive, null, - new RescalesStatsSnapshot( - List.of(rescale), rescalesSummary.createSnapshot())); + rescalesStatsSnapshot); final HandlerRequest<EmptyRequestBody> request = - createRequest( - executionGraphInfo.getJobId(), rescale.getRescaleIdInfo().getRescaleUuid()); - JobRescaleDetails jobRescaleDetails = + createRequest(executionGraphInfo.getJobId()); + JobRescalesHistory actualJobRescalesHistory = testInstance.handleRequest(request, executionGraphInfo); - assertThat(jobRescaleDetails).isEqualTo(fromRescale(rescale, true)); + + JobRescalesHistory expectedJobRescalesHistory = + JobRescalesHistory.from(Collections.singletonList(fromRescale(rescale, false))); + assertThat(actualJobRescalesHistory).isEqualTo(expectedJobRescalesHistory); } - private static HandlerRequest<EmptyRequestBody> createRequest( - JobID jobId, AbstractID rescaleUuid) throws HandlerRequestException { + private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId) + throws HandlerRequestException { final Map<String, String> pathParameters = new HashMap<>(); pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); - pathParameters.put(JobRescaleIDPathParameter.KEY, rescaleUuid.toString()); return HandlerRequest.resolveParametersAndCreate( EmptyRequestBody.getInstance(), - new JobIDRescaleIDParameters(), + new JobMessageParameters(), pathParameters, new HashMap<>(), - List.of()); + Collections.emptyList()); } }
