This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 605d158f013ba0fd43fa0ae68b8936b9288ac715
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Apr 11 19:27:01 2021 +0200

    [FLINK-18071][coordination] (part 3) Adjust OperatorEventValve to accept self-contained "send actions".
    
    The "send actions" (as Callables) contain the events and versioned target information, so they can precisely
    address the recipient regardless of whether they are sent immediately or later, after the target
    task has failed/recovered.
---
 .../operators/coordination/OperatorEventValve.java | 43 ++++++++++++++++------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
index 99eb7aa..25cc519 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 
@@ -30,6 +31,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
 
@@ -105,15 +107,25 @@ final class OperatorEventValve {
             CompletableFuture<Acknowledge> result) {
         checkRunsInMainThread();
 
+        final Callable<CompletableFuture<Acknowledge>> sendAction =
+                () -> eventSender.apply(event, subtask);
+        sendEvent(sendAction, subtask, result);
+    }
+
+    public void sendEvent(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            int subtask,
+            CompletableFuture<Acknowledge> result) {
+        checkRunsInMainThread();
+
         if (!shut) {
-            final CompletableFuture<Acknowledge> ack = 
eventSender.apply(event, subtask);
-            FutureUtils.forward(ack, result);
+            callSendAction(sendAction, result);
             return;
         }
 
         final List<BlockedEvent> eventsForTask =
                 blockedEvents.computeIfAbsent(subtask, (key) -> new 
ArrayList<>());
-        eventsForTask.add(new BlockedEvent(event, subtask, result));
+        eventsForTask.add(new BlockedEvent(sendAction, result));
     }
 
     /**
@@ -190,9 +202,7 @@ final class OperatorEventValve {
 
         for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
             for (BlockedEvent blockedEvent : eventsForTask) {
-                final CompletableFuture<Acknowledge> ackFuture =
-                        eventSender.apply(blockedEvent.event, 
blockedEvent.subtask);
-                FutureUtils.forward(ackFuture, blockedEvent.future);
+                callSendAction(blockedEvent.sendAction, blockedEvent.future);
             }
         }
         blockedEvents.clear();
@@ -242,21 +252,30 @@ final class OperatorEventValve {
         }
     }
 
+    private static void callSendAction(
+            Callable<CompletableFuture<Acknowledge>> sendAction,
+            CompletableFuture<Acknowledge> result) {
+        try {
+            final CompletableFuture<Acknowledge> sendResult = 
sendAction.call();
+            FutureUtils.forward(sendResult, result);
+        } catch (Throwable t) {
+            ExceptionUtils.rethrowIfFatalError(t);
+            result.completeExceptionally(t);
+        }
+    }
+
     // ------------------------------------------------------------------------
 
     private static final class BlockedEvent {
 
-        final SerializedValue<OperatorEvent> event;
+        final Callable<CompletableFuture<Acknowledge>> sendAction;
         final CompletableFuture<Acknowledge> future;
-        final int subtask;
 
         BlockedEvent(
-                SerializedValue<OperatorEvent> event,
-                int subtask,
+                Callable<CompletableFuture<Acknowledge>> sendAction,
                 CompletableFuture<Acknowledge> future) {
-            this.event = event;
+            this.sendAction = sendAction;
             this.future = future;
-            this.subtask = subtask;
         }
     }
 }

Reply via email to