tillrohrmann commented on a change in pull request #13583: URL: https://github.com/apache/flink/pull/13583#discussion_r503191078
##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
     final ApplicationStatus applicationStatus;
     if (t != null) {
-        final Optional<JobCancellationException> cancellationException =
-            ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-        if (cancellationException.isPresent()) {
+        if (ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
             // this means the Flink Job was cancelled
             applicationStatus = ApplicationStatus.CANCELED;
-        } else if (t instanceof CancellationException) {
-            // this means that the future was cancelled
-            applicationStatus = ApplicationStatus.UNKNOWN;
-        } else {
+            LOG.warn("Application {}: ", applicationStatus, t);

Review comment:
       Warn log level seems too high for a normal state transition.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
##########
@@ -82,4 +82,8 @@ default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
         return shutDownCluster();
     }
+
+    default CompletableFuture<Acknowledge> shutDownClusterExceptionally(final Throwable throwable) {
+        throw new UnsupportedOperationException();
+    }

Review comment:
       I think we should not introduce this method. Instead, the `ApplicationDispatcherBootstrap` should get a `FatalErrorHandler`, which it simply calls if it sees an unknown exception.
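
A rough sketch of the suggestion above, not code from the PR. Everything in it is a simplified stand-in: the local `FatalErrorHandler` interface only mirrors the single-callback shape of Flink's handler, and `Bootstrap`, `onApplicationFinished`, `containsCause`, and the local `JobCancellationException` and `ApplicationStatus` types are hypothetical names chosen for illustration. The point is that the bootstrap maps known outcomes to a status and hands anything else to the handler, so the gateway needs no new method.

```java
import java.util.Optional;

public class BootstrapErrorHandlingSketch {

    /** Stand-in with the same single-callback shape as a fatal error handler. */
    interface FatalErrorHandler {
        void onFatalError(Throwable exception);
    }

    /** Hypothetical stand-in for the job cancellation exception. */
    static class JobCancellationException extends Exception {
        JobCancellationException(String message) {
            super(message);
        }
    }

    /** Reduced to the two statuses this sketch needs. */
    enum ApplicationStatus { SUCCEEDED, CANCELED }

    /** Hypothetical, heavily simplified bootstrap that owns an error handler. */
    static class Bootstrap {
        private final FatalErrorHandler errorHandler;

        Bootstrap(FatalErrorHandler errorHandler) {
            this.errorHandler = errorHandler;
        }

        /**
         * Returns the status to shut the cluster down with, or an empty
         * Optional if the error was escalated to the handler instead.
         */
        Optional<ApplicationStatus> onApplicationFinished(Throwable t) {
            if (t == null) {
                return Optional.of(ApplicationStatus.SUCCEEDED);
            }
            if (containsCause(t, JobCancellationException.class)) {
                // Known outcome: the job was cancelled.
                return Optional.of(ApplicationStatus.CANCELED);
            }
            // Unknown exception: escalate rather than pick a status.
            errorHandler.onFatalError(t);
            return Optional.empty();
        }
    }

    /** Walks the cause chain looking for an instance of the given type. */
    static boolean containsCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Bootstrap bootstrap =
            new Bootstrap(error -> System.err.println("Fatal error: " + error));
        System.out.println(bootstrap.onApplicationFinished(null));
        System.out.println(bootstrap.onApplicationFinished(new IllegalStateException("boom")));
    }
}
```

With this shape, the decision about what a fatal error does to the process stays with whoever supplies the handler, and the bootstrap only has to recognize the outcomes it understands.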
##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
     final ApplicationStatus applicationStatus;
     if (t != null) {
-        final Optional<JobCancellationException> cancellationException =
-            ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-        if (cancellationException.isPresent()) {
+        if (ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
             // this means the Flink Job was cancelled
             applicationStatus = ApplicationStatus.CANCELED;
-        } else if (t instanceof CancellationException) {
-            // this means that the future was cancelled
-            applicationStatus = ApplicationStatus.UNKNOWN;
-        } else {
+            LOG.warn("Application {}: ", applicationStatus, t);
+
+            return dispatcher.shutDownCluster(applicationStatus);
+        }
+
+        if (ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {

Review comment:
       Are you sure that a `JobExecutionException` means that the Flink job has reached a globally terminal state? Looking at `JobResult.toJobExecutionResult`, it seems that we also throw this exception if the state is `ApplicationStatus.UNKNOWN`.
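
To make the concern concrete, here is a small illustration, not a proposal taken from the PR. It assumes a hypothetical `StatusCarryingException` that records the status it was raised for, and a hypothetical helper `terminalStatusOf`. The local `ApplicationStatus` enum is a stand-in. If one exception type can be raised for both `FAILED` and `UNKNOWN`, its presence alone cannot justify shutting down with `FAILED`. The status has to be inspected.

```java
import java.util.Optional;

public class TerminalStatusSketch {

    /** Local stand-in for the application status values. */
    enum ApplicationStatus { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Hypothetical exception that remembers the status it was created for. */
    static class StatusCarryingException extends Exception {
        private final ApplicationStatus status;

        StatusCarryingException(String message, ApplicationStatus status) {
            super(message);
            this.status = status;
        }

        ApplicationStatus getStatus() {
            return status;
        }
    }

    /**
     * Returns a status only when the cause chain carries a known one.
     * UNKNOWN and unrelated exceptions both yield an empty Optional, so the
     * caller cannot mistake them for a confirmed terminal outcome.
     */
    static Optional<ApplicationStatus> terminalStatusOf(Throwable t) {
        for (Throwable current = t; current != null; current = current.getCause()) {
            if (current instanceof StatusCarryingException) {
                ApplicationStatus status = ((StatusCarryingException) current).getStatus();
                return status == ApplicationStatus.UNKNOWN
                    ? Optional.empty()
                    : Optional.of(status);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Throwable failed = new RuntimeException(
            new StatusCarryingException("job failed", ApplicationStatus.FAILED));
        Throwable unknown =
            new StatusCarryingException("status not known", ApplicationStatus.UNKNOWN);
        System.out.println(terminalStatusOf(failed));
        System.out.println(terminalStatusOf(unknown));
    }
}
```

An empty result here is the case that would be escalated rather than reported as a failure.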
##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
     final ApplicationStatus applicationStatus;
     if (t != null) {
-        final Optional<JobCancellationException> cancellationException =
-            ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-        if (cancellationException.isPresent()) {
+        if (ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
             // this means the Flink Job was cancelled
             applicationStatus = ApplicationStatus.CANCELED;
-        } else if (t instanceof CancellationException) {
-            // this means that the future was cancelled
-            applicationStatus = ApplicationStatus.UNKNOWN;
-        } else {
+            LOG.warn("Application {}: ", applicationStatus, t);
+
+            return dispatcher.shutDownCluster(applicationStatus);
+        }
+
+        if (ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {
             applicationStatus = ApplicationStatus.FAILED;
+            LOG.warn("Application {}: ", applicationStatus, t);

Review comment:
       Same here: the warn log level seems too high for a normal state transition.