This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b1e18607d08727a62847809638b47346b55ff63b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Apr 13 10:44:25 2021 +0200

    [hotfix][coordination] Reduce lambda nesting for action on CompletableFuture

    Directly use 'whenCompleteAsync()' with an executor, rather than 'whenComplete()'
    and nest a call to submit to the executor.
---
 .../coordination/OperatorCoordinatorHolder.java | 32 ++++++++++------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a6e2317..1baa738 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -276,23 +276,21 @@ public class OperatorCoordinatorHolder
 
         final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
 
-        coordinatorCheckpoint.whenComplete(
-                (success, failure) ->
-                        mainThreadExecutor.execute(
-                                () -> {
-                                    if (failure != null) {
-                                        result.completeExceptionally(failure);
-                                    } else if (eventValve.tryShutValve(checkpointId)) {
-                                        result.complete(success);
-                                    } else {
-                                        // if we cannot shut the valve, this means the checkpoint
-                                        // has been aborted before, so the future is already
-                                        // completed exceptionally. but we try to complete it here
-                                        // again, just in case, as a safety net.
-                                        result.completeExceptionally(
-                                                new FlinkException("Cannot shut event valve"));
-                                    }
-                                }));
+        coordinatorCheckpoint.whenCompleteAsync(
+                (success, failure) -> {
+                    if (failure != null) {
+                        result.completeExceptionally(failure);
+                    } else if (eventValve.tryShutValve(checkpointId)) {
+                        result.complete(success);
+                    } else {
+                        // if we cannot shut the valve, this means the checkpoint
+                        // has been aborted before, so the future is already
+                        // completed exceptionally. but we try to complete it here
+                        // again, just in case, as a safety net.
+                        result.completeExceptionally(new FlinkException("Cannot shut event valve"));
+                    }
+                },
+                mainThreadExecutor);
 
         try {
             eventValve.markForCheckpoint(checkpointId);