This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a539eb2e84c [FLINK-31469] Allow setting JobResourceRequirements 
through REST API.
a539eb2e84c is described below

commit a539eb2e84cccb21f046902026d4178d864fba7d
Author: David Moravek <d...@apache.org>
AuthorDate: Mon Feb 27 19:08:03 2023 +0100

    [FLINK-31469] Allow setting JobResourceRequirements through REST API.
    
    Signed-off-by: David Moravek <d...@apache.org>
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 126 +++++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       |  54 +++++++++
 .../src/test/resources/rest_api_v1.snapshot        |  50 +++++++-
 .../runtime/dispatcher/DispatcherRestEndpoint.java |  16 +++
 .../job/JobResourceRequirementsHandler.java        |  65 +++++++++++
 .../job/JobResourceRequirementsUpdateHandler.java  |  81 +++++++++++++
 .../messages/job/JobResourceRequirementsBody.java  |  89 +++++++++++++++
 .../job/JobResourceRequirementsHeaders.java        |  73 ++++++++++++
 .../job/JobResourcesRequirementsUpdateHeaders.java |  78 +++++++++++++
 .../job/JobResourceRequirementsBodyTest.java       |  88 ++++++++++++++
 10 files changed, 716 insertions(+), 4 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html 
b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 922810b9c76..70ef8a1c5bc 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3086,6 +3086,132 @@ Using 'curl' you can upload a jar via 'curl -X POST -H 
"Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Request details on the job's resource requirements.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody",
+  "additionalProperties" : {
+    "type" : "object",
+    "id" : 
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements",
+    "properties" : {
+      "parallelism" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism",
+        "properties" : {
+          "lowerBound" : {
+            "type" : "integer"
+          },
+          "upperBound" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" 
colspan="2"><h5><strong>/jobs/:jobid/resource-requirements</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>PUT</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Request to update job's resource requirements.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies 
a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Request</summary>
+          <pre><code>{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody",
+  "additionalProperties" : {
+    "type" : "object",
+    "id" : 
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements",
+    "properties" : {
+      "parallelism" : {
+        "type" : "object",
+        "id" : 
"urn:jsonschema:org:apache:flink:runtime:jobmaster:JobVertexResourceRequirements:Parallelism",
+        "properties" : {
+          "lowerBound" : {
+            "type" : "integer"
+          },
+          "upperBound" : {
+            "type" : "integer"
+          }
+        }
+      }
+    }
+  }
+}</code></pre>
+        </label>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <label>
+          <details>
+          <summary>Response</summary>
+          <pre><code>{}</code></pre>
+        </label>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml 
b/docs/static/generated/rest_v1_dispatcher.yml
index c8ece02b72a..71c00a0ddec 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -852,6 +852,42 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/AsynchronousOperationResult'
+  /jobs/{jobid}/resource-requirements:
+    get:
+      description: Request details on the job's resource requirements.
+      operationId: getJobResourceRequirements
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/JobResourceRequirementsBody'
+    put:
+      description: Request to update job's resource requirements.
+      operationId: updateJobResourceRequirements
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a 
job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/JobResourceRequirementsBody'
+      responses:
+        "200":
+          description: The request was successful.
   /jobs/{jobid}/savepoints:
     post:
       description: "Triggers a savepoint, and optionally cancels the job 
afterwards.\
@@ -2344,6 +2380,10 @@ components:
       properties:
         plan:
           $ref: '#/components/schemas/RawJson'
+    JobResourceRequirementsBody:
+      type: object
+      additionalProperties:
+        $ref: '#/components/schemas/JobVertexResourceRequirements'
     JobResult:
       type: object
       properties:
@@ -2447,6 +2487,11 @@ components:
     JobVertexID:
       pattern: "[0-9a-f]{32}"
       type: string
+    JobVertexResourceRequirements:
+      type: object
+      properties:
+        parallelism:
+          $ref: '#/components/schemas/Parallelism'
     JobVertexTaskManagersInfo:
       type: object
       properties:
@@ -2530,6 +2575,15 @@ components:
         value:
           type: integer
           format: int32
+    Parallelism:
+      type: object
+      properties:
+        lowerBound:
+          type: integer
+          format: int32
+        upperBound:
+          type: integer
+          format: int32
     PendingCheckpointStatistics:
       type: object
       allOf:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index d4b2937e1c6..25d20a764b3 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2300,6 +2300,48 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/resource-requirements",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody"
+    }
+  }, {
+    "url" : "/jobs/:jobid/resource-requirements",
+    "method" : "PUT",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:JobResourceRequirementsBody"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+    }
   }, {
     "url" : "/jobs/:jobid/savepoints",
     "method" : "POST",
@@ -2782,14 +2824,14 @@
     },
     "response" : {
       "type" : "object",
-      "id" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph",
+      "id" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph",
       "properties" : {
         "endTimestamp" : {
           "type" : "integer"
         },
         "data" : {
           "type" : "object",
-          "id" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node",
+          "id" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node",
           "properties" : {
             "name" : {
               "type" : "string"
@@ -2801,7 +2843,7 @@
               "type" : "array",
               "items" : {
                 "type" : "object",
-                "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:JobVertexFlameGraph:Node"
+                "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:threadinfo:VertexFlameGraph:Node"
               }
             }
           }
@@ -4063,4 +4105,4 @@
       }
     }
   } ]
