This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b1e18607d08727a62847809638b47346b55ff63b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Apr 13 10:44:25 2021 +0200

    [hotfix][coordination] Reduce lambda nesting for action on CompletableFuture
    
    Directly use 'whenCompleteAsync()' with an executor, rather than 
'whenComplete()' and nest a call
    to submit to the executor.
---
 .../coordination/OperatorCoordinatorHolder.java    | 32 ++++++++++------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index a6e2317..1baa738 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -276,23 +276,21 @@ public class OperatorCoordinatorHolder
 
         final CompletableFuture<byte[]> coordinatorCheckpoint = new 
CompletableFuture<>();
 
-        coordinatorCheckpoint.whenComplete(
-                (success, failure) ->
-                        mainThreadExecutor.execute(
-                                () -> {
-                                    if (failure != null) {
-                                        result.completeExceptionally(failure);
-                                    } else if 
(eventValve.tryShutValve(checkpointId)) {
-                                        result.complete(success);
-                                    } else {
-                                        // if we cannot shut the valve, this 
means the checkpoint
-                                        // has been aborted before, so the 
future is already
-                                        // completed exceptionally. but we try 
to complete it here
-                                        // again, just in case, as a safety 
net.
-                                        result.completeExceptionally(
-                                                new FlinkException("Cannot 
shut event valve"));
-                                    }
-                                }));
+        coordinatorCheckpoint.whenCompleteAsync(
+                (success, failure) -> {
+                    if (failure != null) {
+                        result.completeExceptionally(failure);
+                    } else if (eventValve.tryShutValve(checkpointId)) {
+                        result.complete(success);
+                    } else {
+                        // if we cannot shut the valve, this means the 
checkpoint
+                        // has been aborted before, so the future is already
+                        // completed exceptionally. but we try to complete it 
here
+                        // again, just in case, as a safety net.
+                        result.completeExceptionally(new 
FlinkException("Cannot shut event valve"));
+                    }
+                },
+                mainThreadExecutor);
 
         try {
             eventValve.markForCheckpoint(checkpointId);

Reply via email to