dmvk commented on a change in pull request #17000:
URL: https://github.com/apache/flink/pull/17000#discussion_r700124308
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -147,35 +147,41 @@ public void stop() {
*/
private CompletableFuture<Acknowledge>
runApplicationAndShutdownClusterAsync(
final DispatcherGateway dispatcherGateway) {
- return applicationCompletionFuture
- .handle(
- (ignored, t) -> {
- if (t == null) {
- LOG.info("Application completed SUCCESSFULLY");
- return dispatcherGateway.shutDownCluster(
- ApplicationStatus.SUCCEEDED);
- }
-
- final Optional<UnsuccessfulExecutionException>
maybeException =
- ExceptionUtils.findThrowable(
- t,
UnsuccessfulExecutionException.class);
- if (maybeException.isPresent()) {
- final ApplicationStatus applicationStatus =
- maybeException.get().getStatus();
- if (applicationStatus ==
ApplicationStatus.CANCELED
- || applicationStatus ==
ApplicationStatus.FAILED) {
- LOG.info("Application {}: ",
applicationStatus, t);
- return
dispatcherGateway.shutDownCluster(applicationStatus);
- }
- }
-
- LOG.warn("Application failed unexpectedly: ", t);
- this.errorHandler.onFatalError(
- new FlinkException("Application failed
unexpectedly.", t));
-
- return
FutureUtils.<Acknowledge>completedExceptionally(t);
- })
- .thenCompose(Function.identity());
+ final CompletableFuture<Acknowledge> shutdownFuture =
+ applicationCompletionFuture
+ .handle(
+ (ignored, t) -> {
+ if (t == null) {
+ LOG.info("Application completed
SUCCESSFULLY");
+ return
dispatcherGateway.shutDownCluster(
+ ApplicationStatus.SUCCEEDED);
+ }
+ final
Optional<UnsuccessfulExecutionException> maybeException =
+ ExceptionUtils.findThrowable(
+ t,
UnsuccessfulExecutionException.class);
+ if (maybeException.isPresent()) {
+ final ApplicationStatus
applicationStatus =
+
maybeException.get().getStatus();
+ if (applicationStatus ==
ApplicationStatus.CANCELED
+ || applicationStatus ==
ApplicationStatus.FAILED) {
+ LOG.info("Application {}: ",
applicationStatus, t);
+ return
dispatcherGateway.shutDownCluster(
+ applicationStatus);
+ }
+ }
+
+ if (t instanceof CancellationException) {
+ LOG.warn(
+ "Application is cancelled
because the executing dispatcher lost leadership.");
Review comment:
:+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org