Add a Test for windowed CombineGloballyAsSingletonView

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3de44a34
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3de44a34
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3de44a34

Branch: refs/heads/master
Commit: 3de44a348e3e0934c644c718255a43b8f42a3534
Parents: 079966c
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 3 11:24:14 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 3 14:59:12 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/CombineTest.java | 46 ++++++++++++++++++++
 1 file changed, 46 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3de44a34/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 5b18384..6c62d0b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -75,8 +76,10 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -627,6 +630,49 @@ public class CombineTest implements Serializable {
   }
 
   @Test
+  @Category(RunnableOnService.class)
+  public void testWindowedCombineGloballyAsSingletonView() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1));
+    final PCollectionView<Integer> view =
+        pipeline
+            .apply(
+                "CreateSideInput",
+                Create.timestamped(
+                    TimestampedValue.of(1, new Instant(100)),
+                    TimestampedValue.of(3, new Instant(100))))
+            .apply("WindowSideInput", Window.<Integer>into(windowFn))
+            .apply("CombineSideInput", Sum.integersGlobally().asSingletonView());
+
+    TimestampedValue<Void> nonEmptyElement = TimestampedValue.of(null, new Instant(100));
+    TimestampedValue<Void> emptyElement = TimestampedValue.atMinimumTimestamp(null);
+    PCollection<Integer> output =
+        pipeline
+            .apply(
+                "CreateMainInput",
+                Create.<Void>timestamped(nonEmptyElement, emptyElement).withCoder(VoidCoder.of()))
+            .apply("WindowMainInput", Window.<Void>into(windowFn))
+            .apply(
+                "OutputSideInput",
+                ParDo.of(
+                        new DoFn<Void, Integer>() {
+                          @ProcessElement
+                          public void processElement(ProcessContext c) {
+                            c.output(c.sideInput(view));
+                          }
+                        })
+                    .withSideInputs(view));
+
+    PAssert.that(output).containsInAnyOrder(4, 0);
+    PAssert.that(output)
+        .inWindow(windowFn.assignWindow(nonEmptyElement.getTimestamp()))
+        .containsInAnyOrder(4);
+    PAssert.that(output)
+        .inWindow(windowFn.assignWindow(emptyElement.getTimestamp()))
+        .containsInAnyOrder(0);
+    pipeline.run();
+  }
+
+  @Test
   public void testCombineGetName() {
     assertEquals("Combine.globally(SumInts)", Combine.globally(new SumInts()).getName());
     assertEquals(

Reply via email to