This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 67184c27ca46b3562f0a9651663a091a17dadbbc
Author: Aleksey Pak <alek...@ververica.com>
AuthorDate: Mon Jul 8 17:11:46 2019 +0200

    [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 ++---
 .../runtime/tasks/SynchronousSavepointLatch.java   |  55 ++++----
 .../runtime/tasks/SourceTaskTerminationTest.java   | 103 +++++----------
 .../runtime/tasks/SynchronousCheckpointITCase.java |  27 +---
 .../tasks/SynchronousSavepointSyncLatchTest.java   | 144 +++++++++++----------
 5 files changed, 148 insertions(+), 210 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 2f4dd6a..f9fe110 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -702,7 +702,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        CheckpointMetrics checkpointMetrics) throws Exception {
 
                try {
-                       performCheckpoint(checkpointMetaData, 
checkpointOptions, checkpointMetrics, false);
+                       if (performCheckpoint(checkpointMetaData, 
checkpointOptions, checkpointMetrics, false)) {
+                               if (syncSavepointLatch.isSet()) {
+                                       
syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
+                               }
+                       }
                }
                catch (CancelTaskException e) {
                        LOG.info("Operator {} was cancelled while performing 
checkpoint {}.",
@@ -739,7 +743,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                final long checkpointId = checkpointMetaData.getCheckpointId();
 
-               final boolean result;
                synchronized (lock) {
                        if (isRunning) {
 
@@ -770,7 +773,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                //           impact progress of the streaming 
topology
                                checkpointState(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
 
-                               result = true;
+                               return true;
                        }
                        else {
                                // we cannot perform our checkpoint - let the 
downstream operators know that they
@@ -795,21 +798,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        throw exception;
                                }
 
-                               result = false;
-                       }
-               }
-
-               if (isRunning && syncSavepointLatch.isSet()) {
-
-                       final boolean checkpointWasAcked =
-                                       
syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
-
-                       if (checkpointWasAcked) {
-                               finishTask();
+                               return false;
                        }
                }
-
-               return result;
        }
 
        public ExecutorService getAsyncOperationsThreadPool() {
@@ -818,6 +809,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
+               boolean success = false;
                synchronized (lock) {
                        if (isRunning) {
                                LOG.debug("Notification of complete checkpoint 
for task {}", getName());
@@ -828,12 +820,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        }
                                }
 
-                               
syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId);
+                               success = true;
                        }
                        else {
                                LOG.debug("Ignoring notification of complete 
checkpoint for not-running task {}", getName());
                        }
                }
+               if (success) {
+                       
syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, 
this::finishTask);
+               }
        }
 
        private void tryShutdownTimerService() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
