Repository: beam
Updated Branches:
  refs/heads/gearpump-runner 99221e739 -> 559e3c341
[BEAM-1779] Port UnboundedSourceWrapperTest to use Flink operator test harness

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c1dc8f53
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c1dc8f53
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c1dc8f53

Branch: refs/heads/gearpump-runner
Commit: c1dc8f53c5438b575a7e84e9f680616ead49d61e
Parents: 62b942a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 22 11:43:30 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/TestCountingSource.java     |  48 +++--
 .../streaming/UnboundedSourceWrapperTest.java   | 198 +++++++------------
 2 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
index 3a08088..edf548a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java
@@ -133,18 +133,8 @@ public class TestCountingSource
   public Coder<CounterMark> getCheckpointMarkCoder() {
     return DelegateCoder.of(
         VarIntCoder.of(),
-        new DelegateCoder.CodingFunction<CounterMark, Integer>() {
-          @Override
-          public Integer apply(CounterMark input) {
-            return input.current;
-          }
-        },
-        new DelegateCoder.CodingFunction<Integer, CounterMark>() {
-          @Override
-          public CounterMark apply(Integer input) {
-            return new CounterMark(input);
-          }
-        });
+        new FromCounterMark(),
+        new ToCounterMark());
   }
 
   @Override
@@ -251,4 +241,38 @@ public class TestCountingSource
   public Coder<KV<Integer, Integer>> getDefaultOutputCoder() {
     return KvCoder.of(VarIntCoder.of(), VarIntCoder.of());
   }
