This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9851ad03c1b9eb529625b8f52c5c936b5b48cab7 Author: Yangze Guo <karma...@gmail.com> AuthorDate: Tue Mar 15 16:50:02 2022 +0800 [FLINK-26641][client] Request the job status directly from the rest api Before this commit, the client fetched the current job status from JobDetails, which might be stale for a few seconds because of the cache mechanism. Because the job status is lightweight, we now fetch it directly from the dedicated job status REST API, which has no such delay. In this way, we shorten the time the client waits for the job to finish initializing. This closes #19095. --- .../client/program/rest/RestClusterClient.java | 10 ++++- .../client/program/rest/RestClusterClientTest.java | 49 +++++++--------------- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 656ffcf..ef8cc9b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import org.apache.flink.runtime.rest.FileUpload; @@ -68,6 +69,7 @@ import org.apache.flink.runtime.rest.messages.cluster.ShutdownHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import 
org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; +import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; @@ -744,8 +746,12 @@ public class RestClusterClient<T> implements ClusterClient<T> { // ------------------------------------------------------------------------- private CompletableFuture<JobStatus> requestJobStatus(JobID jobId) { - return getJobDetails(jobId) - .thenApply(JobDetailsInfo::getJobStatus) + final JobStatusInfoHeaders jobStatusInfoHeaders = JobStatusInfoHeaders.getInstance(); + final JobMessageParameters params = new JobMessageParameters(); + params.jobPathParameter.resolve(jobId); + + return sendRequest(jobStatusInfoHeaders, params) + .thenApply(JobStatusInfo::getJobStatus) .thenApply( jobStatus -> { if (jobStatus == JobStatus.SUSPENDED) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 80c67cb..1a86259 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; @@ -71,10 +72,9 @@ 
import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.runtime.rest.messages.TriggerIdPathParameter; -import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; -import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders; import org.apache.flink.runtime.rest.messages.job.JobExecutionResultResponseBody; +import org.apache.flink.runtime.rest.messages.job.JobStatusInfoHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; @@ -924,11 +924,11 @@ public class RestClusterClientTest extends TestLogger { */ @Test public void testNotShowSuspendedJobStatus() throws Exception { - final List<JobDetailsInfo> jobDetails = new ArrayList<>(); - jobDetails.add(buildJobDetail(JobStatus.SUSPENDED)); - jobDetails.add(buildJobDetail(JobStatus.RUNNING)); + final List<JobStatusInfo> jobStatusInfo = new ArrayList<>(); + jobStatusInfo.add(new JobStatusInfo(JobStatus.SUSPENDED)); + jobStatusInfo.add(new JobStatusInfo(JobStatus.RUNNING)); final TestJobStatusHandler jobStatusHandler = - new TestJobStatusHandler(jobDetails.iterator()); + new TestJobStatusHandler(jobStatusInfo.iterator()); try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(jobStatusHandler)) { @@ -943,23 +943,6 @@ public class RestClusterClientTest extends TestLogger { } } - private JobDetailsInfo buildJobDetail(JobStatus jobStatus) { - return new JobDetailsInfo( - jobId, - "testJob", - true, - jobStatus, - 1L, - 2L, - 1L, - 8888L, - 1984L, - new HashMap<>(), - new ArrayList<>(), - new HashMap<>(), - "{\"id\":\"1234\"}"); - } - private class TestClientCoordinationHandler extends TestHandler< 
ClientCoordinationRequestBody, @@ -1095,25 +1078,25 @@ public class RestClusterClientTest extends TestLogger { } private class TestJobStatusHandler - extends TestHandler<EmptyRequestBody, JobDetailsInfo, JobMessageParameters> { + extends TestHandler<EmptyRequestBody, JobStatusInfo, JobMessageParameters> { - private final Iterator<JobDetailsInfo> jobDetailsInfo; + private final Iterator<JobStatusInfo> jobStatusInfo; - private TestJobStatusHandler(@Nonnull Iterator<JobDetailsInfo> jobDetailsInfo) { - super(JobDetailsHeaders.getInstance()); - checkState(jobDetailsInfo.hasNext(), "Job details are empty"); - this.jobDetailsInfo = checkNotNull(jobDetailsInfo); + private TestJobStatusHandler(@Nonnull Iterator<JobStatusInfo> jobStatusInfo) { + super(JobStatusInfoHeaders.getInstance()); + checkState(jobStatusInfo.hasNext(), "Job status are empty"); + this.jobStatusInfo = checkNotNull(jobStatusInfo); } @Override - protected CompletableFuture<JobDetailsInfo> handleRequest( + protected CompletableFuture<JobStatusInfo> handleRequest( @Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { - if (!jobDetailsInfo.hasNext()) { - throw new IllegalStateException("More job details were requested than configured"); + if (!jobStatusInfo.hasNext()) { + throw new IllegalStateException("More job status were requested than configured"); } - return CompletableFuture.completedFuture(jobDetailsInfo.next()); + return CompletableFuture.completedFuture(jobStatusInfo.next()); } }