index 0a67dac..d10c19c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.util.function.RunnableWithException;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 /**
  * A synchronization primitive used by the {@link StreamTask} to wait
@@ -29,12 +33,17 @@ class SynchronousSavepointLatch {
 
        private static final long NOT_SET_CHECKPOINT_ID = -1L;
 
-       // these are mutually exclusive
+       enum CompletionResult {
+               COMPLETED,
+               CANCELED,
+       }
+
+       @GuardedBy("synchronizationPoint")
        private volatile boolean waiting;
-       private volatile boolean completed;
-       private volatile boolean canceled;
 
-       private volatile boolean wasAlreadyCompleted;
+       @GuardedBy("synchronizationPoint")
+       @Nullable
+       private volatile CompletionResult completionResult;
 
        private final Object synchronizationPoint;
 
@@ -44,8 +53,6 @@ class SynchronousSavepointLatch {
                this.synchronizationPoint = new Object();
 
                this.waiting = false;
-               this.completed = false;
-               this.canceled = false;
                this.checkpointId = NOT_SET_CHECKPOINT_ID;
        }
 
@@ -59,45 +66,38 @@ class SynchronousSavepointLatch {
                }
        }
 
-       boolean blockUntilCheckpointIsAcknowledged() throws Exception {
+       void blockUntilCheckpointIsAcknowledged() throws Exception {
                synchronized (synchronizationPoint) {
-                       if (isSet() && !isDone()) {
+                       if (completionResult == null && isSet()) {
                                waiting = true;
                                synchronizationPoint.wait();
                                waiting = false;
                        }
-
-                       if (!isCanceled() && !wasAlreadyCompleted) {
-                               wasAlreadyCompleted = true;
-                               return true;
-                       }
-
-                       return false;
                }
        }
 
-       void acknowledgeCheckpointAndTrigger(final long checkpointId) {
+       void acknowledgeCheckpointAndTrigger(final long checkpointId, 
RunnableWithException runnable) throws Exception {
                synchronized (synchronizationPoint) {
-                       if (isSet() && !isDone() && this.checkpointId == 
checkpointId) {
-                               completed = true;
-                               synchronizationPoint.notifyAll();
+                       if (completionResult == null && this.checkpointId == 
checkpointId) {
+                               completionResult = CompletionResult.COMPLETED;
+                               try {
+                                       runnable.run();
+                               } finally {
+                                       synchronizationPoint.notifyAll();
+                               }
                        }
                }
        }
 
        void cancelCheckpointLatch() {
                synchronized (synchronizationPoint) {
-                       if (!isDone()) {
-                               canceled = true;
+                       if (completionResult == null) {
+                               completionResult = CompletionResult.CANCELED;
                                synchronizationPoint.notifyAll();
                        }
                }
        }
 
-       private boolean isDone () {
-               return canceled || completed;
-       }
-
        @VisibleForTesting
        boolean isWaiting() {
                return waiting;
@@ -105,15 +105,14 @@ class SynchronousSavepointLatch {
 
        @VisibleForTesting
        boolean isCompleted() {
-               return completed;
+               return completionResult == CompletionResult.COMPLETED;
        }
 
        @VisibleForTesting
        boolean isCanceled() {
-               return canceled;
+               return completionResult == CompletionResult.CANCELED;
        }
 
-       @VisibleForTesting
        boolean isSet() {
                return checkpointId != NOT_SET_CHECKPOINT_ID;
        }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
index 9295788..4a613d8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -33,62 +32,56 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
 
-import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * A test verifying the termination process
  * (synchronous checkpoint and task termination) at the {@link 
SourceStreamTask}.
  */
