Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5579#discussion_r170862163
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java 
---
    @@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) 
throws JobExecutionExceptio
                        printStatusDuringExecution);
        }
     
    +   /**
    +    * Requests the {@link JobStatus} of the job with the given {@link 
JobID}.
    +    */
    +   public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
    +           final ActorGateway jobManager;
    +           try {
    +                   jobManager = getJobManagerGateway();
    +           } catch (FlinkException e) {
    +                   throw new RuntimeException("Could not retrieve 
JobManage gateway.", e);
    +           }
    +
    +           Future<Object> response = 
jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout);
    +
    +           CompletableFuture<Object> javaFuture = 
FutureUtils.toJava(response);
    +
    +           return javaFuture.thenApply((responseMessage) -> {
    +                   if (responseMessage instanceof 
JobManagerMessages.CurrentJobStatus) {
    +                           return ((JobManagerMessages.CurrentJobStatus) 
responseMessage).status();
    +                   } else if (responseMessage instanceof 
JobManagerMessages.JobNotFound) {
    +                           throw new CompletionException(
    +                                   new IllegalStateException("Could not 
find job with JobId " + jobId));
    +                   } else {
    +                           throw new CompletionException(
    +                                   new IllegalStateException("Unknown 
JobManager response of type " + responseMessage.getClass()));
    +                   }
    +           });
    +   }
    +
    +   /**
    +    * Requests the {@link JobResult} for the given {@link JobID}. The 
method retries multiple
    +    * times to poll the {@link JobResult} before giving up.
    +    *
    +    * @param jobId specifying the job for which to retrieve the {@link 
JobResult}
    +    * @return Future which is completed with the {@link JobResult} once 
the job has completed or
    +    * with a failure if the {@link JobResult} could not be retrieved.
    +    */
    +   public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
    +
    +           CompletableFuture<JobResult> result = new CompletableFuture<>();
    +
    +           try {
    +                   JobExecutionResult jobExecutionResult = 
retrieveJob(jobId);
    +                   Map<String, Object> allAccumulatorResults = 
jobExecutionResult.getAllAccumulatorResults();
    +                   Map<String, SerializedValue<Object>> 
allAccumulatorResultsSerialized = new HashMap<>();
    +
    +                   for (Map.Entry<String, Object> acc : 
allAccumulatorResults.entrySet()) {
    +                           SerializedValue<Object> objectSerializedValue = 
null;
    +                           try {
    +                                   objectSerializedValue = new 
SerializedValue<>(acc.getValue());
    +                           } catch (IOException e) {
    +                                   throw new RuntimeException("Could not 
serialize accumulator result.", e);
    +                           }
    +                           
allAccumulatorResultsSerialized.put(acc.getKey(), objectSerializedValue);
    +                   }
    +                   JobResult jobResult = new JobResult.Builder()
    +                           .jobId(jobId)
    +                           .netRuntime(jobExecutionResult.getNetRuntime())
    +                           
.accumulatorResults(allAccumulatorResultsSerialized)
    +                           .build();
    +                   result.complete(jobResult);
    --- End diff --
    
    Completing `result` here and in the catch block is duplicate logic. Better 
to create `JobResult` variable which is assigned after the `try-catch` block 
and then return `CompletableFuture.completed(jobResult)`.


---

Reply via email to