This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3d2e1c49d6bb89d11e9dfc9c171f9d00aff2215 Author: Stephan Ewen <se...@apache.org> AuthorDate: Fri Apr 16 16:48:17 2021 +0200 [hotfix][tests] Simplify and harden CoordinatorEventsExactlyOnceITCase --- .../CoordinatorEventsExactlyOnceITCase.java | 42 +++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index 33222c3..edc9ede 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -71,6 +71,7 @@ import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -303,7 +304,7 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { private final int failAtMessage; private boolean failedBefore; - private final ArrayDeque<CompletableFuture<?>> recoveredTaskRunning = new ArrayDeque<>(); + private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new ArrayDeque<>(); private SubtaskGateway subtaskGateway; private boolean workLoopRunning; @@ -348,14 +349,16 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { String.format("Don't recognize event '%s' from task %d.", event, subtask)); } + // We complete all events that were enqueued. We may need to complete + // multiple ones here, because it can happen that after a failure no real recovery + // happens that results in an event being sent (and this method being called), but that + // immediately another failure comes, triggered by the other operator coordinator (or + // its task). synchronized (recoveredTaskRunning) { - // signal the previous task that its recovered task is now running - final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast(); - if (prevTaskFuture != null) { - prevTaskFuture.complete(null); + for (CountDownLatch latch : recoveredTaskRunning) { + latch.countDown(); } - // add a future for this task - recoveredTaskRunning.addLast(new CompletableFuture<>()); + recoveredTaskRunning.clear(); } // first, we hand this over to the mailbox thread, so we preserve order on operations, @@ -372,6 +375,14 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { + // we need to create and enqueue this outside the mailbox, so that the + // enqueuing is strictly ordered with the completion (which also happens outside + // the mail box executor). + final CountDownLatch successorIsRunning = new CountDownLatch(1); + synchronized (recoveredTaskRunning) { + recoveredTaskRunning.addLast(successorIsRunning); + } + // simulate a heavy thread race here: the mailbox has a last enqueued action before the // cancellation is processed. But through a race, the mailbox freezes for a while and in // that time, the task already went through a recovery cycle. By the time the mailbox @@ -380,20 +391,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { // coordinator before unfreezing the mailbox runInMailbox( () -> { - final CompletableFuture<?> thisTasksFuture; - synchronized (recoveredTaskRunning) { - thisTasksFuture = recoveredTaskRunning.peekFirst(); - } - - if (thisTasksFuture != null) { - try { - thisTasksFuture.get(); - } catch (Exception ignored) { - } - - synchronized (recoveredTaskRunning) { - recoveredTaskRunning.removeFirst(); - } + try { + successorIsRunning.await(); + } catch (Exception ignored) { } executeSingleAction();