This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3d2e1c49d6bb89d11e9dfc9c171f9d00aff2215
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Apr 16 16:48:17 2021 +0200

    [hotfix][tests] Simplify and harden CoordinatorEventsExactlyOnceITCase
---
 .../CoordinatorEventsExactlyOnceITCase.java        | 42 +++++++++++-----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 33222c3..edc9ede 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -71,6 +71,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -303,7 +304,7 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
         private final int failAtMessage;
         private boolean failedBefore;
 
-        private final ArrayDeque<CompletableFuture<?>> recoveredTaskRunning = 
new ArrayDeque<>();
+        private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new 
ArrayDeque<>();
 
         private SubtaskGateway subtaskGateway;
         private boolean workLoopRunning;
@@ -348,14 +349,16 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                         String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
             }
 
+            // We count down all latches that were enqueued. We may need to release
+            // multiple ones here, because it can happen that a failure is not followed by a
+            // real recovery that results in an event being sent (and this method being called),
+            // but is instead immediately followed by another failure, triggered by the other
+            // operator coordinator (or its task).
             synchronized (recoveredTaskRunning) {
-                // signal the previous task that its recovered task is now 
running
-                final CompletableFuture<?> prevTaskFuture = 
recoveredTaskRunning.peekLast();
-                if (prevTaskFuture != null) {
-                    prevTaskFuture.complete(null);
+                for (CountDownLatch latch : recoveredTaskRunning) {
+                    latch.countDown();
                 }
-                // add a future for this task
-                recoveredTaskRunning.addLast(new CompletableFuture<>());
+                recoveredTaskRunning.clear();
             }
 
             // first, we hand this over to the mailbox thread, so we preserve 
order on operations,
@@ -372,6 +375,14 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {
+            // We need to create and enqueue this latch outside the mailbox, so that the
+            // enqueuing is strictly ordered with the count-down (which also happens outside
+            // the mailbox executor).
+            final CountDownLatch successorIsRunning = new CountDownLatch(1);
+            synchronized (recoveredTaskRunning) {
+                recoveredTaskRunning.addLast(successorIsRunning);
+            }
+
             // simulate a heavy thread race here: the mailbox has a last 
enqueued action before the
             // cancellation is processed. But through a race, the mailbox 
freezes for a while and in
             // that time, the task already went through a recovery cycle. By 
the time the mailbox
@@ -380,20 +391,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
             // coordinator before unfreezing the mailbox
             runInMailbox(
                     () -> {
-                        final CompletableFuture<?> thisTasksFuture;
-                        synchronized (recoveredTaskRunning) {
-                            thisTasksFuture = recoveredTaskRunning.peekFirst();
-                        }
-
-                        if (thisTasksFuture != null) {
-                            try {
-                                thisTasksFuture.get();
-                            } catch (Exception ignored) {
-                            }
-
-                            synchronized (recoveredTaskRunning) {
-                                recoveredTaskRunning.removeFirst();
-                            }
+                        try {
+                            successorIsRunning.await();
+                        } catch (Exception ignored) {
                         }
 
                         executeSingleAction();

Reply via email to