GitHub user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5207#discussion_r160127783 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad } catch (JobSubmissionException e) { throw new ProgramInvocationException(e); } - // don't return just a JobSubmissionResult here, the signature is lying - // The CliFrontend expects this to be a JobExecutionResult - // TOOD: do not exit this method until job is finished - return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID()); + + if (jobExecutionResult.getSerializedThrowable().isPresent()) { + final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get(); + final Throwable throwable = serializedThrowable.deserializeError(classLoader); + throw new ProgramInvocationException(throwable); + } + + try { + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + this.lastJobExecutionResult = new SerializedJobExecutionResult( + jobExecutionResult.getJobId(), + jobExecutionResult.getNetRuntime(), + jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader); --- End diff -- We could also call `AccumulatorHelper#deserializeAccumulators` directly here, instead of constructing a `SerializedJobExecutionResult` just to convert it with `toJobExecutionResult`.
---