This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 67184c27ca46b3562f0a9651663a091a17dadbbc Author: Aleksey Pak <alek...@ververica.com> AuthorDate: Mon Jul 8 17:11:46 2019 +0200 [FLINK-13205][runtime] Make stop-with-savepoint non-blocking on SourceStreamTask checkpoint injecting thread --- .../flink/streaming/runtime/tasks/StreamTask.java | 29 ++--- .../runtime/tasks/SynchronousSavepointLatch.java | 55 ++++---- .../runtime/tasks/SourceTaskTerminationTest.java | 103 +++++---------- .../runtime/tasks/SynchronousCheckpointITCase.java | 27 +--- .../tasks/SynchronousSavepointSyncLatchTest.java | 144 +++++++++++---------- 5 files changed, 148 insertions(+), 210 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 2f4dd6a..f9fe110 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -702,7 +702,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> CheckpointMetrics checkpointMetrics) throws Exception { try { - performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false); + if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false)) { + if (syncSavepointLatch.isSet()) { + syncSavepointLatch.blockUntilCheckpointIsAcknowledged(); + } + } } catch (CancelTaskException e) { LOG.info("Operator {} was cancelled while performing checkpoint {}.", @@ -739,7 +743,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> final long checkpointId = checkpointMetaData.getCheckpointId(); - final boolean result; synchronized (lock) { if (isRunning) { @@ -770,7 +773,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // impact progress of the streaming topology checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics); - result = true; + return true; } else { // we cannot perform our checkpoint - let the downstream operators know that they @@ -795,21 +798,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> throw exception; } - result = false; - } - } - - if (isRunning && syncSavepointLatch.isSet()) { - - final boolean checkpointWasAcked = - syncSavepointLatch.blockUntilCheckpointIsAcknowledged(); - - if (checkpointWasAcked) { - finishTask(); + return false; } } - - return result; } public ExecutorService getAsyncOperationsThreadPool() { @@ -818,6 +809,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { + boolean success = false; synchronized (lock) { if (isRunning) { LOG.debug("Notification of complete checkpoint for task {}", getName()); @@ -828,12 +820,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId); + success = true; } else { LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName()); } } + if (success) { + syncSavepointLatch.acknowledgeCheckpointAndTrigger(checkpointId, this::finishTask); + } } private void tryShutdownTimerService() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java index 0a67dac..d10c19c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointLatch.java @@ -20,6 +20,10 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.checkpoint.CheckpointType; +import org.apache.flink.util.function.RunnableWithException; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** * A synchronization primitive used by the {@link StreamTask} to wait @@ -29,12 +33,17 @@ class SynchronousSavepointLatch { private static final long NOT_SET_CHECKPOINT_ID = -1L; - // these are mutually exclusive + enum CompletionResult { + COMPLETED, + CANCELED, + } + + @GuardedBy("synchronizationPoint") private volatile boolean waiting; - private volatile boolean completed; - private volatile boolean canceled; - private volatile boolean wasAlreadyCompleted; + @GuardedBy("synchronizationPoint") + @Nullable + private volatile CompletionResult completionResult; private final Object synchronizationPoint; @@ -44,8 +53,6 @@ class SynchronousSavepointLatch { this.synchronizationPoint = new Object(); this.waiting = false; - this.completed = false; - this.canceled = false; this.checkpointId = NOT_SET_CHECKPOINT_ID; } @@ -59,45 +66,38 @@ class SynchronousSavepointLatch { } } - boolean blockUntilCheckpointIsAcknowledged() throws Exception { + void blockUntilCheckpointIsAcknowledged() throws Exception { synchronized (synchronizationPoint) { - if (isSet() && !isDone()) { + if (completionResult == null && isSet()) { waiting = true; synchronizationPoint.wait(); waiting = false; } - - if (!isCanceled() && !wasAlreadyCompleted) { - wasAlreadyCompleted = true; - return true; - } - - return false; } } - void acknowledgeCheckpointAndTrigger(final long checkpointId) { + void acknowledgeCheckpointAndTrigger(final long checkpointId, RunnableWithException runnable) throws Exception { synchronized (synchronizationPoint) { - if (isSet() && !isDone() && this.checkpointId == checkpointId) { - completed = true; - synchronizationPoint.notifyAll(); + if (completionResult == null && this.checkpointId == checkpointId) { + completionResult = CompletionResult.COMPLETED; + try { + runnable.run(); + } finally { + synchronizationPoint.notifyAll(); + } } } } void cancelCheckpointLatch() { synchronized (synchronizationPoint) { - if (!isDone()) { - canceled = true; + if (completionResult == null) { + completionResult = CompletionResult.CANCELED; synchronizationPoint.notifyAll(); } } } - private boolean isDone () { - return canceled || completed; - } - @VisibleForTesting boolean isWaiting() { return waiting; @@ -105,15 +105,14 @@ class SynchronousSavepointLatch { @VisibleForTesting boolean isCompleted() { - return completed; + return completionResult == CompletionResult.COMPLETED; } @VisibleForTesting boolean isCanceled() { - return canceled; + return completionResult == CompletionResult.CANCELED; } - @VisibleForTesting boolean isSet() { return checkpointId != NOT_SET_CHECKPOINT_ID; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java index 9295788..4a613d8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceTaskTerminationTest.java @@ -24,7 +24,6 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; -import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; @@ -33,62 +32,56 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.TestLogger; -import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * A test verifying the termination process * (synchronous checkpoint and task termination) at the {@link SourceStreamTask}. */ -public class SourceTaskTerminationTest { +public class SourceTaskTerminationTest extends TestLogger { private static OneShotLatch ready; private static MultiShotLatch runLoopStart; private static MultiShotLatch runLoopEnd; - private static AtomicReference<Throwable> error; + @Rule + public final Timeout timeoutPerTest = Timeout.seconds(20); @Before public void initialize() { ready = new OneShotLatch(); runLoopStart = new MultiShotLatch(); runLoopEnd = new MultiShotLatch(); - error = new AtomicReference<>(); - - error.set(null); - } - - @After - public void validate() { - validateNoExceptionsWereThrown(); } @Test - public void terminateShouldBlockDuringCheckpointingAndEmitMaxWatermark() throws Exception { + public void testStopWithSavepointWithMaxWatermark() throws Exception { stopWithSavepointStreamTaskTestHelper(true); } @Test - public void suspendShouldBlockDuringCheckpointingAndNotEmitMaxWatermark() throws Exception { + public void testStopWithSavepointWithoutMaxWatermark() throws Exception { stopWithSavepointStreamTaskTestHelper(false); } - private void stopWithSavepointStreamTaskTestHelper(final boolean expectMaxWatermark) throws Exception { + private void stopWithSavepointStreamTaskTestHelper(final boolean withMaxWatermark) throws Exception { final long syncSavepointId = 34L; final StreamTaskTestHarness<Long> srcTaskTestHarness = getSourceStreamTaskTestHarness(); final Thread executionThread = srcTaskTestHarness.invoke(); final StreamTask<Long, ?> srcTask = srcTaskTestHarness.getTask(); + final SynchronousSavepointLatch syncSavepointLatch = srcTask.getSynchronousSavepointLatch(); ready.await(); @@ -96,15 +89,29 @@ public class SourceTaskTerminationTest { emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 1L); emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 2L); - emitAndVerifyCheckpoint(srcTaskTestHarness, srcTask, 31L); + srcTask.triggerCheckpoint( + new CheckpointMetaData(31L, 900), + CheckpointOptions.forCheckpointWithDefaultLocation(), + false); + + assertFalse(syncSavepointLatch.isSet()); + assertFalse(syncSavepointLatch.isCompleted()); + assertFalse(syncSavepointLatch.isWaiting()); + + verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), 31L); emitAndVerifyWatermarkAndElement(srcTaskTestHarness, 3L); - final Thread syncSavepointThread = triggerSynchronousSavepointFromDifferentThread(srcTask, expectMaxWatermark, syncSavepointId); + srcTask.triggerCheckpoint( + new CheckpointMetaData(syncSavepointId, 900), + new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), + withMaxWatermark); - final SynchronousSavepointLatch syncSavepointFuture = waitForSyncSavepointFutureToBeSet(srcTask); + assertTrue(syncSavepointLatch.isSet()); + assertFalse(syncSavepointLatch.isCompleted()); + assertFalse(syncSavepointLatch.isWaiting()); - if (expectMaxWatermark) { + if (withMaxWatermark) { // if we are in TERMINATE mode, we expect the source task // to emit MAX_WM before the SYNC_SAVEPOINT barrier. verifyWatermark(srcTaskTestHarness.getOutput(), Watermark.MAX_WATERMARK); @@ -112,54 +119,12 @@ public class SourceTaskTerminationTest { verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), syncSavepointId); - assertFalse(syncSavepointFuture.isCompleted()); - assertTrue(syncSavepointFuture.isWaiting()); - srcTask.notifyCheckpointComplete(syncSavepointId); - assertTrue(syncSavepointFuture.isCompleted()); + assertTrue(syncSavepointLatch.isCompleted()); - syncSavepointThread.join(); executionThread.join(); } - private void validateNoExceptionsWereThrown() { - if (error.get() != null && !(error.get() instanceof CancelTaskException)) { - fail(error.get().getMessage()); - } - } - - private Thread triggerSynchronousSavepointFromDifferentThread( - final StreamTask<Long, ?> task, - final boolean advanceToEndOfEventTime, - final long syncSavepointId) { - final Thread checkpointingThread = new Thread(() -> { - try { - task.triggerCheckpoint( - new CheckpointMetaData(syncSavepointId, 900), - new CheckpointOptions(CheckpointType.SYNC_SAVEPOINT, CheckpointStorageLocationReference.getDefault()), - advanceToEndOfEventTime); - } catch (Exception e) { - error.set(e); - } - }); - checkpointingThread.start(); - - return checkpointingThread; - - } - - private void emitAndVerifyCheckpoint( - final StreamTaskTestHarness<Long> srcTaskTestHarness, - final StreamTask<Long, ?> srcTask, - final long checkpointId) throws Exception { - - srcTask.triggerCheckpoint( - new CheckpointMetaData(checkpointId, 900), - CheckpointOptions.forCheckpointWithDefaultLocation(), - false); - verifyCheckpointBarrier(srcTaskTestHarness.getOutput(), checkpointId); - } - private StreamTaskTestHarness<Long> getSourceStreamTaskTestHarness() { final StreamTaskTestHarness<Long> testHarness = new StreamTaskTestHarness<>( SourceStreamTask::new, @@ -177,16 +142,6 @@ public class SourceTaskTerminationTest { return testHarness; } - private SynchronousSavepointLatch waitForSyncSavepointFutureToBeSet(final StreamTask streamTaskUnderTest) throws InterruptedException { - final SynchronousSavepointLatch syncSavepointFuture = streamTaskUnderTest.getSynchronousSavepointLatch(); - while (!syncSavepointFuture.isWaiting()) { - Thread.sleep(10L); - - validateNoExceptionsWereThrown(); - } - return syncSavepointFuture; - } - private void emitAndVerifyWatermarkAndElement( final StreamTaskTestHarness<Long> srcTaskTestHarness, final long expectedElement) throws InterruptedException { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java index bac9d43..3b4aee3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousCheckpointITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.blob.BlobCacheService; import org.apache.flink.runtime.blob.PermanentBlobCache; import org.apache.flink.runtime.blob.TransientBlobCache; @@ -68,12 +67,10 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; -import java.util.Arrays; import java.util.Collections; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -88,8 +85,6 @@ import static org.mockito.Mockito.when; */ public class SynchronousCheckpointITCase { - private static OneShotLatch checkpointTriggered = new OneShotLatch(); - // A thread-safe queue to "log" and monitor events happening in the task's methods. Also, used by the test thread // to synchronize actions with the task's threads. private static LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(); @@ -98,7 +93,7 @@ public class SynchronousCheckpointITCase { public final Timeout timeoutPerTest = Timeout.seconds(10); @Test - public void taskCachedThreadPoolAllowsForSynchronousCheckpoints() throws Exception { + public void taskDispatcherThreadPoolAllowsForSynchronousCheckpoints() throws Exception { final Task task = createTask(SynchronousCheckpointTestingTask.class); try (TaskCleaner ignored = new TaskCleaner(task)) { @@ -109,12 +104,6 @@ public class SynchronousCheckpointITCase { assertEquals(ExecutionState.RUNNING, task.getExecutionState()); - // Hack: we are triggering a checkpoint with advanceToEndOfEventTime = true, to be sure that - // triggerCheckpointBarrier has reached the sync checkpoint latch (by verifying in - // SynchronousCheckpointTestingTask.advanceToEndOfEventTime) and only then proceeding to - // notifyCheckpointComplete. - // Without such synchronization, the notifyCheckpointComplete execution may be executed first and leave this - // test in a deadlock. task.triggerCheckpointBarrier( 42, 156865867234L, @@ -122,16 +111,13 @@ public class SynchronousCheckpointITCase { true); assertThat(eventQueue.take(), is(Event.PRE_TRIGGER_CHECKPOINT)); + assertThat(eventQueue.take(), is(Event.POST_TRIGGER_CHECKPOINT)); assertTrue(eventQueue.isEmpty()); - checkpointTriggered.await(); - task.notifyCheckpointComplete(42); assertThat(eventQueue.take(), is(Event.PRE_NOTIFY_CHECKPOINT_COMPLETE)); - assertThat( - Arrays.asList(eventQueue.take(), eventQueue.take()), - containsInAnyOrder(Event.POST_NOTIFY_CHECKPOINT_COMPLETE, Event.POST_TRIGGER_CHECKPOINT)); + assertThat(eventQueue.take(), is(Event.POST_NOTIFY_CHECKPOINT_COMPLETE)); assertTrue(eventQueue.isEmpty()); assertEquals(ExecutionState.RUNNING, task.getExecutionState()); @@ -196,13 +182,6 @@ public class SynchronousCheckpointITCase { protected void cleanup() { } - - @Override - protected void advanceToEndOfEventTime() throws Exception { - // Wake up the test thread that we have actually entered the checkpoint invocation and the sync checkpoint - // latch is set. - checkpointTriggered.trigger(); - } } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java index fa2f9b7..59f5a81 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.java @@ -18,17 +18,20 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.util.function.RunnableWithException; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -53,120 +56,127 @@ public class SynchronousSavepointSyncLatchTest { } @Test - public void waitAndThenTriggerWorks() throws Exception { + public void triggerUnblocksWait() throws Exception { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); - final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L); - executors.submit(callable); + latchUnderTest.setCheckpointId(1L); + assertFalse(latchUnderTest.isWaiting()); - while (!latchUnderTest.isSet()) { + Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest); + while (!latchUnderTest.isWaiting()) { Thread.sleep(5L); } + final AtomicBoolean triggered = new AtomicBoolean(); + // wrong checkpoint id. - latchUnderTest.acknowledgeCheckpointAndTrigger(2L); + latchUnderTest.acknowledgeCheckpointAndTrigger(2L, () -> triggered.set(true)); + assertFalse(triggered.get()); + assertFalse(latchUnderTest.isCompleted()); assertTrue(latchUnderTest.isWaiting()); - latchUnderTest.acknowledgeCheckpointAndTrigger(1L); + latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true)); + assertTrue(triggered.get()); assertTrue(latchUnderTest.isCompleted()); + + future.get(); + assertFalse(latchUnderTest.isWaiting()); } @Test - public void waitAndThenCancelWorks() throws Exception { + public void cancelUnblocksWait() throws Exception { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); - final WaitingOnLatchCallable callable = new WaitingOnLatchCallable(latchUnderTest, 1L); - final Future<Boolean> resultFuture = executors.submit(callable); + latchUnderTest.setCheckpointId(1L); + assertFalse(latchUnderTest.isWaiting()); - while (!latchUnderTest.isSet()) { + Future<Void> future = runThreadWaitingForCheckpointAck(latchUnderTest); + while (!latchUnderTest.isWaiting()) { Thread.sleep(5L); } latchUnderTest.cancelCheckpointLatch(); - - boolean result = resultFuture.get(); - - assertFalse(result); assertTrue(latchUnderTest.isCanceled()); + + future.get(); + assertFalse(latchUnderTest.isWaiting()); } @Test - public void triggeringReturnsTrueAtMostOnce() throws Exception { + public void waitAfterTriggerIsNotBlocking() throws Exception { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); - final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L); - final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 1L); - - final Future<Boolean> firstFuture = executors.submit(firstCallable); - final Future<Boolean> secondFuture = executors.submit(secondCallable); + latchUnderTest.setCheckpointId(1L); + latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> {}); - while (!latchUnderTest.isSet()) { - Thread.sleep(5L); - } + latchUnderTest.blockUntilCheckpointIsAcknowledged(); + } - latchUnderTest.acknowledgeCheckpointAndTrigger(1L); + @Test + public void waitAfterCancelIsNotBlocking() throws Exception { + final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); - final boolean firstResult = firstFuture.get(); - final boolean secondResult = secondFuture.get(); + latchUnderTest.setCheckpointId(1L); + latchUnderTest.cancelCheckpointLatch(); + assertTrue(latchUnderTest.isCanceled()); - // only one of the two can be true (it is a race so we do not know which one) - assertTrue(firstResult ^ secondResult); + latchUnderTest.blockUntilCheckpointIsAcknowledged(); } @Test - public void waitAfterTriggerReturnsTrueImmediately() throws Exception { + public void triggeringInvokesCallbackAtMostOnce() throws Exception { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); + latchUnderTest.setCheckpointId(1L); - latchUnderTest.acknowledgeCheckpointAndTrigger(1L); - final boolean triggerred = latchUnderTest.blockUntilCheckpointIsAcknowledged(); - assertTrue(triggerred); + + AtomicInteger counter = new AtomicInteger(); + Future<Void> future1 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet); + Future<Void> future2 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet); + Future<Void> future3 = runThreadTriggeringCheckpoint(latchUnderTest, 1L, counter::incrementAndGet); + future1.get(); + future2.get(); + future3.get(); + + assertEquals(1, counter.get()); } @Test - public void waitAfterCancelDoesNothing() throws Exception { + public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); + latchUnderTest.setCheckpointId(1L); latchUnderTest.cancelCheckpointLatch(); - latchUnderTest.blockUntilCheckpointIsAcknowledged(); + assertTrue(latchUnderTest.isCanceled()); + + final AtomicBoolean triggered = new AtomicBoolean(); + latchUnderTest.acknowledgeCheckpointAndTrigger(1L, () -> triggered.set(true)); + assertFalse(triggered.get()); } @Test - public void checkpointIdIsSetOnlyOnce() throws InterruptedException { + public void checkpointIdIsSetOnlyOnce() { final SynchronousSavepointLatch latchUnderTest = new SynchronousSavepointLatch(); - final WaitingOnLatchCallable firstCallable = new WaitingOnLatchCallable(latchUnderTest, 1L); - executors.submit(firstCallable); - - while (!latchUnderTest.isSet()) { - Thread.sleep(5L); - } - - final WaitingOnLatchCallable secondCallable = new WaitingOnLatchCallable(latchUnderTest, 2L); - executors.submit(secondCallable); - - latchUnderTest.acknowledgeCheckpointAndTrigger(2L); - assertTrue(latchUnderTest.isWaiting()); + latchUnderTest.setCheckpointId(1L); + assertTrue(latchUnderTest.isSet()); + assertEquals(1L, latchUnderTest.getCheckpointId()); - latchUnderTest.acknowledgeCheckpointAndTrigger(1L); - assertTrue(latchUnderTest.isCompleted()); + latchUnderTest.setCheckpointId(2L); + assertTrue(latchUnderTest.isSet()); + assertEquals(1L, latchUnderTest.getCheckpointId()); } - private static final class WaitingOnLatchCallable implements Callable<Boolean> { - - private final SynchronousSavepointLatch latch; - private final long checkpointId; - - WaitingOnLatchCallable( - final SynchronousSavepointLatch latch, - final long checkpointId) { - this.latch = checkNotNull(latch); - this.checkpointId = checkpointId; - } + private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) { + return executors.submit(() -> { + latch.blockUntilCheckpointIsAcknowledged(); + return null; + }); + } - @Override - public Boolean call() throws Exception { - latch.setCheckpointId(checkpointId); - return latch.blockUntilCheckpointIsAcknowledged(); - } + private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long checkpointId, RunnableWithException runnable) { + return executors.submit(() -> { + latch.acknowledgeCheckpointAndTrigger(checkpointId, runnable); + return null; + }); } }