Add a Test for windowed CombineGloballyAsSingletonView
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de44a34 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de44a34 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de44a34 Branch: refs/heads/master Commit: 3de44a348e3e0934c644c718255a43b8f42a3534 Parents: 079966c Author: Thomas Groh <tg...@google.com> Authored: Fri Mar 3 11:24:14 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Mar 3 14:59:12 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/CombineTest.java | 46 ++++++++++++++++++++ 1 file changed, 46 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3de44a34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 5b18384..6c62d0b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -75,8 +76,10 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -627,6 +630,49 @@ public class CombineTest implements Serializable { } @Test + @Category(RunnableOnService.class) + public void testWindowedCombineGloballyAsSingletonView() { + FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); + final PCollectionView<Integer> view = + pipeline + .apply( + "CreateSideInput", + Create.timestamped( + TimestampedValue.of(1, new Instant(100)), + TimestampedValue.of(3, new Instant(100)))) + .apply("WindowSideInput", Window.<Integer>into(windowFn)) + .apply("CombineSideInput", Sum.integersGlobally().asSingletonView()); + + TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100)); + TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null); + PCollection<Integer> output = + pipeline + .apply( + "CreateMainInput", + Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of())) + .apply("WindowMainInput", Window.<Void>into(windowFn)) + .apply( + "OutputSideInput", + ParDo.of( + new DoFn<Void, Integer>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.sideInput(view)); + } + }) + .withSideInputs(view)); + + PAssert.that(output).containsInAnyOrder(4, 0); + PAssert.that(output) + .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp())) + .containsInAnyOrder(4); + PAssert.that(output) + .inWindow(windowFn.assignWindow(emptyElement.getTimestamp())) + .containsInAnyOrder(0); + pipeline.run(); + } + + @Test public void testCombineGetName() { assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName()); assertEquals(