This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9851ad03c1b9eb529625b8f52c5c936b5b48cab7
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Tue Mar 15 16:50:02 2022 +0800

    [FLINK-26641][client] Request the job status directly from the rest api
    
    Before this commit, the client will fetch the current job status from 
JobDetails,
    which might be stale for a few seconds because of the cache mechanism. As 
the job
    status is lightweight enough, we fetch it directly from a specific job 
status fetcher
    REST API which has no information delay. In this way, we shorten the time 
client waits
    for the Job to finish initializing.
    
    This closes #19095.
---
 .../client/program/rest/RestClusterClient.java     | 10 ++++-
 .../client/program/rest/RestClusterClientTest.java | 49 +++++++---------------
 2 files changed, 24 insertions(+), 35 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 656ffcf..ef8cc9b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 import org.apache.flink.runtime.rest.FileUpload;
@@ -68,6 +69,7 @@ import 
org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
@@ -744,8 +746,12 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
     // 
-------------------------------------------------------------------------
 
     private CompletableFuture<JobStatus> requestJobStatus(JobID jobId) {
-        return getJobDetails(jobId)
-                .thenApply(JobDetailsInfo::getJobStatus)
+        final JobStatusInfoHeaders jobStatusInfoHeaders = 
JobStatusInfoHeaders.getInstance();
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        return sendRequest(jobStatusInfoHeaders, params)
+                .thenApply(JobStatusInfo::getJobStatus)
                 .thenApply(
                         jobStatus -> {
                             if (jobStatus == JobStatus.SUSPENDED) {
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 80c67cb..1a86259 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -71,10 +72,9 @@ import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
 import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody;
+import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
@@ -924,11 +924,11 @@ public class RestClusterClientTest extends TestLogger {
      */
     @Test
     public void testNotShowSuspendedJobStatus() throws Exception {
-        final List<JobDetailsInfo> jobDetails = new ArrayList<>();
-        jobDetails.add(buildJobDetail(JobStatus.SUSPENDED));
-        jobDetails.add(buildJobDetail(JobStatus.RUNNING));
+        final List<JobStatusInfo> jobStatusInfo = new ArrayList<>();
+        jobStatusInfo.add(new JobStatusInfo(JobStatus.SUSPENDED));
+        jobStatusInfo.add(new JobStatusInfo(JobStatus.RUNNING));
         final TestJobStatusHandler jobStatusHandler =
-                new TestJobStatusHandler(jobDetails.iterator());
+                new TestJobStatusHandler(jobStatusInfo.iterator());
 
         try (TestRestServerEndpoint restServerEndpoint =
                 createRestServerEndpoint(jobStatusHandler)) {
@@ -943,23 +943,6 @@ public class RestClusterClientTest extends TestLogger {
         }
     }
 
-    private JobDetailsInfo buildJobDetail(JobStatus jobStatus) {
-        return new JobDetailsInfo(
-                jobId,
-                "testJob",
-                true,
-                jobStatus,
-                1L,
-                2L,
-                1L,
-                8888L,
-                1984L,
-                new HashMap<>(),
-                new ArrayList<>(),
-                new HashMap<>(),
-                "{\"id\":\"1234\"}");
-    }
-
     private class TestClientCoordinationHandler
             extends TestHandler<
                     ClientCoordinationRequestBody,
@@ -1095,25 +1078,25 @@ public class RestClusterClientTest extends TestLogger {
     }
 
     private class TestJobStatusHandler
-            extends TestHandler<EmptyRequestBody, JobDetailsInfo, 
JobMessageParameters> {
+            extends TestHandler<EmptyRequestBody, JobStatusInfo, 
JobMessageParameters> {
 
-        private final Iterator<JobDetailsInfo> jobDetailsInfo;
+        private final Iterator<JobStatusInfo> jobStatusInfo;
 
-        private TestJobStatusHandler(@Nonnull Iterator<JobDetailsInfo> 
jobDetailsInfo) {
-            super(JobDetailsHeaders.getInstance());
-            checkState(jobDetailsInfo.hasNext(), "Job details are empty");
-            this.jobDetailsInfo = checkNotNull(jobDetailsInfo);
+        private TestJobStatusHandler(@Nonnull Iterator<JobStatusInfo> 
jobStatusInfo) {
+            super(JobStatusInfoHeaders.getInstance());
+            checkState(jobStatusInfo.hasNext(), "Job status are empty");
+            this.jobStatusInfo = checkNotNull(jobStatusInfo);
         }
 
         @Override
-        protected CompletableFuture<JobDetailsInfo> handleRequest(
+        protected CompletableFuture<JobStatusInfo> handleRequest(
                 @Nonnull HandlerRequest<EmptyRequestBody> request,
                 @Nonnull DispatcherGateway gateway)
                 throws RestHandlerException {
-            if (!jobDetailsInfo.hasNext()) {
-                throw new IllegalStateException("More job details were 
requested than configured");
+            if (!jobStatusInfo.hasNext()) {
+                throw new IllegalStateException("More job status were 
requested than configured");
             }
-            return CompletableFuture.completedFuture(jobDetailsInfo.next());
+            return CompletableFuture.completedFuture(jobStatusInfo.next());
         }
     }
 

Reply via email to