This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ddc97b5772a89ab7a97b4b7d0572c93704216129 Author: Yangze Guo <karma...@gmail.com> AuthorDate: Tue Mar 15 15:21:24 2022 +0800 [FLINK-26641][rest] Introduce rest api to fetch job status --- .../shortcodes/generated/rest_v1_dispatcher.html | 71 +++++++++++++++++++ docs/static/generated/rest_v1_dispatcher.yml | 34 +++++++++ .../src/test/resources/rest_api_v1.snapshot | 26 +++++++ .../runtime/messages/webmonitor/JobStatusInfo.java | 70 +++++++++++++++++++ .../runtime/rest/handler/job/JobStatusHandler.java | 59 ++++++++++++++++ .../rest/messages/job/JobStatusInfoHeaders.java | 75 ++++++++++++++++++++ .../runtime/webmonitor/WebMonitorEndpoint.java | 10 +++ .../messages/webmonitor/JobStatusInfoTest.java | 35 ++++++++++ .../rest/handler/job/JobStatusHandlerTest.java | 80 ++++++++++++++++++++++ 9 files changed, 460 insertions(+) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index ca5386b..188f515 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3507,6 +3507,77 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa <table class="rest-api table table-bordered"> <tbody> <tr> + <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/status</strong></h5></td> + </tr> + <tr> + <td class="text-left" style="width: 20%">Verb: <code>GET</code></td> + <td class="text-left">Response code: <code>200 OK</code></td> + </tr> + <tr> + <td colspan="2">Returns the current status of a job execution.</td> + </tr> + <tr> + <td colspan="2">Path parameters</td> + </tr> + <tr> + <td colspan="2"> + <ul> +<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li> + </ul> + </td> + </tr> + <tr> + <td colspan="2"> + <div class="book-expand"> + <label> + <div class="book-expand-head flex justify-between"> + <span>Request</span> + <span>▾</span> + </div> + <input type="checkbox" class="hidden"> + <div class="book-expand-content markdown-inner"> + <pre> + <code> +{} </code> + </pre> + </div> + </label> + </div> + </td> + </tr> + <tr> + <td colspan="2"> + <div class="book-expand"> + <label> + <div class="book-expand-head flex justify-between"> + <span>Response</span> + <span>▾</span> + </div> + <input type="checkbox" class="hidden"> + <div class="book-expand-content markdown-inner"> + <pre> + <code> +{ + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobStatusInfo", + "properties" : { + "status" : { + "type" : "string", + "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ] + } + } +} </code> + </pre> + </div> + </label> + </div> + </td> + </tr> + </tbody> +</table> +<table class="rest-api table table-bordered"> + <tbody> + <tr> <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/stop</strong></h5></td> </tr> <tr> diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index b3c3928..9b772f3 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -795,6 +795,23 @@ paths: application/json: schema: $ref: '#/components/schemas/AsynchronousOperationResult' + /jobs/{jobid}/status: + get: + description: Returns the current status of a job execution. + parameters: + - name: jobid + in: path + description: 32-character hexadecimal string value that identifies a job. + required: true + schema: + $ref: '#/components/schemas/JobID' + responses: + "200": + description: The request was successful. + content: + application/json: + schema: + $ref: '#/components/schemas/JobStatusInfo' /jobs/{jobid}/stop: post: description: "Stops a job with a savepoint. Optionally, it can also emit a MAX_WATERMARK\ @@ -1841,6 +1858,23 @@ components: num_acknowledged_subtasks: type: integer format: int32 + JobStatusInfo: + type: object + properties: + status: + type: string + enum: + - INITIALIZING + - CREATED + - RUNNING + - FAILING + - FAILED + - CANCELLING + - CANCELED + - FINISHED + - RESTARTING + - SUSPENDED + - RECONCILING JobAccumulator: type: object JobID: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 23a4a71..25c5b79 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -2003,6 +2003,32 @@ } } }, { + "url" : "/jobs/:jobid/status", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "jobid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobStatusInfo", + "properties" : { + "status" : { + "type" : "string", + "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ] + } + } + } + }, { "url" : "/jobs/:jobid/stop", "method" : "POST", "status-code" : "202 Accepted", diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java new file mode 100644 index 0000000..9d1cf84 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages.webmonitor; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The status of a specific job. */ +public class JobStatusInfo implements ResponseBody, InfoMessage { + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_STATUS = "status"; + + @JsonProperty(FIELD_NAME_STATUS) + private final JobStatus jobStatus; + + @JsonCreator + public JobStatusInfo(@JsonProperty(FIELD_NAME_STATUS) JobStatus jobStatus) { + this.jobStatus = checkNotNull(jobStatus); + } + + @JsonIgnore + public JobStatus getJobStatus() { + return jobStatus; + } + + @Override + public int hashCode() { + return jobStatus.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj instanceof JobStatusInfo) { + JobStatusInfo that = (JobStatusInfo) obj; + return jobStatus.equals(that.jobStatus); + } else { + return false; + } + } + + @Override + public String toString() { + return "JobStatusInfo { " + jobStatus + " }"; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java new file mode 100644 index 0000000..4ca7ed7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler for requesting job status. */ +public class JobStatusHandler + extends AbstractRestHandler< + RestfulGateway, EmptyRequestBody, JobStatusInfo, JobMessageParameters> { + + public JobStatusHandler( + GatewayRetriever<? extends RestfulGateway> leaderRetriever, + Time timeout, + Map<String, String> responseHeaders, + MessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> messageHeaders) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture<JobStatusInfo> handleRequest( + @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) + throws RestHandlerException { + JobID jobId = request.getPathParameter(JobIDPathParameter.class); + return gateway.requestJobStatus(jobId, timeout).thenApply(JobStatusInfo::new); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java new file mode 100644 index 0000000..22b76d7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +/** + * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.JobStatusHandler}. + */ +public class JobStatusInfoHeaders + implements MessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> { + private static final JobStatusInfoHeaders INSTANCE = new JobStatusInfoHeaders(); + + @Override + public Class<EmptyRequestBody> getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class<JobStatusInfo> getResponseClass() { + return JobStatusInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public JobMessageParameters getUnresolvedMessageParameters() { + return new JobMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/jobs/:" + JobIDPathParameter.KEY + "/status"; + } + + public static JobStatusInfoHeaders getInstance() { + return INSTANCE; + } + + @Override + public String getDescription() { + return "Returns the current status of a job execution."; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 0178b11..3b21be9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler; import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler; import org.apache.flink.runtime.rest.handler.job.JobIdsHandler; import org.apache.flink.runtime.rest.handler.job.JobPlanHandler; +import org.apache.flink.runtime.rest.handler.job.JobStatusHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler; import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler; @@ -120,6 +121,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerStdoutFileHeader import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders; import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; +import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders; @@ -293,6 +295,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp responseHeaders, JobIdsWithStatusesOverviewHeaders.getInstance()); + JobStatusHandler jobStatusHandler = + new JobStatusHandler( + leaderRetriever, + timeout, + responseHeaders, + JobStatusInfoHeaders.getInstance()); + JobsOverviewHandler jobsOverviewHandler = new JobsOverviewHandler( leaderRetriever, @@ -653,6 +662,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp clusterConfigurationHandler)); handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler)); handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler)); + handlers.add(Tuple2.of(jobStatusHandler.getMessageHeaders(), jobStatusHandler)); handlers.add(Tuple2.of(jobsOverviewHandler.getMessageHeaders(), jobsOverviewHandler)); handlers.add(Tuple2.of(jobConfigHandler.getMessageHeaders(), jobConfigHandler)); handlers.add( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java new file mode 100644 index 0000000..93afec0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages.webmonitor; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; + +/** Tests for the {@link JobStatusInfo}. */ +public class JobStatusInfoTest extends RestResponseMarshallingTestBase<JobStatusInfo> { + @Override + protected Class<JobStatusInfo> getTestResponseClass() { + return JobStatusInfo.class; + } + + @Override + protected JobStatusInfo getTestResponseInstance() throws Exception { + return new JobStatusInfo(JobStatus.CREATED); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java new file mode 100644 index 0000000..a110cce --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** Test for the {@link JobStatusHandler}. */ +public class JobStatusHandlerTest extends TestLogger { + @Test + public void testRequestJobStatus() throws Exception { + final JobStatusHandler jobStatusHandler = + new JobStatusHandler( + CompletableFuture::new, + TestingUtils.TIMEOUT, + Collections.emptyMap(), + JobStatusInfoHeaders.getInstance()); + + final HandlerRequest<EmptyRequestBody> request = createRequest(new JobID()); + final CompletableFuture<JobStatusInfo> response = + jobStatusHandler.handleRequest( + request, + new TestingRestfulGateway.Builder() + .setRequestJobStatusFunction( + ignored -> + CompletableFuture.completedFuture( + JobStatus.INITIALIZING)) + .build()); + + assertEquals(response.get().getJobStatus(), JobStatus.INITIALIZING); + } + + private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId) + throws HandlerRequestException { + final Map<String, String> pathParameters = new HashMap<>(); + pathParameters.put(JobIDPathParameter.KEY, jobId.toString()); + + return HandlerRequest.resolveParametersAndCreate( + EmptyRequestBody.getInstance(), + new JobMessageParameters(), + pathParameters, + Collections.emptyMap(), + Collections.emptyList()); + } +}