This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ab2536b968ad26aa0b672217e68a0450a47f5fd
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Mon Jul 4 13:44:10 2022 +0800

    [FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 76 +++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       | 23 ++++++
 .../src/test/resources/rest_api_v1.snapshot        | 31 ++++++++
 .../job/JobManagerJobConfigurationHandler.java     | 86 ++++++++++++++++++++++
 .../job/JobManagerJobConfigurationHeaders.java     | 80 ++++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 14 ++++
 .../job/JobManagerJobConfigurationHandlerTest.java | 68 +++++++++++++++++
 7 files changed, 378 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index 0846f9d1c1d..64ebceeae89 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3021,6 +3021,82 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
     </tr>
   </tbody>
 </table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
+      <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/config</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the jobmanager's configuration of a specific job.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Request</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Response</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{
+  "type" : "array",
+  "items" : {
+    "type" : "object",
+    "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+    "properties" : {
+      "key" : {
+        "type" : "string"
+      },
+      "value" : {
+        "type" : "string"
+      }
+    }
+  }
+}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index 3f9d37dace0..ecb6b964f33 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -636,6 +636,29 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/JobExecutionResultResponseBody'
+  /jobs/{jobid}/jobmanager/config:
+    get:
+      description: Returns the jobmanager's configuration of a specific job.
+      operationId: getJobManagerJobConfiguration
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                type: array
+                properties:
+                  empty:
+                    type: boolean
+                items:
+                  $ref: '#/components/schemas/ConfigurationInfoEntry'
   /jobs/{jobid}/metrics:
     get:
       description: Provides access to job metrics.
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 37a2482703c..87aedec6d18 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1887,6 +1887,37 @@
         }
       }
     }
+  }, {
+    "url" : "/jobs/:jobid/jobmanager/config",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "array",
+      "items" : {
+        "type" : "object",
+        "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
+        "properties" : {
+          "key" : {
+            "type" : "string"
+          },
+          "value" : {
+            "type" : "string"
+          }
+        }
+      }
+    }
   }, {
     "url" : "/jobs/:jobid/metrics",
     "method" : "GET",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java
new file mode 100644
index 00000000000..2e0c2825054
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler which serves the jobmanager's configuration of a specific job. */
+public class JobManagerJobConfigurationHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
+        implements JsonArchivist {
+    private final ConfigurationInfo jobConfig;
+
+    public JobManagerJobConfigurationHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters>
+                    messageHeaders,
+            Configuration configuration) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        Preconditions.checkNotNull(configuration);
+        this.jobConfig = ConfigurationInfo.from(configuration);
+    }
+
+    @Override
+    protected CompletableFuture<ConfigurationInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        return CompletableFuture.completedFuture(jobConfig);
+    }
+
+    @Override
+    public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
+            throws IOException {
+        return Collections.singletonList(
+                new ArchivedJson(
+                        JobManagerJobConfigurationHeaders.getInstance()
+                                .getTargetRestEndpointURL()
+                                .replace(
+                                        ':' + JobIDPathParameter.KEY,
+                                        executionGraphInfo.getJobId().toString()),
+                        jobConfig));
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
new file mode 100644
index 00000000000..ec8297e2b8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link
+ * org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler}.
+ */
+public class JobManagerJobConfigurationHeaders
+        implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> {
+    private static final JobManagerJobConfigurationHeaders INSTANCE =
+            new JobManagerJobConfigurationHeaders();
+
+    public static final String JOBMANAGER_JOB_CONFIG_REST_PATH = "/jobs/:jobid/jobmanager/config";
+
+    private JobManagerJobConfigurationHeaders() {}
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return JOBMANAGER_JOB_CONFIG_REST_PATH;
+    }
+
+    @Override
+    public Class<ConfigurationInfo> getResponseClass() {
+        return ConfigurationInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    public static JobManagerJobConfigurationHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the jobmanager's configuration of a specific job.";
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index a77c3d5217a..b3fead6af48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobStatusHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
@@ -123,6 +124,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerStdoutFileHeader
 import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders;
 import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
@@ -335,6 +337,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         executionGraphCache,
                         executor);
 
+        JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler =
+                new JobManagerJobConfigurationHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobManagerJobConfigurationHeaders.getInstance(),
+                        clusterConfiguration);
+
         CheckpointConfigHandler checkpointConfigHandler =
                 new CheckpointConfigHandler(
                         leaderRetriever,
@@ -762,6 +772,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                 Tuple2.of(
                         jobVertexBackPressureHandler.getMessageHeaders(),
                         jobVertexBackPressureHandler));
+        handlers.add(
+                Tuple2.of(
+                        jobManagerJobConfigurationHandler.getMessageHeaders(),
+                        jobManagerJobConfigurationHandler));
 
         final AbstractRestHandler<?, ?, ?, ?> jobVertexFlameGraphHandler;
         if (clusterConfiguration.get(RestOptions.ENABLE_FLAMEGRAPH)) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java
new file mode 100644
index 00000000000..855bf4724a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test for the {@link JobManagerJobConfigurationHandler}. */
+public class JobManagerJobConfigurationHandlerTest extends TestLogger {
+
+    @Test
+    public void testRequestConfiguration() throws Exception {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.ADDRESS, "address");
+
+        final JobManagerJobConfigurationHandler handler =
+                new JobManagerJobConfigurationHandler(
+                        () -> null,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        JobManagerJobConfigurationHeaders.getInstance(),
+                        configuration);
+
+        final ConfigurationInfo configurationInfo =
+                handler.handleRequest(
+                                HandlerRequest.resolveParametersAndCreate(
+                                        EmptyRequestBody.getInstance(),
+                                        new JobMessageParameters(),
+                                        Collections.emptyMap(),
+                                        Collections.emptyMap(),
+                                        Collections.emptyList()),
+                                new TestingRestfulGateway.Builder().build())
+                        .get();
+
+        assertEquals(JobManagerOptions.ADDRESS.key(), configurationInfo.get(0).getKey());
+        assertEquals("address", configurationInfo.get(0).getValue());
+    }
+}

Reply via email to