This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 61a9f25cd827d09e64a9a3f39c0fb85639007b89 Author: Kostas Kloudas <[email protected]> AuthorDate: Wed Oct 21 21:00:27 2020 +0200 [hotfix] minor refactoring in ApplicationDispatcherBootstrap --- .../ApplicationDispatcherBootstrap.java | 32 +++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java index 1443804..48dd704 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java @@ -142,28 +142,28 @@ public class ApplicationDispatcherBootstrap implements DispatcherBootstrap { private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(final DispatcherGateway dispatcherGateway) { return applicationCompletionFuture .handle((r, t) -> { - if (t != null) { - final Optional<ApplicationFailureException> exception = - ExceptionUtils.findThrowable(t, ApplicationFailureException.class); - - if (exception.isPresent()) { - final ApplicationStatus applicationStatus = exception.get().getStatus(); + if (t == null) { + LOG.info("Application completed SUCCESSFULLY"); + return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED); + } - if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) { - LOG.info("Application {}: ", applicationStatus, t); - return dispatcherGateway.shutDownCluster(applicationStatus); - } - } + final Optional<ApplicationFailureException> exception = + ExceptionUtils.findThrowable(t, ApplicationFailureException.class); - LOG.warn("Exiting with Application Status UNKNOWN: ", t); - this.errorHandler.onFatalError(new FlinkException("Application failed unexpectedly.", t)); + if (exception.isPresent()) { + final ApplicationStatus applicationStatus = exception.get().getStatus(); - return FutureUtils.<Acknowledge>completedExceptionally(t); + if (applicationStatus == ApplicationStatus.CANCELED || applicationStatus == ApplicationStatus.FAILED) { + LOG.info("Application {}: ", applicationStatus, t); + return dispatcherGateway.shutDownCluster(applicationStatus); + } } - LOG.info("Application completed SUCCESSFULLY"); - return dispatcherGateway.shutDownCluster(ApplicationStatus.SUCCEEDED); + LOG.warn("Application failed unexpectedly: ", t); + this.errorHandler.onFatalError(new FlinkException("Application failed unexpectedly.", t)); + + return FutureUtils.<Acknowledge>completedExceptionally(t); }) .thenCompose(Function.identity()); }
