This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ddc97b5772a89ab7a97b4b7d0572c93704216129
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Mar 15 15:21:24 2022 +0800

    [FLINK-26641][rest] Introduce rest api to fetch job status
---
 .../shortcodes/generated/rest_v1_dispatcher.html   | 71 +++++++++++++++++++
 docs/static/generated/rest_v1_dispatcher.yml       | 34 +++++++++
 .../src/test/resources/rest_api_v1.snapshot        | 26 +++++++
 .../runtime/messages/webmonitor/JobStatusInfo.java | 70 +++++++++++++++++++
 .../runtime/rest/handler/job/JobStatusHandler.java | 59 ++++++++++++++++
 .../rest/messages/job/JobStatusInfoHeaders.java    | 75 ++++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java     | 10 +++
 .../messages/webmonitor/JobStatusInfoTest.java     | 35 ++++++++++
 .../rest/handler/job/JobStatusHandlerTest.java     | 80 ++++++++++++++++++++++
 9 files changed, 460 insertions(+)

diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
index ca5386b..188f515 100644
--- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
+++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
@@ -3507,6 +3507,77 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa
 <table class="rest-api table table-bordered">
   <tbody>
     <tr>
+      <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/status</strong></h5></td>
+    </tr>
+    <tr>
+      <td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
+      <td class="text-left">Response code: <code>200 OK</code></td>
+    </tr>
+    <tr>
+      <td colspan="2">Returns the current status of a job execution.</td>
+    </tr>
+    <tr>
+      <td colspan="2">Path parameters</td>
+    </tr>
+    <tr>
+      <td colspan="2">
+        <ul>
+<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
+        </ul>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Request</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+    <tr>
+      <td colspan="2">
+      <div class="book-expand">
+        <label>
+          <div class="book-expand-head flex justify-between">
+            <span>Response</span>
+            &nbsp;            <span>▾</span>
+          </div>
+          <input type="checkbox" class="hidden">
+          <div class="book-expand-content markdown-inner">
+          <pre>
+            <code>
+{
+  "type" : "object",
+  "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobStatusInfo",
+  "properties" : {
+    "status" : {
+      "type" : "string",
+      "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+    }
+  }
+}            </code>
+          </pre>
+          </div>
+        </label>
+      </div>
+      </td>
+    </tr>
+  </tbody>
+</table>
+<table class="rest-api table table-bordered">
+  <tbody>
+    <tr>
       <td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/stop</strong></h5></td>
     </tr>
     <tr>
diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml
index b3c3928..9b772f3 100644
--- a/docs/static/generated/rest_v1_dispatcher.yml
+++ b/docs/static/generated/rest_v1_dispatcher.yml
@@ -795,6 +795,23 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/AsynchronousOperationResult'
+  /jobs/{jobid}/status:
+    get:
+      description: Returns the current status of a job execution.
+      parameters:
+      - name: jobid
+        in: path
+        description: 32-character hexadecimal string value that identifies a job.
+        required: true
+        schema:
+          $ref: '#/components/schemas/JobID'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/JobStatusInfo'
   /jobs/{jobid}/stop:
     post:
       description: "Stops a job with a savepoint. Optionally, it can also emit a MAX_WATERMARK\
@@ -1841,6 +1858,23 @@ components:
         num_acknowledged_subtasks:
           type: integer
           format: int32
+    JobStatusInfo:
+      type: object
+      properties:
+        status:
+          type: string
+          enum:
+          - INITIALIZING
+          - CREATED
+          - RUNNING
+          - FAILING
+          - FAILED
+          - CANCELLING
+          - CANCELED
+          - FINISHED
+          - RESTARTING
+          - SUSPENDED
+          - RECONCILING
     JobAccumulator:
       type: object
     JobID:
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 23a4a71..25c5b79 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -2003,6 +2003,32 @@
       }
     }
   }, {
+    "url" : "/jobs/:jobid/status",
+    "method" : "GET",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jobid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "any"
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:messages:webmonitor:JobStatusInfo",
+      "properties" : {
+        "status" : {
+          "type" : "string",
+          "enum" : [ "INITIALIZING", "CREATED", "RUNNING", "FAILING", "FAILED", "CANCELLING", "CANCELED", "FINISHED", "RESTARTING", "SUSPENDED", "RECONCILING" ]
+        }
+      }
+    }
+  }, {
     "url" : "/jobs/:jobid/stop",
     "method" : "POST",
     "status-code" : "202 Accepted",
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java
new file mode 100644
index 0000000..9d1cf84
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The status of a specific job. */
+public class JobStatusInfo implements ResponseBody, InfoMessage {
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_NAME_STATUS = "status";
+
+    @JsonProperty(FIELD_NAME_STATUS)
+    private final JobStatus jobStatus;
+
+    @JsonCreator
+    public JobStatusInfo(@JsonProperty(FIELD_NAME_STATUS) JobStatus jobStatus) {
+        this.jobStatus = checkNotNull(jobStatus);
+    }
+
+    @JsonIgnore
+    public JobStatus getJobStatus() {
+        return jobStatus;
+    }
+
+    @Override
+    public int hashCode() {
+        return jobStatus.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        } else if (obj instanceof JobStatusInfo) {
+            JobStatusInfo that = (JobStatusInfo) obj;
+            return jobStatus.equals(that.jobStatus);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JobStatusInfo { " + jobStatus + " }";
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java
new file mode 100644
index 0000000..4ca7ed7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler for requesting job status. */
+public class JobStatusHandler
+        extends AbstractRestHandler<
+                RestfulGateway, EmptyRequestBody, JobStatusInfo, JobMessageParameters> {
+
+    public JobStatusHandler(
+            GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+            Time timeout,
+            Map<String, String> responseHeaders,
+            MessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> messageHeaders) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+    }
+
+    @Override
+    protected CompletableFuture<JobStatusInfo> handleRequest(
+            @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway)
+            throws RestHandlerException {
+        JobID jobId = request.getPathParameter(JobIDPathParameter.class);
+        return gateway.requestJobStatus(jobId, timeout).thenApply(JobStatusInfo::new);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java
new file mode 100644
index 0000000..22b76d7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobStatusInfoHeaders.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} for {@link org.apache.flink.runtime.rest.handler.job.JobStatusHandler}.
+ */
+public class JobStatusInfoHeaders
+        implements MessageHeaders<EmptyRequestBody, JobStatusInfo, JobMessageParameters> {
+    private static final JobStatusInfoHeaders INSTANCE = new JobStatusInfoHeaders();
+
+    @Override
+    public Class<EmptyRequestBody> getRequestClass() {
+        return EmptyRequestBody.class;
+    }
+
+    @Override
+    public Class<JobStatusInfo> getResponseClass() {
+        return JobStatusInfo.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public JobMessageParameters getUnresolvedMessageParameters() {
+        return new JobMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.GET;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return "/jobs/:" + JobIDPathParameter.KEY + "/status";
+    }
+
+    public static JobStatusInfoHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Returns the current status of a job execution.";
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 0178b11..3b21be9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import org.apache.flink.runtime.rest.handler.job.JobStatusHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexDetailsHandler;
@@ -120,6 +121,7 @@ import org.apache.flink.runtime.rest.messages.cluster.JobManagerStdoutFileHeader
 import org.apache.flink.runtime.rest.messages.cluster.JobManagerThreadDumpHeaders;
 import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskCurrentAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsHeaders;
 import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
@@ -293,6 +295,13 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         responseHeaders,
                         JobIdsWithStatusesOverviewHeaders.getInstance());
 
+        JobStatusHandler jobStatusHandler =
+                new JobStatusHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JobStatusInfoHeaders.getInstance());
+
         JobsOverviewHandler jobsOverviewHandler =
                 new JobsOverviewHandler(
                         leaderRetriever,
@@ -653,6 +662,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
                         clusterConfigurationHandler));
         handlers.add(Tuple2.of(dashboardConfigHandler.getMessageHeaders(), dashboardConfigHandler));
         handlers.add(Tuple2.of(jobIdsHandler.getMessageHeaders(), jobIdsHandler));
+        handlers.add(Tuple2.of(jobStatusHandler.getMessageHeaders(), jobStatusHandler));
         handlers.add(Tuple2.of(jobsOverviewHandler.getMessageHeaders(), jobsOverviewHandler));
         handlers.add(Tuple2.of(jobConfigHandler.getMessageHeaders(), jobConfigHandler));
         handlers.add(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java
new file mode 100644
index 0000000..93afec0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/JobStatusInfoTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages.webmonitor;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+/** Tests for the {@link JobStatusInfo}. */
+public class JobStatusInfoTest extends RestResponseMarshallingTestBase<JobStatusInfo> {
+    @Override
+    protected Class<JobStatusInfo> getTestResponseClass() {
+        return JobStatusInfo.class;
+    }
+
+    @Override
+    protected JobStatusInfo getTestResponseInstance() throws Exception {
+        return new JobStatusInfo(JobStatus.CREATED);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java
new file mode 100644
index 0000000..a110cce
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobStatusHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for the {@link JobStatusHandler}. */
+public class JobStatusHandlerTest extends TestLogger {
+    @Test
+    public void testRequestJobStatus() throws Exception {
+        final JobStatusHandler jobStatusHandler =
+                new JobStatusHandler(
+                        CompletableFuture::new,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        JobStatusInfoHeaders.getInstance());
+
+        final HandlerRequest<EmptyRequestBody> request = createRequest(new JobID());
+        final CompletableFuture<JobStatusInfo> response =
+                jobStatusHandler.handleRequest(
+                        request,
+                        new TestingRestfulGateway.Builder()
+                                .setRequestJobStatusFunction(
+                                        ignored ->
+                                                CompletableFuture.completedFuture(
+                                                        JobStatus.INITIALIZING))
+                                .build());
+
+        assertEquals(response.get().getJobStatus(), JobStatus.INITIALIZING);
+    }
+
+    private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+            throws HandlerRequestException {
+        final Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new JobMessageParameters(),
+                pathParameters,
+                Collections.emptyMap(),
+                Collections.emptyList());
+    }
+}

Reply via email to