This is an automated email from the ASF dual-hosted git repository. dmvk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a539eb2e84c [FLINK-31469] Allow setting JobResourceRequirements through REST API. a539eb2e84c is described below commit a539eb2e84cccb21f046902026d4178d864fba7d Author: David Moravek <d...@apache.org> AuthorDate: Mon Feb 27 19:08:03 2023 +0100 [FLINK-31469] Allow setting JobResourceRequirements through REST API. Signed-off-by: David Moravek <d...@apache.org> --- .../shortcodes/generated/rest_v1_dispatcher.html | 126 +++++++++++++++++++++ docs/static/generated/rest_v1_dispatcher.yml | 54 +++++++++ .../src/test/resources/rest_api_v1.snapshot | 50 +++++++- .../runtime/dispatcher/DispatcherRestEndpoint.java | 16 +++ .../job/JobResourceRequirementsHandler.java | 65 +++++++++++ .../job/JobResourceRequirementsUpdateHandler.java | 81 +++++++++++++ .../messages/job/JobResourceRequirementsBody.java | 89 +++++++++++++++ .../job/JobResourceRequirementsHeaders.java | 73 ++++++++++++ .../job/JobResourcesRequirementsUpdateHeaders.java | 78 +++++++++++++ .../job/JobResourceRequirementsBodyTest.java | 88 ++++++++++++++ 10 files changed, 716 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 922810b9c76..70ef8a1c5bc 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3086,6 +3086,132 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa </tr> </tbody> </table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Request details on the job's resource requirements.</td> + </tr> + <tr> + <td 
colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Request</summary> + <pre><code>{}</code></pre> + </label> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Response</summary> + <pre><code>{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements", + "properties" : { + "parallelism" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism", + "properties" : { + "lowerBound" : { + "type" : "integer" + }, + "upperBound" : { + "type" : "integer" + } + } + } + } + } +}</code></pre> + </label> + </td> + </tr> + </tbody> +</table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>PUT</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Request to update job's resource requirements.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Request</summary> + <pre><code>{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody", + "additionalProperties" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements", + 
"properties" : { + "parallelism" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism", + "properties" : { + "lowerBound" : { + "type" : "integer" + }, + "upperBound" : { + "type" : "integer" + } + } + } + } + } +}</code></pre> + </label> + </td> + </tr> + <tr> + <td colspan="2"> + <label> + <details> + <summary>Response</summary> + <pre><code>{}</code></pre> + </label> + </td> + </tr> + </tbody> +</table> <table class="rest-api table table-bordered"> <tbody> <tr> diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index c8ece02b72a..71c00a0ddec 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -852,6 +852,42 @@ paths: application/json: schema: $ref: '#/components/schemas/AsynchronousOperationResult' + /jobs/{jobid}/resource-requirements: + get: + description: Request details on the job's resource requirements. + operationId: getJobResourceRequirements + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/JobResourceRequirementsBody' + put: + description: Request to update job's resource requirements. + operationId: updateJobResourceRequirements + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/JobResourceRequirementsBody' + responses: + "200": + description: The request was successful. 
/jobs/{jobid}/savepoints: post: description: "Triggers a savepoint, and optionally cancels the job afterwards.\ @@ -2344,6 +2380,10 @@ components: properties: plan: $ref: '#/components/schemas/RawJson' + JobResourceRequirementsBody: + type: object + additionalProperties: + $ref: '#/components/schemas/JobVertexResourceRequirements' JobResult: type: object properties: @@ -2447,6 +2487,11 @@ components: JobVertexID: pattern: "[0-9a-f]{32}" type: string + JobVertexResourceRequirements: + type: object + properties: + parallelism: + $ref: '#/components/schemas/Parallelism' JobVertexTaskManagersInfo: type: object properties: @@ -2530,6 +2575,15 @@ components: value: type: integer format: int32 + Parallelism: + type: object + properties: + lowerBound: + type: integer + format: int32 + upperBound: + type: integer + format: int32 PendingCheckpointStatistics: type: object allOf: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index d4b2937e1c6..25d20a764b3 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -2300,6 +2300,48 @@ } } } + }, { + "url" : "/jobs/:jobid/resource-requirements", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody" + } + }, { + "url" : "/jobs/:jobid/resource-requirements", + "method" : "PUT", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + 
"request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody" + } }, { "url" : "/jobs/:jobid/savepoints", "method" : "POST", @@ -2782,14 +2824,14 @@ }, "response" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph", + "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph", "properties" : { "endTimestamp" : { "type" : "integer" }, "data" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node", + "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node", "properties" : { "name" : { "type" : "string" @@ -2801,7 +2843,7 @@ "type" : "array", "items" : { "type" : "object", - "$ref" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node" + "$ref" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node" } } } @@ -4063,4 +4105,4 @@ } } } ] -} +} \ No newline at end of file diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index e2414378702..fc2d24b0dd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import 
org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsHandler; +import org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsUpdateHandler; import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; @@ -96,6 +98,20 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); + final JobResourceRequirementsHandler jobResourceRequirementsHandler = + new JobResourceRequirementsHandler(leaderRetriever, timeout, responseHeaders); + handlers.add( + Tuple2.of( + jobResourceRequirementsHandler.getMessageHeaders(), + jobResourceRequirementsHandler)); + + final JobResourceRequirementsUpdateHandler jobResourceRequirementsUpdateHandler = + new JobResourceRequirementsUpdateHandler(leaderRetriever, timeout, responseHeaders); + handlers.add( + Tuple2.of( + jobResourceRequirementsUpdateHandler.getMessageHeaders(), + jobResourceRequirementsUpdateHandler)); + return handlers; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java new file mode 100644 index 00000000000..931e8039763 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler for reading current {@link org.apache.flink.runtime.jobgraph.JobResourceRequirements + * resource requirements} of a given job. + */ +public class JobResourceRequirementsHandler + extends AbstractRestHandler< + DispatcherGateway, + EmptyRequestBody, + JobResourceRequirementsBody, + JobMessageParameters> { + + public JobResourceRequirementsHandler( + GatewayRetriever<? 
extends DispatcherGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super(leaderRetriever, timeout, responseHeaders, JobResourceRequirementsHeaders.INSTANCE); + } + + @Override + protected CompletableFuture<JobResourceRequirementsBody> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull DispatcherGateway gateway) + throws RestHandlerException { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return gateway.requestJobResourceRequirements(jobId) + .thenApply(JobResourceRequirementsBody::new); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java new file mode 100644 index 00000000000..d762dd5baa2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody; +import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler for updating {@link org.apache.flink.runtime.jobgraph.JobResourceRequirements + * resource requirements} of a given job. + */ +public class JobResourceRequirementsUpdateHandler + extends AbstractRestHandler< + DispatcherGateway, + JobResourceRequirementsBody, + EmptyResponseBody, + JobMessageParameters> { + + public JobResourceRequirementsUpdateHandler( + GatewayRetriever<? 
extends DispatcherGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders) { + super( + leaderRetriever, + timeout, + responseHeaders, + JobResourcesRequirementsUpdateHeaders.INSTANCE); + } + + @Override + protected CompletableFuture<EmptyResponseBody> handleRequest( + @Nonnull HandlerRequest<JobResourceRequirementsBody> request, + @Nonnull DispatcherGateway gateway) + throws RestHandlerException { + final JobID jobId = request.getPathParameter(JobIDPathParameter.class); + final Optional<JobResourceRequirements> maybeJobResourceRequirements = + request.getRequestBody().asJobResourceRequirements(); + if (maybeJobResourceRequirements.isPresent()) { + return gateway.updateJobResourceRequirements(jobId, maybeJobResourceRequirements.get()) + .thenApply(ignored -> EmptyResponseBody.getInstance()); + } + throw new RestHandlerException( + "Request body does not specify resource requirements.", + HttpResponseStatus.BAD_REQUEST); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java new file mode 100644 index 00000000000..37227a1bf77 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.annotation.docs.FlinkJsonSchema; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer; +import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnyGetter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnySetter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Body for change job requests. 
*/ +@FlinkJsonSchema.AdditionalFields(type = JobVertexResourceRequirements.class) +public class JobResourceRequirementsBody implements RequestBody, ResponseBody { + + @JsonAnySetter + @JsonAnyGetter + @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class) + @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class) + private final Map<JobVertexID, JobVertexResourceRequirements> jobVertexResourceRequirements; + + public JobResourceRequirementsBody() { + this(null); + } + + public JobResourceRequirementsBody(@Nullable JobResourceRequirements jobResourceRequirements) { + if (jobResourceRequirements != null) { + this.jobVertexResourceRequirements = jobResourceRequirements.getJobVertexParallelisms(); + } else { + this.jobVertexResourceRequirements = new HashMap<>(); + } + } + + @JsonIgnore + public Optional<JobResourceRequirements> asJobResourceRequirements() { + if (jobVertexResourceRequirements.isEmpty()) { + return Optional.empty(); + } + return Optional.of(new JobResourceRequirements(jobVertexResourceRequirements)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final JobResourceRequirementsBody that = (JobResourceRequirementsBody) o; + return Objects.equals(jobVertexResourceRequirements, that.jobVertexResourceRequirements); + } + + @Override + public int hashCode() { + return Objects.hash(jobVertexResourceRequirements); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java new file mode 100644 index 00000000000..5bb17c87cc6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for REST request to get details on job's resources. 
*/ +public class JobResourceRequirementsHeaders + implements RuntimeMessageHeaders< + EmptyRequestBody, JobResourceRequirementsBody, JobMessageParameters> { + + public static final JobResourceRequirementsHeaders INSTANCE = + new JobResourceRequirementsHeaders(); + + private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/resource-requirements"; + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class<JobResourceRequirementsBody> getResponseClass() { + return JobResourceRequirementsBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Request details on the job's resource requirements."; + } + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java new file mode 100644 index 00000000000..71056dba029 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** Headers for REST request to update a job's resource requirements. */ +public class JobResourcesRequirementsUpdateHeaders + implements RuntimeMessageHeaders< + JobResourceRequirementsBody, EmptyResponseBody, JobMessageParameters> { + + public static final JobResourcesRequirementsUpdateHeaders INSTANCE = + new JobResourcesRequirementsUpdateHeaders(); + + private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + "/resource-requirements"; + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.PUT; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Class<EmptyResponseBody> getResponseClass() { + return EmptyResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Request to update job's resource requirements."; + } + + @Override + public Class<JobResourceRequirementsBody> getRequestClass() { + return JobResourceRequirementsBody.class; + } + + @Override + public JobMessageParameters
getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public String operationId() { + return "updateJobResourceRequirements"; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java new file mode 100644 index 00000000000..cfb2ac8d522 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeMatcher; + +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; + +/** Tests for the marshalling of {@link JobResourceRequirementsBody}. 
*/ +public class JobResourceRequirementsBodyTest + extends RestRequestMarshallingTestBase<JobResourceRequirementsBody> { + @Override + protected Class<JobResourceRequirementsBody> getTestRequestClass() { + return JobResourceRequirementsBody.class; + } + + @Override + protected JobResourceRequirementsBody getTestRequestInstance() { + return new JobResourceRequirementsBody( + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(new JobVertexID(), 1, 42) + .setParallelismForJobVertex(new JobVertexID(), 1, 1337) + .build()); + } + + @Override + protected void assertOriginalEqualsToUnmarshalled( + JobResourceRequirementsBody expected, JobResourceRequirementsBody actual) { + assertThat(expected, equalsChangeJobRequestBody(actual)); + } + + private EqualityChangeJobRequestBodyMatcher equalsChangeJobRequestBody( + JobResourceRequirementsBody actual) { + return new EqualityChangeJobRequestBodyMatcher(actual); + } + + private static final class EqualityChangeJobRequestBodyMatcher + extends TypeSafeMatcher<JobResourceRequirementsBody> { + + private final JobResourceRequirementsBody actualJobResourceRequirementsBody; + + private EqualityChangeJobRequestBodyMatcher( + JobResourceRequirementsBody actualJobResourceRequirementsBody) { + this.actualJobResourceRequirementsBody = actualJobResourceRequirementsBody; + } + + @Override + protected boolean matchesSafely(JobResourceRequirementsBody jobResourceRequirementsBody) { + final Optional<JobResourceRequirements> maybeActualJobResourceRequirements = + actualJobResourceRequirementsBody.asJobResourceRequirements(); + final Optional<JobResourceRequirements> maybeJobResourceRequirements = + jobResourceRequirementsBody.asJobResourceRequirements(); + if (maybeActualJobResourceRequirements.isPresent() + ^ maybeJobResourceRequirements.isPresent()) { + return false; + } + return maybeActualJobResourceRequirements + .map(actual -> actual.equals(maybeJobResourceRequirements.get())) + .orElse(true); + } + + @Override + public void 
describeTo(Description description) {} + } +}