This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ab2536b968ad26aa0b672217e68a0450a47f5fd Author: Yangze Guo <karma...@gmail.com> AuthorDate: Mon Jul 4 13:44:10 2022 +0800 [FLINK-28311][rest] Introduce JobManagerJobConfigurationHandler --- .../shortcodes/generated/rest_v1_dispatcher.html | 76 +++++++++++++++++++ docs/static/generated/rest_v1_dispatcher.yml | 23 ++++++ .../src/test/resources/rest_api_v1.snapshot | 31 ++++++++ .../job/JobManagerJobConfigurationHandler.java | 86 ++++++++++++++++++++++ .../job/JobManagerJobConfigurationHeaders.java | 80 ++++++++++++++++++++ .../runtime/webmonitor/WebMonitorEndpoint.java | 14 ++++ .../job/JobManagerJobConfigurationHandlerTest.java | 68 +++++++++++++++++ 7 files changed, 378 insertions(+) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 0846f9d1c1d..64ebceeae89 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3021,6 +3021,82 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa </tr> </tbody> </table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/jobmanager/config</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Returns the jobmanager's configuration of a specific job.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <div class="book-expand"> + <label> + <div class="book-expand-head flex justify-between"> + <span>Request</span> + <span>▾</span> + </div> + <input type="checkbox" class="hidden"> + <div class="book-expand-content 
markdown-inner"> + <pre> + <code> +{} </code> + </pre> + </div> + </label> + </div> + </td> + </tr> + <tr> + <td colspan="2"> + <div class="book-expand"> + <label> + <div class="book-expand-head flex justify-between"> + <span>Response</span> + <span>▾</span> + </div> + <input type="checkbox" class="hidden"> + <div class="book-expand-content markdown-inner"> + <pre> + <code> +{ + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", + "properties" : { + "key" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } +} </code> + </pre> + </div> + </label> + </div> + </td> + </tr> + </tbody> +</table> <table class="rest-api table table-bordered"> <tbody> <tr> diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 3f9d37dace0..ecb6b964f33 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -636,6 +636,29 @@ paths: application/json: schema: $ref: '#/components/schemas/JobExecutionResultResponseBody' + /jobs/{jobid}/jobmanager/config: + get: + description: Returns the jobmanager's configuration of a specific job. + operationId: getJobManagerJobConfiguration + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + type: array + properties: + empty: + type: boolean + items: + $ref: '#/components/schemas/ConfigurationInfoEntry' /jobs/{jobid}/metrics: get: description: Provides access to job metrics. 
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 37a2482703c..87aedec6d18 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1887,6 +1887,37 @@ } } } + }, { + "url" : "/jobs/:jobid/jobmanager/config", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry", + "properties" : { + "key" : { + "type" : "string" + }, + "value" : { + "type" : "string" + } + } + } + } }, { "url" : "/jobs/:jobid/metrics", "method" : "GET", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java new file mode 100644 index 00000000000..2e0c2825054 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandler.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.ConfigurationInfo; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; +import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler which serves the jobmanager's configuration of a specific job. 
*/ +public class JobManagerJobConfigurationHandler + extends AbstractRestHandler< + RestfulGateway, EmptyRequestBody, ConfigurationInfo, JobMessageParameters> + implements JsonArchivist { + /** Configuration entries converted once in the constructor; returned by every request and reused when archiving. */ private final ConfigurationInfo jobConfig; + + /** Creates the handler: forwards leaderRetriever, timeout, responseHeaders and messageHeaders to the superclass, then null-checks configuration and converts it with ConfigurationInfo.from. */ public JobManagerJobConfigurationHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> + messageHeaders, + Configuration configuration) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + + Preconditions.checkNotNull(configuration); + this.jobConfig = ConfigurationInfo.from(configuration); + } + + /** Returns an already completed future holding the precomputed jobConfig; neither request nor gateway is read in this method. */ @Override + protected CompletableFuture<ConfigurationInfo> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) + throws RestHandlerException { + return CompletableFuture.completedFuture(jobConfig); + } + + /** Produces a single-element collection: one ArchivedJson pairing jobConfig with the endpoint URL in which the colon-prefixed JobIDPathParameter.KEY placeholder is replaced by the job id of executionGraphInfo. */ @Override + public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo) + throws IOException { + return Collections.singletonList( + new ArchivedJson( + JobManagerJobConfigurationHeaders.getInstance() + .getTargetRestEndpointURL() + .replace( + ':' + JobIDPathParameter.KEY, + executionGraphInfo.getJobId().toString()), + jobConfig)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java new file mode 100644 index 00000000000..ec8297e2b8b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobManagerJobConfigurationHeaders.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.ConfigurationInfo; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * Message headers for the {@link + * org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler}. 
+ */ +public class JobManagerJobConfigurationHeaders + implements MessageHeaders<EmptyRequestBody, ConfigurationInfo, JobMessageParameters> { + /** Single shared instance handed out by getInstance(). */ private static final JobManagerJobConfigurationHeaders INSTANCE = + new JobManagerJobConfigurationHeaders(); + + /** URL template returned by getTargetRestEndpointURL(). */ public static final String JOBMANAGER_JOB_CONFIG_REST_PATH = "/jobs/:jobid/jobmanager/config"; + + /** Private so that instances are only obtained through getInstance(). */ private JobManagerJobConfigurationHeaders() {} + + /** Request body type: EmptyRequestBody. */ @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + /** HTTP method of the endpoint: GET. */ @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + /** Returns JOBMANAGER_JOB_CONFIG_REST_PATH. */ @Override + public String getTargetRestEndpointURL() { + return JOBMANAGER_JOB_CONFIG_REST_PATH; + } + + /** Response body type: ConfigurationInfo. */ @Override + public Class<ConfigurationInfo> getResponseClass() { + return ConfigurationInfo.class; + } + + /** Status code reported for a response: HttpResponseStatus.OK. */ @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + /** Returns a newly created JobMessageParameters on every call. */ @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + /** Returns the shared INSTANCE. */ public static JobManagerJobConfigurationHeaders getInstance() { + return INSTANCE; + } + + /** Human-readable description of the endpoint. */ @Override + public String getDescription() { + return "Returns the jobmanager's configuration of a specific job."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index a77c3d5217a..b3fead6af48 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobDetailsHandler; import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler; import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; +import 
org.apache.flink.runtime.rest.handler.job.JobManagerJobConfigurationHandler; import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; import org.apache.flink.runtime.rest.handler.job.JobStatusHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; @@ -123,6 +124,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerStdoutFileHeader import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders; import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders; @@ -335,6 +337,14 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp executionGraphCache, executor); + JobManagerJobConfigurationHandler jobManagerJobConfigurationHandler = + new JobManagerJobConfigurationHandler( + leaderRetriever, + timeout, + responseHeaders, + JobManagerJobConfigurationHeaders.getInstance(), + clusterConfiguration); + CheckpointConfigHandler checkpointConfigHandler = new CheckpointConfigHandler( leaderRetriever, @@ -762,6 +772,10 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp Tuple2.of( jobVertexBackPressureHandler.getMessageHeaders(), jobVertexBackPressureHandler)); + handlers.add( + Tuple2.of( + jobManagerJobConfigurationHandler.getMessageHeaders(), + jobManagerJobConfigurationHandler)); final AbstractRestHandler<?, ?, ?, ?> jobVertexFlameGraphHandler; if (clusterConfiguration.get(RestOptions.ENABLE_FLAMEGRAPH)) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java new file mode 100644 index 00000000000..855bf4724a6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobManagerJobConfigurationHandlerTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.ConfigurationInfo; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Test for the {@link JobManagerJobConfigurationHandler}. */ +public class JobManagerJobConfigurationHandlerTest extends TestLogger { + + /** Builds a handler from a Configuration containing only JobManagerOptions.ADDRESS, calls handleRequest directly and checks that the first returned entry carries that key and value. */ @Test + public void testRequestConfiguration() throws Exception { + /* Arrange: a configuration with exactly one explicitly set option. */ final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.ADDRESS, "address"); + + final JobManagerJobConfigurationHandler handler = + new JobManagerJobConfigurationHandler( + /* first constructor argument is a stub supplying null; the gateway is passed to handleRequest explicitly below */ () -> null, + TestingUtils.TIMEOUT, + Collections.emptyMap(), + JobManagerJobConfigurationHeaders.getInstance(), + configuration); + + /* Act: invoke the handler with an empty request body and empty parameter maps, then wait on the returned future. */ final ConfigurationInfo configurationInfo = + handler.handleRequest( + HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new JobMessageParameters(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyList()), + new TestingRestfulGateway.Builder().build()) + .get(); + + /* Assert: only element 0 is inspected; the size of the result is not checked. */ assertEquals(JobManagerOptions.ADDRESS.key(), configurationInfo.get(0).getKey()); + assertEquals("address", configurationInfo.get(0).getValue()); + } +}