This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e676442  [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase 
work with Adaptive Scheduler.
e676442 is described below

commit e676442b9faa1ec0b668e8394dd2353ac2de01c6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri Apr 23 17:17:01 2021 +0200

    [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with 
Adaptive Scheduler.
    
    The test previously relied on an implicit contract that instances of 
OperatorCoordinators are never recreated
    on the same JobManager. That implicit contract is no longer true with the 
Adaptive Scheduler.
    
    This change adjusts the test to no longer make that assumption.
    
    This closes #15739
---
 .../CoordinatorEventsExactlyOnceITCase.java        | 112 ++++++++++++++++-----
 1 file changed, 86 insertions(+), 26 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index edc9ede..2337115 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -64,11 +64,13 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -169,6 +171,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
     @Test
     public void test() throws Exception {
+        // this captures variables communicated across instances, recoveries, 
etc.
+        TestScript.reset();
+
         final int numEvents1 = 200;
         final int numEvents2 = 5;
         final int delay1 = 1;
@@ -296,19 +301,23 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
         private final int delay;
         private final int maxNumber;
+        private final int failAtMessage;
         private int nextNumber;
 
         private CompletableFuture<byte[]> requestedCheckpoint;
         private CompletableFuture<byte[]> nextToComplete;
 
-        private final int failAtMessage;
-        private boolean failedBefore;
-
-        private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new 
ArrayDeque<>();
-
         private SubtaskGateway subtaskGateway;
         private boolean workLoopRunning;
 
+        /**
+         * This contains all variables that are necessary to track the 
progress of the test, and
+         * which need to be tracked across instances of this coordinator (some 
scheduler
+         * implementations may re-instantiate the ExecutionGraph and the 
coordinators around global
+         * failures).
+         */
+        private final TestScript testScript;
+
         private EventSendingCoordinator(Context context, String name, int 
numEvents, int delay) {
             checkArgument(delay > 0);
             checkArgument(numEvents >= 3);
@@ -316,6 +325,9 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
             this.context = context;
             this.maxNumber = numEvents;
             this.delay = delay;
+
+            this.testScript = TestScript.getForOperator(name);
+
             this.mailboxExecutor =
                     Executors.newSingleThreadExecutor(
                             new DispatcherThreadFactory(
@@ -349,17 +361,12 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                         String.format("Don't recognize event '%s' from task 
%d.", event, subtask));
             }
 
-            // We complete all events that were enqueued. We may need to 
complete
-            // multiple ones here, because it can happen that after a failure 
no real recovery
-            // happens that results in an event being sent (and this method 
being called), but that
-            // immediately another failure comes, triggered by the other 
operator coordinator (or
-            // its task).
-            synchronized (recoveredTaskRunning) {
-                for (CountDownLatch latch : recoveredTaskRunning) {
-                    latch.countDown();
-                }
-                recoveredTaskRunning.clear();
-            }
+            // this unblocks all the delayed actions that were kicked off 
while the previous
+            // task was still running (if there was a previous task). this is 
part of simulating
+            // the extreme race where the coordinator thread stalls for so 
long that a new
+            // task execution attempt gets deployed before the last events 
targeted at the old task
+            // were sent.
+            testScript.signalRecoveredTaskReady();
 
             // first, we hand this over to the mailbox thread, so we preserve 
order on operations,
             // even if the action is only to do a thread safe scheduling into 
the scheduledExecutor
@@ -375,13 +382,13 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
 
         @Override
         public void subtaskFailed(int subtask, @Nullable Throwable reason) {
-            // we need to create and enqueue this outside the mailbox, so that 
the
-            // enqueuing is strictly ordered with the completion (which also 
happens outside
-            // the mail box executor).
+            // we need to create and register this outside the mailbox so that 
the
+            // registration is not affected by the artificial stall on the 
mailbox, but happens
+            // strictly before the tasks are restored and the operator events 
are received (to
+            // trigger the latches) which also happens outside the mailbox.
+
             final CountDownLatch successorIsRunning = new CountDownLatch(1);
-            synchronized (recoveredTaskRunning) {
-                recoveredTaskRunning.addLast(successorIsRunning);
-            }
+            
testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning);
 
             // simulate a heavy thread race here: the mailbox has a last 
enqueued action before the
             // cancellation is processed. But through a race, the mailbox 
freezes for a while and in
@@ -483,7 +490,12 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
                 System.exit(-1);
             }
 
-            // schedule the next step
+            // schedule the next step. we do this here, after the previous 
step concluded, rather
+            // than scheduling a periodic action. Otherwise, the periodic task 
would enqueue many
+            // actions while the mailbox stalls and process them all 
instantaneously after the
+            // un-stalling. That wouldn't break the test, but it voids the 
differences in event
+            // sending delays between the different coordinators, which are 
part of provoking the
+            // situation that requires checkpoint alignment between the 
coordinators' event streams.
             scheduleSingleAction();
         }
 
@@ -515,8 +527,8 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
         }
 
         private void checkWhetherToTriggerFailure() {
-            if (nextNumber >= failAtMessage && !failedBefore) {
-                failedBefore = true;
+            if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed()) 
{
+                testScript.recordHasFailed();
                 context.failJob(new Exception("test failure"));
             }
         }
@@ -623,6 +635,54 @@ public class CoordinatorEventsExactlyOnceITCase extends 
TestLogger {
     }
 
     // ------------------------------------------------------------------------
+    //  dedicated class to hold the "test script"
+    // ------------------------------------------------------------------------
+
+    private static final class TestScript {
+
+        private static final Map<String, TestScript> MAP_FOR_OPERATOR = new 
HashMap<>();
+
+        static TestScript getForOperator(String operatorName) {
+            return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new 
TestScript());
+        }
+
+        static void reset() {
+            MAP_FOR_OPERATOR.clear();
+        }
+
+        private final Collection<CountDownLatch> recoveredTaskRunning = new 
ArrayList<>();
+        private boolean failedBefore;
+
+        void recordHasFailed() {
+            this.failedBefore = true;
+        }
+
+        boolean hasAlreadyFailed() {
+            return failedBefore;
+        }
+
+        void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
+            synchronized (recoveredTaskRunning) {
+                recoveredTaskRunning.add(latch);
+            }
+        }
+
+        void signalRecoveredTaskReady() {
+            // We complete all latches that were registered. We may need to 
complete
+            // multiple ones here, because it can happen that after a previous 
failure, the next
+            // execution fails immediately again, before even registering at 
the coordinator.
+            // in that case, we have multiple latches from multiple failure 
notifications waiting
+            // to be completed.
+            synchronized (recoveredTaskRunning) {
+                for (CountDownLatch latch : recoveredTaskRunning) {
+                    latch.countDown();
+                }
+                recoveredTaskRunning.clear();
+            }
+        }
+    }
+
+    // ------------------------------------------------------------------------
     //  serialization shenannigans
     // ------------------------------------------------------------------------
 

Reply via email to