Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5207#discussion_r160127783
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
    @@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoad
                } catch (JobSubmissionException e) {
                        throw new ProgramInvocationException(e);
                }
    -           // don't return just a JobSubmissionResult here, the signature 
is lying
    -           // The CliFrontend expects this to be a JobExecutionResult
     
    -           // TOOD: do not exit this method until job is finished
    -           return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
    +           final JobExecutionResult jobExecutionResult = 
waitForJobExecutionResult(jobGraph.getJobID());
    +
    +           if (jobExecutionResult.getSerializedThrowable().isPresent()) {
    +                   final SerializedThrowable serializedThrowable = 
jobExecutionResult.getSerializedThrowable().get();
    +                   final Throwable throwable = 
serializedThrowable.deserializeError(classLoader);
    +                   throw new ProgramInvocationException(throwable);
    +           }
    +
    +           try {
    +                   // don't return just a JobSubmissionResult here, the 
signature is lying
    +                   // The CliFrontend expects this to be a 
JobExecutionResult
    +                   this.lastJobExecutionResult = new 
SerializedJobExecutionResult(
    +                           jobExecutionResult.getJobId(),
    +                           jobExecutionResult.getNetRuntime(),
    +                           
jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
    --- End diff --
    
    We could also directly use `AccumulatorHelper#deserializeAccumulators`.


---

Reply via email to