+
+  private class FromCounterMark implements DelegateCoder.CodingFunction<CounterMark, Integer> {
+    @Override
+    public Integer apply(CounterMark input) {
+      return input.current;
+    }
+
+    @Override
+    public int hashCode() {
+      return FromCounterMark.class.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof FromCounterMark;
+    }
+  }
+
+  private class ToCounterMark implements DelegateCoder.CodingFunction<Integer, CounterMark> {
+    @Override
+    public CounterMark apply(Integer input) {
+      return new CounterMark(input);
+    }
+
+    @Override
+    public int hashCode() {
+      return ToCounterMark.class.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof ToCounterMark;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1dc8f53/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index e3875bc..716e71d 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -20,36 +20,20 @@ package org.apache.beam.runners.flink.streaming;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.ValueWithRecordId;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -57,15 +41,14 @@ import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Matchers;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
@@ -125,10 +108,18 @@ public class UnboundedSourceWrapperTest {
             KV<Integer, Integer>,
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-      setupSourceOperator(sourceOperator, numTasks);
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          testHarness =
+              new AbstractStreamOperatorTestHarness<>(
+                  sourceOperator,
+                  numTasks /* max parallelism */,
+                  numTasks /* parallelism */,
+                  0 /* subtask index */);
+
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       try {
-        sourceOperator.open();
+        testHarness.open();
         sourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -200,29 +191,22 @@ public class UnboundedSourceWrapperTest {
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-      OperatorStateStore backend = mock(OperatorStateStore.class);
-
-      TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-          listState = new TestingListState<>();
-
-      when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-          .thenReturn(listState);
-
-      StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-      when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-      when(initializationContext.isRestored()).thenReturn(false, true);
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          testHarness =
+              new AbstractStreamOperatorTestHarness<>(
+                  sourceOperator,
+                  numTasks /* max parallelism */,
+                  numTasks /* parallelism */,
+                  0 /* subtask index */);
 
-      flinkWrapper.initializeState(initializationContext);
-
-      setupSourceOperator(sourceOperator, numTasks);
+      testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
 
       boolean readFirstBatchOfElements = false;
       try {
-        sourceOperator.open();
+        testHarness.open();
         sourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -265,21 +249,12 @@ public class UnboundedSourceWrapperTest {
       assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
 
       // draw a snapshot
-      flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
-
-      // test snapshot offsets
-      assertEquals(flinkWrapper.getLocalSplitSources().size(),
-          listState.getList().size());
-      int totalEmit = 0;
-      for (KV<UnboundedSource, TestCountingSource.CounterMark> kv : listState.get()) {
-        totalEmit += kv.getValue().current + 1;
-      }
-      assertEquals(numElements / 2, totalEmit);
+      OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
 
       // test that finalizeCheckpoint on CheckpointMark is called
       final ArrayList<Integer> finalizeList = new ArrayList<>();
       TestCountingSource.setFinalizeTracker(finalizeList);
-      flinkWrapper.notifyCheckpointComplete(0);
+      testHarness.notifyOfCompletedCheckpoint(0);
       assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
 
       // create a completely new source but restore from the snapshot
@@ -297,16 +272,25 @@ public class UnboundedSourceWrapperTest {
             TestCountingSource.CounterMark>> restoredSourceOperator = new StreamSource<>(restoredFlinkWrapper);
 
-      setupSourceOperator(restoredSourceOperator, numTasks);
+      // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+      AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+          restoredTestHarness =
+              new AbstractStreamOperatorTestHarness<>(
+                  restoredSourceOperator,
+                  numTasks /* max parallelism */,
+                  1 /* parallelism */,
+                  0 /* subtask index */);
+
+      restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
       // restore snapshot
-      restoredFlinkWrapper.initializeState(initializationContext);
+      restoredTestHarness.initializeState(snapshot);
 
       boolean readSecondBatchOfElements = false;
 
       // run again and verify that we see the other elements
       try {
-        restoredSourceOperator.open();
+        restoredTestHarness.open();
         restoredSourceOperator.run(checkpointLock,
             new TestStreamStatusMaintainer(),
             new Output<StreamRecord<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>>() {
@@ -345,7 +329,9 @@ public class UnboundedSourceWrapperTest {
         readSecondBatchOfElements = true;
       }
 
-      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+      assertEquals(
+          Math.max(1, numSplits / numTasks),
+          restoredFlinkWrapper.getLocalSplitSources().size());
 
       assertTrue("Did not successfully read second batch of elements.",
           readSecondBatchOfElements);
@@ -364,68 +350,57 @@ public class UnboundedSourceWrapperTest {
             return null;
           }
         };
+
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
         new UnboundedSourceWrapper<>("stepName", options, source, numSplits);
 
-    OperatorStateStore backend = mock(OperatorStateStore.class);
-
-    TestingListState<KV<UnboundedSource, TestCountingSource.CounterMark>>
-        listState = new TestingListState<>();
-
-    when(backend.getOperatorState(Matchers.any(ListStateDescriptor.class)))
-        .thenReturn(listState);
-
-    StateInitializationContext initializationContext = mock(StateInitializationContext.class);
-
-    when(initializationContext.getOperatorStateStore()).thenReturn(backend);
-    when(initializationContext.isRestored()).thenReturn(false, true);
+    StreamSource<
+        WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>,
+        UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>>
+        sourceOperator = new StreamSource<>(flinkWrapper);
 
-    flinkWrapper.initializeState(initializationContext);
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>>
+        testHarness =
+            new AbstractStreamOperatorTestHarness<>(
+                sourceOperator,
+                numTasks /* max parallelism */,
+                numTasks /* parallelism */,
+                0 /* subtask index */);
 
-    StreamSource sourceOperator = new StreamSource<>(flinkWrapper);
-    setupSourceOperator(sourceOperator, numTasks);
-    sourceOperator.open();
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    flinkWrapper.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
+    testHarness.open();
 
-    assertEquals(0, listState.getList().size());
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
 
     UnboundedSourceWrapper<
        KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-        new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements),
-            numSplits);
-
-    StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper);
-    setupSourceOperator(restoredSourceOperator, numTasks);
-    sourceOperator.open();
-
-    restoredFlinkWrapper.initializeState(initializationContext);
-
-    assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
-
-  }
+        new UnboundedSourceWrapper<>(
+            "stepName", options, new TestCountingSource(numElements), numSplits);
 
@SuppressWarnings("unchecked") - private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) { - ExecutionConfig executionConfig = new ExecutionConfig(); - StreamConfig cfg = new StreamConfig(new Configuration()); + StreamSource< + WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>, + UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark>> + restoredSourceOperator = + new StreamSource<>(restoredFlinkWrapper); - cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); + // set parallelism to 1 to ensure that our testing operator gets all checkpointed state + AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<KV<Integer, Integer>>>> + restoredTestHarness = + new AbstractStreamOperatorTestHarness<>( + restoredSourceOperator, + numTasks /* max parallelism */, + 1 /* parallelism */, + 0 /* subtask index */); - Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0); + restoredTestHarness.setup(); + restoredTestHarness.initializeState(snapshot); + restoredTestHarness.open(); - StreamTask<?, ?> mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(new Object()); - when(mockTask.getConfiguration()).thenReturn(cfg); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getAccumulatorMap()) - .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap()); - TestProcessingTimeService testProcessingTimeService = new TestProcessingTimeService(); - when(mockTask.getProcessingTimeService()).thenReturn(testProcessingTimeService); + // when the source checkpointed a null we don't re-initialize the splits, that is we + // will have no splits. + assertEquals(0, restoredFlinkWrapper.getLocalSplitSources().size()); - operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); } /** @@ -458,31 +433,6 @@ public class UnboundedSourceWrapperTest { } - private static final class TestingListState<T> implements ListState<T> { - - private final List<T> list = new ArrayList<>(); - - @Override - public void clear() { - list.clear(); - } - - @Override - public Iterable<T> get() throws Exception { - return list; - } - - @Override - public void add(T value) throws Exception { - list.add(value); - } - - public List<T> getList() { - return list; - } - - } - private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer { StreamStatus currentStreamStatus = StreamStatus.ACTIVE;