-}
+}
\ No newline at end of file
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index e2414378702..fc2d24b0dd9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import 
org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsHandler;
+import 
org.apache.flink.runtime.rest.handler.job.JobResourceRequirementsUpdateHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
@@ -96,6 +98,20 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
 
         handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
 
+        final JobResourceRequirementsHandler jobResourceRequirementsHandler =
+                new JobResourceRequirementsHandler(leaderRetriever, timeout, 
responseHeaders);
+        handlers.add(
+                Tuple2.of(
+                        jobResourceRequirementsHandler.getMessageHeaders(),
+                        jobResourceRequirementsHandler));
+
+        final JobResourceRequirementsUpdateHandler 
jobResourceRequirementsUpdateHandler =
+                new JobResourceRequirementsUpdateHandler(leaderRetriever, 
timeout, responseHeaders);
+        handlers.add(
+                Tuple2.of(
+                        
jobResourceRequirementsUpdateHandler.getMessageHeaders(),
+                        jobResourceRequirementsUpdateHandler));
+
         return handlers;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
new file mode 100644
index 00000000000..931e8039763
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import 
org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler for reading current {@link 
org.apache.flink.runtime.jobgraph.JobResourceRequirements
+ * resource requirements} of a given job.
+ */
+public class JobResourceRequirementsHandler
+        extends AbstractRestHandler<
+                DispatcherGateway,
+                EmptyRequestBody,
+                JobResourceRequirementsBody,
+                JobMessageParameters> {
+
+    public JobResourceRequirementsHandler(
+            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders) {
+        super(leaderRetriever, timeout, responseHeaders, 
JobResourceRequirementsHeaders.INSTANCE);
+    }
+
+    @Override
+    protected CompletableFuture<JobResourceRequirementsBody> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull 
DispatcherGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        return gateway.requestJobResourceRequirements(jobId)
+                .thenApply(JobResourceRequirementsBody::new);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
new file mode 100644
index 00000000000..d762dd5baa2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobResourceRequirementsUpdateHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
+import 
org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler for updating {@link 
org.apache.flink.runtime.jobgraph.JobResourceRequirements
+ * resource requirements} of a given job.
+ */
+public class JobResourceRequirementsUpdateHandler
+        extends AbstractRestHandler<
+                DispatcherGateway,
+                JobResourceRequirementsBody,
+                EmptyResponseBody,
+                JobMessageParameters> {
+
+    public JobResourceRequirementsUpdateHandler(
+            GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                JobResourcesRequirementsUpdateHeaders.INSTANCE);
+    }
+
+    @Override
+    protected CompletableFuture<EmptyResponseBody> handleRequest(
+            @Nonnull HandlerRequest<JobResourceRequirementsBody> request,
+            @Nonnull DispatcherGateway gateway)
+            throws RestHandlerException {
+        final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        final Optional<JobResourceRequirements> maybeJobResourceRequirements =
+                request.getRequestBody().asJobResourceRequirements();
+        if (maybeJobResourceRequirements.isPresent()) {
+            return gateway.updateJobResourceRequirements(jobId, 
maybeJobResourceRequirements.get())
+                    .thenApply(ignored -> EmptyResponseBody.getInstance());
+        }
+        throw new RestHandlerException(
+                "Request body does not specify resource requirements.",
+                HttpResponseStatus.BAD_REQUEST);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
new file mode 100644
index 00000000000..37227a1bf77
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBody.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.annotation.docs.FlinkJsonSchema;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeyDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDKeySerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnyGetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAnySetter;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/** Request and response body carrying the resource requirements of a job's vertices. */
+@FlinkJsonSchema.AdditionalFields(type = JobVertexResourceRequirements.class)
+public class JobResourceRequirementsBody implements RequestBody, ResponseBody {
+
+    @JsonAnySetter
+    @JsonAnyGetter
+    @JsonSerialize(keyUsing = JobVertexIDKeySerializer.class)
+    @JsonDeserialize(keyUsing = JobVertexIDKeyDeserializer.class)
+    private final Map<JobVertexID, JobVertexResourceRequirements> 
jobVertexResourceRequirements;
+
+    public JobResourceRequirementsBody() {
+        this(null);
+    }
+
+    public JobResourceRequirementsBody(@Nullable JobResourceRequirements 
jobResourceRequirements) {
+        if (jobResourceRequirements != null) {
+            this.jobVertexResourceRequirements = 
jobResourceRequirements.getJobVertexParallelisms();
+        } else {
+            this.jobVertexResourceRequirements = new HashMap<>();
+        }
+    }
+
+    @JsonIgnore
+    public Optional<JobResourceRequirements> asJobResourceRequirements() {
+        if (jobVertexResourceRequirements.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(new 
JobResourceRequirements(jobVertexResourceRequirements));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final JobResourceRequirementsBody that = (JobResourceRequirementsBody) 
o;
+        return Objects.equals(jobVertexResourceRequirements, 
that.jobVertexResourceRequirements);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(jobVertexResourceRequirements);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
new file mode 100644
index 00000000000..5bb17c87cc6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsHeaders.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for the REST request to get details on a job's resource requirements. */
+public class JobResourceRequirementsHeaders
+        implements RuntimeMessageHeaders<
+                EmptyRequestBody, JobResourceRequirementsBody, 
JobMessageParameters> {
+
+    public static final JobResourceRequirementsHeaders INSTANCE =
+            new JobResourceRequirementsHeaders();
+
+    private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + 
"/resource-requirements";
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Class<JobResourceRequirementsBody> getResponseClass() {
+        return JobResourceRequirementsBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Request details on the job's resource requirements.";
+    }
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
new file mode 100644
index 00000000000..71056dba029
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobResourcesRequirementsUpdateHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** Headers for the REST request to update a job's resource requirements. */
+public class JobResourcesRequirementsUpdateHeaders
+        implements RuntimeMessageHeaders<
+                JobResourceRequirementsBody, EmptyResponseBody, 
JobMessageParameters> {
+
+    public static final JobResourcesRequirementsUpdateHeaders INSTANCE =
+            new JobResourcesRequirementsUpdateHeaders();
+
+    private static final String URL = "/jobs/:" + JobIDPathParameter.KEY + 
"/resource-requirements";
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.PUT;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Class<EmptyResponseBody> getResponseClass() {
+        return EmptyResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Request to update job's resource requirements.";
+    }
+
+    @Override
+    public Class<JobResourceRequirementsBody> getRequestClass() {
+        return JobResourceRequirementsBody.class;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    @Override
+    public String operationId() {
+        return "updateJobResourceRequirements";
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
new file mode 100644
index 00000000000..cfb2ac8d522
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobResourceRequirementsBodyTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+import java.util.Optional;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/** Tests for the marshalling of {@link JobResourceRequirementsBody}. */
+public class JobResourceRequirementsBodyTest
+        extends RestRequestMarshallingTestBase<JobResourceRequirementsBody> {
+    @Override
+    protected Class<JobResourceRequirementsBody> getTestRequestClass() {
+        return JobResourceRequirementsBody.class;
+    }
+
+    @Override
+    protected JobResourceRequirementsBody getTestRequestInstance() {
+        return new JobResourceRequirementsBody(
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(new JobVertexID(), 1, 42)
+                        .setParallelismForJobVertex(new JobVertexID(), 1, 1337)
+                        .build());
+    }
+
+    @Override
+    protected void assertOriginalEqualsToUnmarshalled(
+            JobResourceRequirementsBody expected, JobResourceRequirementsBody 
actual) {
+        assertThat(expected, equalsChangeJobRequestBody(actual));
+    }
+
+    private EqualityChangeJobRequestBodyMatcher equalsChangeJobRequestBody(
+            JobResourceRequirementsBody actual) {
+        return new EqualityChangeJobRequestBodyMatcher(actual);
+    }
+
+    private static final class EqualityChangeJobRequestBodyMatcher
+            extends TypeSafeMatcher<JobResourceRequirementsBody> {
+
+        private final JobResourceRequirementsBody 
actualJobResourceRequirementsBody;
+
+        private EqualityChangeJobRequestBodyMatcher(
+                JobResourceRequirementsBody actualJobResourceRequirementsBody) 
{
+            this.actualJobResourceRequirementsBody = 
actualJobResourceRequirementsBody;
+        }
+
+        @Override
+        protected boolean matchesSafely(JobResourceRequirementsBody 
jobResourceRequirementsBody) {
+            final Optional<JobResourceRequirements> 
maybeActualJobResourceRequirements =
+                    
actualJobResourceRequirementsBody.asJobResourceRequirements();
+            final Optional<JobResourceRequirements> 
maybeJobResourceRequirements =
+                    jobResourceRequirementsBody.asJobResourceRequirements();
+            if (maybeActualJobResourceRequirements.isPresent()
+                    ^ maybeJobResourceRequirements.isPresent()) {
+                return false;
+            }
+            return maybeActualJobResourceRequirements
+                    .map(actual -> 
actual.equals(maybeJobResourceRequirements.get()))
+                    .orElse(true);
+        }
+
+        @Override
+        public void describeTo(Description description) {}
+    }
+}


Reply via email to