This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 605d158f013ba0fd43fa0ae68b8936b9288ac715 Author: Stephan Ewen <se...@apache.org> AuthorDate: Sun Apr 11 19:27:01 2021 +0200 [FLINK-18071][coordination] (part 3) Adjust OperatorEventValve to accept self-contained "send actions". The "send actions" (as Callables) contain the events and versioned target information, so they can precisely address the recipient regardless of whether they are sent immediately or later, after the target task has failed/recovered. --- .../operators/coordination/OperatorEventValve.java | 43 ++++++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java index 99eb7aa..25cc519 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; @@ -30,6 +31,7 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; @@ -105,15 +107,25 @@ final class OperatorEventValve { CompletableFuture<Acknowledge> result) { checkRunsInMainThread(); + final Callable<CompletableFuture<Acknowledge>> sendAction = + () -> eventSender.apply(event, subtask); + sendEvent(sendAction, subtask, result); + } + + public void sendEvent( + Callable<CompletableFuture<Acknowledge>> sendAction, + int subtask, + CompletableFuture<Acknowledge> result) { + checkRunsInMainThread(); + if (!shut) { - final CompletableFuture<Acknowledge> ack = eventSender.apply(event, subtask); - FutureUtils.forward(ack, result); + callSendAction(sendAction, result); return; } final List<BlockedEvent> eventsForTask = blockedEvents.computeIfAbsent(subtask, (key) -> new ArrayList<>()); - eventsForTask.add(new BlockedEvent(event, subtask, result)); + eventsForTask.add(new BlockedEvent(sendAction, result)); } /** @@ -190,9 +202,7 @@ final class OperatorEventValve { for (List<BlockedEvent> eventsForTask : blockedEvents.values()) { for (BlockedEvent blockedEvent : eventsForTask) { - final CompletableFuture<Acknowledge> ackFuture = - eventSender.apply(blockedEvent.event, blockedEvent.subtask); - FutureUtils.forward(ackFuture, blockedEvent.future); + callSendAction(blockedEvent.sendAction, blockedEvent.future); } } blockedEvents.clear(); @@ -242,21 +252,30 @@ final class OperatorEventValve { } } + private static void callSendAction( + Callable<CompletableFuture<Acknowledge>> sendAction, + CompletableFuture<Acknowledge> result) { + try { + final CompletableFuture<Acknowledge> sendResult = sendAction.call(); + FutureUtils.forward(sendResult, result); + } catch (Throwable t) { + ExceptionUtils.rethrowIfFatalError(t); + result.completeExceptionally(t); + } + } + // ------------------------------------------------------------------------ private static final class BlockedEvent { - final SerializedValue<OperatorEvent> event; + final Callable<CompletableFuture<Acknowledge>> sendAction; final CompletableFuture<Acknowledge> future; - final int subtask; BlockedEvent( - SerializedValue<OperatorEvent> event, - int subtask, + Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> future) { - this.event = event; + this.sendAction = sendAction; this.future = future; - this.subtask = subtask; } } }