Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5579#discussion_r170864042 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java --- @@ -577,6 +583,81 @@ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionExceptio printStatusDuringExecution); } + /** + * Requests the {@link JobStatus} of the job with the given {@link JobID}. + */ + public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { + final ActorGateway jobManager; + try { + jobManager = getJobManagerGateway(); + } catch (FlinkException e) { + throw new RuntimeException("Could not retrieve JobManage gateway.", e); + } + + Future<Object> response = jobManager.ask(JobManagerMessages.getRequestJobStatus(jobId), timeout); + + CompletableFuture<Object> javaFuture = FutureUtils.toJava(response); + + return javaFuture.thenApply((responseMessage) -> { + if (responseMessage instanceof JobManagerMessages.CurrentJobStatus) { + return ((JobManagerMessages.CurrentJobStatus) responseMessage).status(); + } else if (responseMessage instanceof JobManagerMessages.JobNotFound) { + throw new CompletionException( + new IllegalStateException("Could not find job with JobId " + jobId)); + } else { + throw new CompletionException( + new IllegalStateException("Unknown JobManager response of type " + responseMessage.getClass())); + } + }); + } + + /** + * Requests the {@link JobResult} for the given {@link JobID}. The method retries multiple + * times to poll the {@link JobResult} before giving up. + * + * @param jobId specifying the job for which to retrieve the {@link JobResult} + * @return Future which is completed with the {@link JobResult} once the job has completed or + * with a failure if the {@link JobResult} could not be retrieved. 
+ */ + public CompletableFuture<JobResult> requestJobResult(JobID jobId) { + + CompletableFuture<JobResult> result = new CompletableFuture<>(); + + try { + JobExecutionResult jobExecutionResult = retrieveJob(jobId); + Map<String, Object> allAccumulatorResults = jobExecutionResult.getAllAccumulatorResults(); + Map<String, SerializedValue<Object>> allAccumulatorResultsSerialized = new HashMap<>(); + + for (Map.Entry<String, Object> acc : allAccumulatorResults.entrySet()) { + SerializedValue<Object> objectSerializedValue = null; + try { + objectSerializedValue = new SerializedValue<>(acc.getValue()); + } catch (IOException e) { + throw new RuntimeException("Could not serialize accumulator result.", e); --- End diff -- Let's complete the returned future exceptionally in this case (e.g. `result.completeExceptionally(e)`) instead of throwing a `RuntimeException`.
---