tillrohrmann commented on a change in pull request #13583:
URL: https://github.com/apache/flink/pull/13583#discussion_r503191078



##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
                                        final ApplicationStatus 
applicationStatus;
                                        if (t != null) {
 
-                                               final 
Optional<JobCancellationException> cancellationException =
-                                                               
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-                                               if 
(cancellationException.isPresent()) {
+                                               if 
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                        // this means the Flink 
Job was cancelled
                                                        applicationStatus = 
ApplicationStatus.CANCELED;
-                                               } else if (t instanceof 
CancellationException) {
-                                                       // this means that the 
future was cancelled
-                                                       applicationStatus = 
ApplicationStatus.UNKNOWN;
-                                               } else {
+                                                       LOG.warn("Application 
{}: ", applicationStatus, t);

Review comment:
       Warn log level seems to high for a normal state transition.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
##########
@@ -82,4 +82,8 @@
        default CompletableFuture<Acknowledge> 
shutDownCluster(ApplicationStatus applicationStatus) {
                return shutDownCluster();
        }
+
+       default CompletableFuture<Acknowledge> 
shutDownClusterExceptionally(final Throwable throwable) {
+               throw new UnsupportedOperationException();
+       }

Review comment:
       I think we should not introduce this method. Instead the 
`ApplicationDispatcherBootstrap` should get a `FatalErrorHandler` which it 
simply calls if it sees an unknown exception.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
                                        final ApplicationStatus 
applicationStatus;
                                        if (t != null) {
 
-                                               final 
Optional<JobCancellationException> cancellationException =
-                                                               
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-                                               if 
(cancellationException.isPresent()) {
+                                               if 
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                        // this means the Flink 
Job was cancelled
                                                        applicationStatus = 
ApplicationStatus.CANCELED;
-                                               } else if (t instanceof 
CancellationException) {
-                                                       // this means that the 
future was cancelled
-                                                       applicationStatus = 
ApplicationStatus.UNKNOWN;
-                                               } else {
+                                                       LOG.warn("Application 
{}: ", applicationStatus, t);
+
+                                                       return 
dispatcher.shutDownCluster(applicationStatus);
+                                               }
+
+                                               if 
(ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {

Review comment:
       Are you sure that a `JobExecutionException` means that the Flink job has 
reached a globally terminal state? Looking at `JobResult.toJobExecutionResult` 
it seems that we also throw this exception if the state is 
`ApplicationStatus.UNKNOWN`.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -141,25 +141,29 @@ public void stop() {
                                        final ApplicationStatus 
applicationStatus;
                                        if (t != null) {
 
-                                               final 
Optional<JobCancellationException> cancellationException =
-                                                               
ExceptionUtils.findThrowable(t, JobCancellationException.class);
-
-                                               if 
(cancellationException.isPresent()) {
+                                               if 
(ExceptionUtils.findThrowable(t, JobCancellationException.class).isPresent()) {
                                                        // this means the Flink 
Job was cancelled
                                                        applicationStatus = 
ApplicationStatus.CANCELED;
-                                               } else if (t instanceof 
CancellationException) {
-                                                       // this means that the 
future was cancelled
-                                                       applicationStatus = 
ApplicationStatus.UNKNOWN;
-                                               } else {
+                                                       LOG.warn("Application 
{}: ", applicationStatus, t);
+
+                                                       return 
dispatcher.shutDownCluster(applicationStatus);
+                                               }
+
+                                               if 
(ExceptionUtils.findThrowable(t, JobExecutionException.class).isPresent()) {
                                                        applicationStatus = 
ApplicationStatus.FAILED;
+                                                       LOG.warn("Application 
{}: ", applicationStatus, t);

Review comment:
       Same here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to