Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5944#discussion_r185164034 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java --- @@ -109,7 +119,11 @@ public MiniDispatcher( if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) { // terminate the MiniDispatcher once we served the first JobResult successfully - jobResultFuture.whenComplete((JobResult ignored, Throwable throwable) -> shutDown()); + jobResultFuture.whenComplete((JobResult result, Throwable throwable) -> { + ApplicationStatus status = result.getSerializedThrowable().isPresent() ? + ApplicationStatus.FAILED : ApplicationStatus.SUCCEEDED; + jobTerminationFuture.complete(status); --- End diff -- I think the functional way would be: ``` jobTerminationFuture.complete(result.getSerializedThrowable() .map(serializedThrowable -> ApplicationStatus.FAILED) .orElse(ApplicationStatus.SUCCEEDED)); ```
---