dmvk commented on a change in pull request #17000:
URL: https://github.com/apache/flink/pull/17000#discussion_r698686629
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -147,35 +146,34 @@ public void stop() {
*/
     private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
             final DispatcherGateway dispatcherGateway) {
-        return applicationCompletionFuture
-                .handle(
-                        (ignored, t) -> {
-                            if (t == null) {
-                                LOG.info("Application completed SUCCESSFULLY");
-                                return dispatcherGateway.shutDownCluster(
-                                        ApplicationStatus.SUCCEEDED);
-                            }
-
-                            final Optional<UnsuccessfulExecutionException> maybeException =
-                                    ExceptionUtils.findThrowable(
-                                            t, UnsuccessfulExecutionException.class);
-                            if (maybeException.isPresent()) {
-                                final ApplicationStatus applicationStatus =
-                                        maybeException.get().getStatus();
-                                if (applicationStatus == ApplicationStatus.CANCELED
-                                        || applicationStatus == ApplicationStatus.FAILED) {
-                                    LOG.info("Application {}: ", applicationStatus, t);
-                                    return dispatcherGateway.shutDownCluster(applicationStatus);
-                                }
-                            }
-
-                            LOG.warn("Application failed unexpectedly: ", t);
-                            this.errorHandler.onFatalError(
-                                    new FlinkException("Application failed unexpectedly.", t));
-
-                            return FutureUtils.<Acknowledge>completedExceptionally(t);
-                        })
-                .thenCompose(Function.identity());
+        final CompletableFuture<Acknowledge> shutdownFuture =
+                applicationCompletionFuture
+                        .handle(
+                                (ignored, t) -> {
+                                    if (t == null) {
+                                        LOG.info("Application completed SUCCESSFULLY");
+                                        return dispatcherGateway.shutDownCluster(
+                                                ApplicationStatus.SUCCEEDED);
+                                    }
+                                    final Optional<UnsuccessfulExecutionException> maybeException =
+                                            ExceptionUtils.findThrowable(
+                                                    t, UnsuccessfulExecutionException.class);
+                                    if (maybeException.isPresent()) {
+                                        final ApplicationStatus applicationStatus =
+                                                maybeException.get().getStatus();
+                                        if (applicationStatus == ApplicationStatus.CANCELED
+                                                || applicationStatus == ApplicationStatus.FAILED) {
+                                            LOG.info("Application {}: ", applicationStatus, t);
+                                            return dispatcherGateway.shutDownCluster(
+                                                    applicationStatus);
+                                        }
+                                    }
+                                    LOG.warn("Application failed unexpectedly: ", t);
+                                    return dispatcherGateway.shutDownClusterExceptionally(t);
Review comment:
You're right. An alternative would be to give up leadership or restart the
dispatcher, but that could be tricky to implement 🤔 I'm not sure it would
actually improve anything, so it's probably not worth the effort.
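
For anyone reading along, here is a rough, self-contained sketch of the `handle(...)` plus `thenCompose(Function.identity())` pattern that the removed lines of this hunk use: each outcome of the application future is mapped to an asynchronous shutdown call, and the nested future is then flattened. It is not the Flink code. `ShutdownSketch`, `Status`, `KnownFailure`, `findKnownFailure` and this `shutDownCluster` are hypothetical stand-ins for illustration, and mapping the unexpected case to `UNKNOWN` is an assumption of the sketch, not what the PR does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class ShutdownSketch {

    /** Hypothetical stand-in for an application status enum. */
    public enum Status { SUCCEEDED, FAILED, CANCELED, UNKNOWN }

    /** Hypothetical stand-in for a failure that carries a terminal status. */
    public static class KnownFailure extends RuntimeException {
        public final Status status;

        public KnownFailure(Status status) {
            super("application " + status);
            this.status = status;
        }
    }

    /** Hypothetical stand-in for an asynchronous cluster shutdown call. */
    public static CompletableFuture<Status> shutDownCluster(Status status) {
        return CompletableFuture.completedFuture(status);
    }

    /** Walks the cause chain looking for a KnownFailure; returns null if none is found. */
    public static KnownFailure findKnownFailure(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof KnownFailure) {
                return (KnownFailure) cur;
            }
        }
        return null;
    }

    /** Maps every outcome of the application future to a shutdown future, then flattens it. */
    public static CompletableFuture<Status> runAndShutdown(
            CompletableFuture<Void> applicationCompletion) {
        return applicationCompletion
                .handle(
                        (ignored, t) -> {
                            if (t == null) {
                                return shutDownCluster(Status.SUCCEEDED);
                            }
                            KnownFailure known = findKnownFailure(t);
                            if (known != null) {
                                return shutDownCluster(known.status);
                            }
                            // Assumption of this sketch: unexpected failures also shut down.
                            return shutDownCluster(Status.UNKNOWN);
                        })
                // handle(...) yields a future of a future; identity flattens it.
                .thenCompose(Function.identity());
    }
}
```

Because every branch returns a shutdown future, the caller always receives a single future that completes once the shutdown call completes, whichever way the application ended.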