This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3898a4b9bcbae8e50134fa74ccc3c7a9cb4034a5
Author: tison <wander4...@gmail.com>
AuthorDate: Fri Nov 29 18:56:56 2019 +0800

    [FLINK-14762][client] Implement JobClient#getJobStatus
    
    This closes #10311 .
---
 .../flink/client/deployment/ClusterClientJobClientAdapter.java      | 6 ++++++
 .../src/main/java/org/apache/flink/core/execution/JobClient.java    | 6 ++++++
 .../src/test/java/org/apache/flink/api/java/TestingJobClient.java   | 6 ++++++
 .../org/apache/flink/streaming/environment/TestingJobClient.java    | 6 ++++++
 4 files changed, 24 insertions(+)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
index 91a9c91..73a343f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientJobClientAdapter.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.deployment;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.core.execution.JobClient;
@@ -60,6 +61,11 @@ public class ClusterClientJobClientAdapter<ClusterID> 
implements JobClient {
        }
 
        @Override
+       public CompletableFuture<JobStatus> getJobStatus() {
+               return clusterClient.getJobStatus(jobID);
+       }
+
+       @Override
        public CompletableFuture<Void> cancel() {
                return 
clusterClient.cancel(jobID).thenApply(FunctionUtils.nullFn());
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 5bec503..d82cf64 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.core.execution;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 
 import javax.annotation.Nullable;
 
@@ -39,6 +40,11 @@ public interface JobClient extends AutoCloseable {
        JobID getJobID();
 
        /**
+        * Requests the {@link JobStatus} of the associated job.
+        */
+       CompletableFuture<JobStatus> getJobStatus();
+
+       /**
         * Cancels the associated job.
         */
        CompletableFuture<Void> cancel();
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java 
b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
index b5c9873..a24cc3e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/TestingJobClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.execution.JobClient;
 
 import javax.annotation.Nullable;
@@ -39,6 +40,11 @@ public class TestingJobClient implements JobClient {
        }
 
        @Override
+       public CompletableFuture<JobStatus> getJobStatus() {
+               return CompletableFuture.completedFuture(JobStatus.FINISHED);
+       }
+
+       @Override
        public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(ClassLoader userClassloader) {
                return CompletableFuture.completedFuture(new 
JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
        }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
index bcc02ee..36439b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/TestingJobClient.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.environment;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.execution.JobClient;
 
 import javax.annotation.Nullable;
@@ -39,6 +40,11 @@ public class TestingJobClient implements JobClient {
        }
 
        @Override
+       public CompletableFuture<JobStatus> getJobStatus() {
+               return CompletableFuture.completedFuture(JobStatus.FINISHED);
+       }
+
+       @Override
        public CompletableFuture<JobExecutionResult> 
getJobExecutionResult(ClassLoader userClassloader) {
                return CompletableFuture.completedFuture(new 
JobExecutionResult(new JobID(), 0L, Collections.emptyMap()));
        }

Reply via email to