-public class SourceTaskTerminationTest {
+public class SourceTaskTerminationTest extends TestLogger {
 
        private static OneShotLatch ready;
        private static MultiShotLatch runLoopStart;
        private static MultiShotLatch runLoopEnd;
 
-       private static AtomicReference<Throwable> error;
+       @Rule
+       public final Timeout timeoutPerTest = Timeout.seconds(20);
 
        @Before
        public void initialize() {
                ready = new OneShotLatch();
                runLoopStart = new MultiShotLatch();
                runLoopEnd = new MultiShotLatch();
-               error = new AtomicReference<>();
-
-               error.set(null);
-       }
-
-       @After
-       public void validate() {
-               validateNoExceptionsWereThrown();
        }
 
        @Test
-       public void 
terminateShouldBlockDuringCheckpointingAndEmitMaxWatermark() throws Exception {
+       public void testStopWithSavepointWithMaxWatermark() throws Exception {
                stopWithSavepointStreamTaskTestHelper(true);
        }
 
        @Test
-       public void 
suspendShouldBlockDuringCheckpointingAndNotEmitMaxWatermark() throws Exception {
+       public void testStopWithSavepointWithoutMaxWatermark() throws Exception 
{
                stopWithSavepointStreamTaskTestHelper(false);
        }
 
-       private void stopWithSavepointStreamTaskTestHelper(final boolean 
expectMaxWatermark) throws Exception {
+       private void stopWithSavepointStreamTaskTestHelper(final boolean 
withMaxWatermark) throws Exception {
                final long syncSavepointId = 34L;
 
                final StreamTaskTestHarness<Long> srcTaskTestHarness = 
getSourceStreamTaskTestHarness();
                final Thread executionThread = srcTaskTestHarness.invoke();
                final StreamTask<Long, ?> srcTask = 
srcTaskTestHarness.getTask();
+               final SynchronousSavepointLatch syncSavepointLatch = 
srcTask.getSynchronousSavepointLatch();
 
                ready.await();
 
@@ -96,15 +89,29 @@ public class SourceTaskTerminationTest {
                emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L);
                emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L);
 
-               emitAndVerifyCheckpoint(srcTaskTestHarness, srcTask, 31L);
+               srcTask.triggerCheckpoint(
+                               new CheckpointMetaData(31L, 900),
+                               
CheckpointOptions.forCheckpointWithDefaultLocation(),
+                               false);
+
+               assertFalse(syncSavepointLatch.isSet());
+               assertFalse(syncSavepointLatch.isCompleted());
+               assertFalse(syncSavepointLatch.isWaiting());
+
+               verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L);
 
                emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L);
 
-               final Thread syncSavepointThread = 
triggerSynchronousSavepointFromDifferentThread(srcTask, expectMaxWatermark, 
syncSavepointId);
+               srcTask.triggerCheckpoint(
+                               new CheckpointMetaData(syncSavepointId, 900),
+                               new 
CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()),
+                               withMaxWatermark);
 
-               final SynchronousSavepointLatch syncSavepointFuture = 
waitForSyncSavepointFutureToBeSet(srcTask);
+               assertTrue(syncSavepointLatch.isSet());
+               assertFalse(syncSavepointLatch.isCompleted());
+               assertFalse(syncSavepointLatch.isWaiting());
 
-               if (expectMaxWatermark) {
+               if (withMaxWatermark) {
                        // if we are in TERMINATE mode, we expect the source 
task
                        // to emit MAX_WM before the SYNC_SAVEPOINT barrier.
                        verifyWatermark(srcTaskTestHarness.getOutput(), 
Watermark.MAX_WATERMARK);
@@ -112,54 +119,12 @@ public class SourceTaskTerminationTest {
 
                verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 
syncSavepointId);
 
-               assertFalse(syncSavepointFuture.isCompleted());
-               assertTrue(syncSavepointFuture.isWaiting());
-
                srcTask.notifyCheckpointComplete(syncSavepointId);
-               assertTrue(syncSavepointFuture.isCompleted());
+               assertTrue(syncSavepointLatch.isCompleted());
 
-               syncSavepointThread.join();
                executionThread.join();
        }
 
-       private void validateNoExceptionsWereThrown() {
-               if (error.get() != null && !(error.get() instanceof 
CancelTaskException)) {
-                       fail(error.get().getMessage());
-               }
-       }
-
-       private Thread triggerSynchronousSavepointFromDifferentThread(
-                       final StreamTask<Long, ?> task,
-                       final boolean advanceToEndOfEventTime,
-                       final long syncSavepointId) {
-               final Thread checkpointingThread = new Thread(() -> {
-                       try {
-                               task.triggerCheckpoint(
-                                               new 
CheckpointMetaData(syncSavepointId, 900),
-                                               new 
CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()),
-                                               advanceToEndOfEventTime);
-                       } catch (Exception e) {
-                               error.set(e);
-                       }
-               });
-               checkpointingThread.start();
-
-               return checkpointingThread;
-
-       }
-
-       private void emitAndVerifyCheckpoint(
-                       final StreamTaskTestHarness<Long> srcTaskTestHarness,
-                       final StreamTask<Long, ?> srcTask,
-                       final long checkpointId) throws Exception {
-
-               srcTask.triggerCheckpoint(
-                               new CheckpointMetaData(checkpointId, 900),
-                               
CheckpointOptions.forCheckpointWithDefaultLocation(),
-                               false);
-               verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 
checkpointId);
-       }
-
        private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() {
                final StreamTaskTestHarness<Long> testHarness = new 
StreamTaskTestHarness<>(
                                SourceStreamTask::new,
@@ -177,16 +142,6 @@ public class SourceTaskTerminationTest {
                return testHarness;
        }
 
-       private SynchronousSavepointLatch 
waitForSyncSavepointFutureToBeSet(final StreamTask streamTaskUnderTest) throws 
InterruptedException {
-               final SynchronousSavepointLatch syncSavepointFuture = 
streamTaskUnderTest.getSynchronousSavepointLatch();
-               while (!syncSavepointFuture.isWaiting()) {
-                       Thread.sleep(10L);
-
-                       validateNoExceptionsWereThrown();
-               }
-               return syncSavepointFuture;
-       }
-
        private void emitAndVerifyWatermarkAndElement(
                        final StreamTaskTestHarness<Long> srcTaskTestHarness,
                        final long expectedElement) throws InterruptedException 
{
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
index bac9d43..3b4aee3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
@@ -68,12 +67,10 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -88,8 +85,6 @@ import static org.mockito.Mockito.when;
  */
 public class SynchronousCheckpointITCase {
 
-       private static OneShotLatch checkpointTriggered = new OneShotLatch();
-
        // A thread-safe queue to "log" and monitor events happening in the 
task's methods. Also, used by the test thread
        // to synchronize actions with the task's threads.
        private static LinkedBlockingQueue<Event> eventQueue = new 
LinkedBlockingQueue<>();
@@ -98,7 +93,7 @@ public class SynchronousCheckpointITCase {
        public final Timeout timeoutPerTest = Timeout.seconds(10);
 
        @Test
-       public void taskCachedThreadPoolAllowsForSynchronousCheckpoints() 
throws Exception {
+       public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() 
throws Exception {
                final Task task = 
createTask(SynchronousCheckpointTestingTask.class);
 
                try (TaskCleaner ignored = new TaskCleaner(task)) {
@@ -109,12 +104,6 @@ public class SynchronousCheckpointITCase {
 
                        assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
 
-                       // Hack: we are triggering a checkpoint with 
advanceToEndOfEventTime = true, to be sure that
-                       // triggerCheckpointBarrier has reached the sync 
checkpoint latch (by verifying in
-                       // 
SynchronousCheckpointTestingTask.advanceToEndOfEventTime) and only then 
proceeding to
-                       // notifyCheckpointComplete.
-                       // Without such synchronization, the 
notifyCheckpointComplete execution may be executed first and leave this
-                       // test in a deadlock.
                        task.triggerCheckpointBarrier(
                                        42,
                                        156865867234L,
@@ -122,16 +111,13 @@ public class SynchronousCheckpointITCase {
                                        true);
 
                        assertThat(eventQueue.take(), 
is(Event.PRE_TRIGGER_CHECKPOINT));
+                       assertThat(eventQueue.take(), 
is(Event.POST_TRIGGER_CHECKPOINT));
                        assertTrue(eventQueue.isEmpty());
 
-                       checkpointTriggered.await();
-
                        task.notifyCheckpointComplete(42);
 
                        assertThat(eventQueue.take(), 
is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE));
-                       assertThat(
-                               Arrays.asList(eventQueue.take(), 
eventQueue.take()),
-                               
containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, 
Event.POST_TRIGGER_CHECKPOINT));
+                       assertThat(eventQueue.take(), 
is(Event.POST_NOTIFY_CHECKPOINT_COMPLETE));
                        assertTrue(eventQueue.isEmpty());
 
                        assertEquals(ExecutionState.RUNNING, 
task.getExecutionState());
@@ -196,13 +182,6 @@ public class SynchronousCheckpointITCase {
                protected void cleanup() {
 
                }
-
-               @Override
-               protected void advanceToEndOfEventTime() throws Exception {
-                       // Wake up the test thread that we have actually 
entered the checkpoint invocation and the sync checkpoint
-                       // latch is set.
-                       checkpointTriggered.trigger();
-               }
        }
 
        /**
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
index fa2f9b7..59f5a81 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.util.function.RunnableWithException;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -53,120 +56,127 @@ public class SynchronousSavepointSyncLatchTest {
        }
 
        @Test
-       public void waitAndThenTriggerWorks() throws Exception {
+       public void triggerUnblocksWait() throws Exception {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
-               final WaitingOnLatchCallable callable = new 
WaitingOnLatchCallable(latchUnderTest, 1L);
 
-               executors.submit(callable);
+               latchUnderTest.setCheckpointId(1L);
+               assertFalse(latchUnderTest.isWaiting());
 
-               while (!latchUnderTest.isSet()) {
+               Future<Void> future = 
runThreadWaitingForCheckpointAck(latchUnderTest);
+               while (!latchUnderTest.isWaiting()) {
                        Thread.sleep(5L);
                }
 
+               final AtomicBoolean triggered = new AtomicBoolean();
+
                // wrong checkpoint id.
-               latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
+               latchUnderTest.acknowledgeCheckpointAndTrigger(2L, () -> 
triggered.set(true));
+               assertFalse(triggered.get());
+               assertFalse(latchUnderTest.isCompleted());
                assertTrue(latchUnderTest.isWaiting());
 
-               latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+               latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> 
triggered.set(true));
+               assertTrue(triggered.get());
                assertTrue(latchUnderTest.isCompleted());
+
+               future.get();
+               assertFalse(latchUnderTest.isWaiting());
        }
 
        @Test
-       public void waitAndThenCancelWorks() throws Exception {
+       public void cancelUnblocksWait() throws Exception {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
-               final WaitingOnLatchCallable callable = new 
WaitingOnLatchCallable(latchUnderTest, 1L);
 
-               final Future<Boolean> resultFuture = executors.submit(callable);
+               latchUnderTest.setCheckpointId(1L);
+               assertFalse(latchUnderTest.isWaiting());
 
-               while (!latchUnderTest.isSet()) {
+               Future<Void> future = 
runThreadWaitingForCheckpointAck(latchUnderTest);
+               while (!latchUnderTest.isWaiting()) {
                        Thread.sleep(5L);
                }
 
                latchUnderTest.cancelCheckpointLatch();
-
-               boolean result = resultFuture.get();
-
-               assertFalse(result);
                assertTrue(latchUnderTest.isCanceled());
+
+               future.get();
+               assertFalse(latchUnderTest.isWaiting());
        }
 
        @Test
-       public void triggeringReturnsTrueAtMostOnce() throws Exception {
+       public void waitAfterTriggerIsNotBlocking() throws Exception {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
 
-               final WaitingOnLatchCallable firstCallable = new 
WaitingOnLatchCallable(latchUnderTest, 1L);
-               final WaitingOnLatchCallable secondCallable = new 
WaitingOnLatchCallable(latchUnderTest, 1L);
-
-               final Future<Boolean> firstFuture = 
executors.submit(firstCallable);
-               final Future<Boolean> secondFuture = 
executors.submit(secondCallable);
+               latchUnderTest.setCheckpointId(1L);
+               latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> {});
 
-               while (!latchUnderTest.isSet()) {
-                       Thread.sleep(5L);
-               }
+               latchUnderTest.blockUntilCheckpointIsAcknowledged();
+       }
 
-               latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
+       @Test
+       public void waitAfterCancelIsNotBlocking() throws Exception {
+               final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
 
-               final boolean firstResult = firstFuture.get();
-               final boolean secondResult = secondFuture.get();
+               latchUnderTest.setCheckpointId(1L);
+               latchUnderTest.cancelCheckpointLatch();
+               assertTrue(latchUnderTest.isCanceled());
 
-               // only one of the two can be true (it is a race so we do not 
know which one)
-               assertTrue(firstResult ^ secondResult);
+               latchUnderTest.blockUntilCheckpointIsAcknowledged();
        }
 
        @Test
-       public void waitAfterTriggerReturnsTrueImmediately() throws Exception {
+       public void triggeringInvokesCallbackAtMostOnce() throws Exception {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
+
                latchUnderTest.setCheckpointId(1L);
-               latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-               final boolean triggerred = 
latchUnderTest.blockUntilCheckpointIsAcknowledged();
-               assertTrue(triggerred);
+
+               AtomicInteger counter = new AtomicInteger();
+               Future<Void> future1 = 
runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+               Future<Void> future2 = 
runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+               Future<Void> future3 = 
runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet);
+               future1.get();
+               future2.get();
+               future3.get();
+
+               assertEquals(1, counter.get());
        }
 
        @Test
-       public void waitAfterCancelDoesNothing() throws Exception {
+       public void triggeringAfterCancelDoesNotInvokeCallback() throws 
Exception {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
+
                latchUnderTest.setCheckpointId(1L);
                latchUnderTest.cancelCheckpointLatch();
-               latchUnderTest.blockUntilCheckpointIsAcknowledged();
+               assertTrue(latchUnderTest.isCanceled());
+
+               final AtomicBoolean triggered = new AtomicBoolean();
+               latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> 
triggered.set(true));
+               assertFalse(triggered.get());
        }
 
        @Test
-       public void checkpointIdIsSetOnlyOnce() throws InterruptedException {
+       public void checkpointIdIsSetOnlyOnce() {
                final SynchronousSavepointLatch latchUnderTest = new 
SynchronousSavepointLatch();
 
-               final WaitingOnLatchCallable firstCallable = new 
WaitingOnLatchCallable(latchUnderTest, 1L);
-               executors.submit(firstCallable);
-
-               while (!latchUnderTest.isSet()) {
-                       Thread.sleep(5L);
-               }
-
-               final WaitingOnLatchCallable secondCallable = new 
WaitingOnLatchCallable(latchUnderTest, 2L);
-               executors.submit(secondCallable);
-
-               latchUnderTest.acknowledgeCheckpointAndTrigger(2L);
-               assertTrue(latchUnderTest.isWaiting());
+               latchUnderTest.setCheckpointId(1L);
+               assertTrue(latchUnderTest.isSet());
+               assertEquals(1L, latchUnderTest.getCheckpointId());
 
-               latchUnderTest.acknowledgeCheckpointAndTrigger(1L);
-               assertTrue(latchUnderTest.isCompleted());
+               latchUnderTest.setCheckpointId(2L);
+               assertTrue(latchUnderTest.isSet());
+               assertEquals(1L, latchUnderTest.getCheckpointId());
        }
 
-       private static final class WaitingOnLatchCallable implements 
Callable<Boolean> {
-
-               private final SynchronousSavepointLatch latch;
-               private final long checkpointId;
-
-               WaitingOnLatchCallable(
-                               final SynchronousSavepointLatch latch,
-                               final long checkpointId) {
-                       this.latch = checkNotNull(latch);
-                       this.checkpointId = checkpointId;
-               }
+       private Future<Void> 
runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) {
+               return executors.submit(() -> {
+                       latch.blockUntilCheckpointIsAcknowledged();
+                       return null;
+               });
+       }
 
-               @Override
-               public Boolean call() throws Exception {
-                       latch.setCheckpointId(checkpointId);
-                       return latch.blockUntilCheckpointIsAcknowledged();
-               }
+       private Future<Void> 
runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long 
checkpointId, RunnableWithException runnable) {
+               return executors.submit(() -> {
+                       latch.acknowledgeCheckpointAndTrigger(checkpointId, 
runnable);
+                       return null;
+               });
        }
 }

Reply via email to