Repository: flink Updated Branches: refs/heads/release-1.4 38278ebef -> 09edf6a62
[FLINK-7949] Add unit test for AsyncWaitOperator recovery with full queue Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb088bc1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb088bc1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb088bc1 Branch: refs/heads/release-1.4 Commit: fb088bc1343099a1ea71d0589ab825897e8dcdee Parents: a2198b0 Author: Till Rohrmann <[email protected]> Authored: Wed Jan 10 18:53:38 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Thu Jan 25 15:26:09 2018 +0100 ---------------------------------------------------------------------- .../operators/async/AsyncWaitOperatorTest.java | 123 ++++++++++++++++++- 1 file changed, 122 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fb088bc1/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index 993bffb..34c9a0f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.OneShotLatch; import 
org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointOptions; @@ -56,6 +57,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.AcknowledgeStreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -65,6 +67,7 @@ import org.apache.flink.streaming.util.TestHarnessUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -74,10 +77,12 @@ import org.mockito.stubbing.Answer; import javax.annotation.Nonnull; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -88,6 +93,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; @@ -970,6 +976,122 @@ public class AsyncWaitOperatorTest extends TestLogger { } } + /** + * Tests that the AsyncWaitOperator can restart if the checkpointed queue was full. 
+ * + * <p>See FLINK-7949 + */ + @Test(timeout = 10000) + public void testRestartWithFullQueue() throws Exception { + int capacity = 10; + + // 1. create the snapshot which contains capacity + 1 elements + final CompletableFuture<Void> trigger = new CompletableFuture<>(); + final ControllableAsyncFunction<Integer> controllableAsyncFunction = new ControllableAsyncFunction<>(trigger); + + final OneInputStreamOperatorTestHarness<Integer, Integer> snapshotHarness = new OneInputStreamOperatorTestHarness<>( + new AsyncWaitOperator<>( + controllableAsyncFunction, // completes its results only once the trigger future completes, so it acts like a blocking function + 1000L, + capacity, + AsyncDataStream.OutputMode.ORDERED), + IntSerializer.INSTANCE); + + snapshotHarness.open(); + + final OperatorStateHandles snapshot; + + final ArrayList<Integer> expectedOutput = new ArrayList<>(capacity + 1); + + try { + synchronized (snapshotHarness.getCheckpointLock()) { + for (int i = 0; i < capacity; i++) { + snapshotHarness.processElement(i, 0L); + expectedOutput.add(i); + } + } + + expectedOutput.add(capacity); + + final OneShotLatch lastElement = new OneShotLatch(); + + final CheckedThread lastElementWriter = new CheckedThread() { + @Override + public void go() throws Exception { + synchronized (snapshotHarness.getCheckpointLock()) { + lastElement.trigger(); + snapshotHarness.processElement(capacity, 0L); + } + } + }; + + lastElementWriter.start(); + + lastElement.await(); + + synchronized (snapshotHarness.getCheckpointLock()) { + // execute the snapshot within the checkpoint lock, because then it is guaranteed + // that the lastElementWriter has written the exceeding element + snapshot = snapshotHarness.snapshot(0L, 0L); + } + + // trigger the computation to make the close call finish + trigger.complete(null); + } finally { + synchronized (snapshotHarness.getCheckpointLock()) { + snapshotHarness.close(); + } + } + + // 2. 
restore the snapshot and check that we complete + final OneInputStreamOperatorTestHarness<Integer, Integer> recoverHarness = new OneInputStreamOperatorTestHarness<>( + new AsyncWaitOperator<>( + new ControllableAsyncFunction<>(CompletableFuture.completedFuture(null)), + 1000L, + capacity, + AsyncDataStream.OutputMode.ORDERED), + IntSerializer.INSTANCE); + + recoverHarness.initializeState(snapshot); + + synchronized (recoverHarness.getCheckpointLock()) { + recoverHarness.open(); + } + + synchronized (recoverHarness.getCheckpointLock()) { + recoverHarness.close(); + } + + final ConcurrentLinkedQueue<Object> output = recoverHarness.getOutput(); + + assertThat(output.size(), Matchers.equalTo(capacity + 1)); + + final ArrayList<Integer> outputElements = new ArrayList<>(capacity + 1); + + for (int i = 0; i < capacity + 1; i++) { + StreamRecord<Integer> streamRecord = ((StreamRecord<Integer>) output.poll()); + outputElements.add(streamRecord.getValue()); + } + + assertThat(outputElements, Matchers.equalTo(expectedOutput)); + } + + private static class ControllableAsyncFunction<IN> implements AsyncFunction<IN, IN> { + + private static final long serialVersionUID = -4214078239267288636L; + + private transient CompletableFuture<Void> trigger; + + private ControllableAsyncFunction(CompletableFuture<Void> trigger) { + this.trigger = Preconditions.checkNotNull(trigger); + } + + @Override + public void asyncInvoke(IN input, ResultFuture<IN> resultFuture) throws Exception { + trigger.thenAccept(v -> resultFuture.complete(Collections.singleton(input))); + } + } + private static class NoOpAsyncFunction<IN, OUT> implements AsyncFunction<IN, OUT> { private static final long serialVersionUID = -3060481953330480694L; @@ -978,5 +1100,4 @@ public class AsyncWaitOperatorTest extends TestLogger { // no op } } - }
