Add simple tests for stateful ParDo
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e158e4e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e158e4e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e158e4e Branch: refs/heads/master Commit: 7e158e4e583372dd79ffaa380ac7c2dbb4846c50 Parents: e17dc4a Author: Kenneth Knowles <[email protected]> Authored: Mon Nov 21 15:41:27 2016 -0800 Committer: Kenneth Knowles <[email protected]> Committed: Mon Nov 28 11:43:21 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ParDoTest.java | 106 ++++++++++++++++++- 1 file changed, 102 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e158e4e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index be1eaa4..593f304 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -55,6 +55,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesStatefulParDo; import org.apache.beam.sdk.transforms.DoFn.OnTimer; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; import org.apache.beam.sdk.transforms.ParDo.Bound; @@ -1464,8 +1465,8 @@ public class ParDoTest implements Serializable { } @Test - @Category(RunnableOnService.class) - public void testValueState() { + 
@Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testValueStateSimple() { final String stateId = "foo"; DoFn<KV<String, Integer>, Integer> fn = @@ -1494,8 +1495,59 @@ public class ParDoTest implements Serializable { } @Test - @Category(RunnableOnService.class) - public void testBagSTate() { + @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testValueStateSideOutput() { + final String stateId = "foo"; + + final TupleTag<Integer> evenTag = new TupleTag<Integer>() {}; + final TupleTag<Integer> oddTag = new TupleTag<Integer>() {}; + + DoFn<KV<String, Integer>, Integer> fn = + new DoFn<KV<String, Integer>, Integer>() { + + @StateId(stateId) + private final StateSpec<Object, ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + if (currentValue % 2 == 0) { + c.output(currentValue); + } else { + c.sideOutput(oddTag, currentValue); + } + state.write(currentValue + 1); + } + }; + + Pipeline p = TestPipeline.create(); + PCollectionTuple output = + p.apply( + Create.of( + KV.of("hello", 42), + KV.of("hello", 97), + KV.of("hello", 84), + KV.of("goodbye", 33), + KV.of("hello", 859), + KV.of("goodbye", 83945))) + .apply(ParDo.of(fn).withOutputTags(evenTag, TupleTagList.of(oddTag))); + + PCollection<Integer> evens = output.get(evenTag); + PCollection<Integer> odds = output.get(oddTag); + + // There are 0 and 2 from "hello" and just 0 from "goodbye" + PAssert.that(evens).containsInAnyOrder(0, 2, 0); + + // There are 1 and 3 from "hello" and just 1 from "goodbye" + PAssert.that(odds).containsInAnyOrder(1, 3, 1); + p.run(); + } + + @Test + @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testBagState() { final String stateId = "foo"; DoFn<KV<String, Integer>, List<Integer>> fn = @@ -1530,6 +1582,52 @@ 
public class ParDoTest implements Serializable { } @Test + @Category({RunnableOnService.class, UsesStatefulParDo.class}) + public void testBagStateSideInput() { + Pipeline p = TestPipeline.create(); + + final PCollectionView<List<Integer>> listView = + p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList()); + + final String stateId = "foo"; + DoFn<KV<String, Integer>, List<Integer>> fn = + new DoFn<KV<String, Integer>, List<Integer>>() { + + @StateId(stateId) + private final StateSpec<Object, BagState<Integer>> bufferState = + StateSpecs.bag(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) BagState<Integer> state) { + state.add(c.element().getValue()); // add first; read() may not see later adds + Iterable<Integer> currentValue = state.read(); + if (Iterables.size(currentValue) >= 4) { + List<Integer> sorted = Lists.newArrayList(currentValue); + Collections.sort(sorted); + c.output(sorted); + + List<Integer> sideSorted = Lists.newArrayList(c.sideInput(listView)); + Collections.sort(sideSorted); + c.output(sideSorted); + } + } + }; + + PCollection<List<Integer>> output = + p.apply( + "Create main input", + Create.of( + KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12))) + .apply(ParDo.of(fn).withSideInputs(listView)); + + PAssert.that(output).containsInAnyOrder( + Lists.newArrayList(12, 42, 84, 97), + Lists.newArrayList(0, 1, 2)); + p.run(); + } + + @Test public void testWithOutputTagsDisplayData() { DoFn<String, String> fn = new DoFn<String, String>() { @ProcessElement
