Repository: beam
Updated Branches:
  refs/heads/master 0c24286e1 -> a26fd1ff3


Add some more RunnableOnService tests for stateful ParDo


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c00e912
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c00e912
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c00e912

Branch: refs/heads/master
Commit: 6c00e9121e6572fc06d0379802883c118acbed9f
Parents: f4e1097
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Jan 6 12:03:11 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 15:21:43 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/ParDoTest.java   | 138 +++++++++++++++++++
 1 file changed, 138 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6c00e912/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 2e3fb85..7381e06 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -80,6 +80,7 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
@@ -1502,6 +1503,104 @@ public class ParDoTest implements Serializable {
 
   @Test
   @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateFixedWindows() {
+    final String stateId = "foo";
+
+    DoFn<KV<String, Integer>, Integer> fn =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+            c.output(currentValue);
+            state.write(currentValue + 1);
+          }
+        };
+
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new Instant(10));
+    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new Instant(20));
+
+    PCollection<Integer> output =
+        pipeline
+            .apply(
+                Create.timestamped(
+                    // first window
+                    TimestampedValue.of(KV.of("hello", 7), new Instant(1)),
+                    TimestampedValue.of(KV.of("hello", 14), new Instant(2)),
+                    TimestampedValue.of(KV.of("hello", 21), new Instant(3)),
+
+                    // second window
+                    TimestampedValue.of(KV.of("hello", 28), new Instant(11)),
+                    TimestampedValue.of(KV.of("hello", 35), new Instant(13))))
+            .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.millis(10))))
+            .apply("Stateful ParDo", ParDo.of(fn));
+
+    PAssert.that(output).inWindow(firstWindow).containsInAnyOrder(0, 1, 2);
+    PAssert.that(output).inWindow(secondWindow).containsInAnyOrder(0, 1);
+    pipeline.run();
+  }
+
+  /**
+   * Tests that there is no state bleeding between adjacent stateful {@link ParDo} transforms,
+   * which may (or may not) be executed in similar contexts after runner optimizations.
+   */
+  @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testValueStateSameId() {
+    final String stateId = "foo";
+
+    DoFn<KV<String, Integer>, KV<String, Integer>> fn =
+        new DoFn<KV<String, Integer>, KV<String, Integer>>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+            c.output(KV.of("sizzle", currentValue));
+            state.write(currentValue + 1);
+          }
+        };
+
+    DoFn<KV<String, Integer>, Integer> fn2 =
+        new DoFn<KV<String, Integer>, Integer>() {
+
+          @StateId(stateId)
+          private final StateSpec<Object, ValueState<Integer>> intState =
+              StateSpecs.value(VarIntCoder.of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+            Integer currentValue = MoreObjects.firstNonNull(state.read(), 13);
+            c.output(currentValue);
+            state.write(currentValue + 13);
+          }
+        };
+
+    PCollection<KV<String, Integer>> intermediate =
+        pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84)))
+            .apply("First stateful ParDo", ParDo.of(fn));
+
+    PCollection<Integer> output =
+            intermediate.apply("Second stateful ParDo", ParDo.of(fn2));
+
+    PAssert.that(intermediate)
+        .containsInAnyOrder(KV.of("sizzle", 0), KV.of("sizzle", 1), KV.of("sizzle", 2));
+    PAssert.that(output).containsInAnyOrder(13, 26, 39);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
   public void testValueStateSideOutput() {
     final String stateId = "foo";
 
@@ -1587,6 +1686,45 @@ public class ParDoTest implements Serializable {
 
   @Test
   @Category({RunnableOnService.class, UsesStatefulParDo.class})
+  public void testCombiningState() {
+    final String stateId = "foo";
+
+    DoFn<KV<String, Double>, String> fn =
+        new DoFn<KV<String, Double>, String>() {
+
+          private static final double EPSILON = 0.0001;
+
+          @StateId(stateId)
+          private final StateSpec<
+                  Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>>
+              combiningState =
+                  StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of());
+
+          @ProcessElement
+          public void processElement(
+              ProcessContext c,
+              @StateId(stateId)
+                  AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) {
+            state.add(c.element().getValue());
+            Double currentValue = state.read();
+            if (Math.abs(currentValue - 0.5) < EPSILON) {
+              c.output("right on");
+            }
+          }
+        };
+
+    PCollection<String> output =
+        pipeline
+            .apply(Create.of(KV.of("hello", 0.3), KV.of("hello", 0.6), KV.of("hello", 0.6)))
+            .apply(ParDo.of(fn));
+
+    // There should only be one moment at which the average is exactly 0.5
+    PAssert.that(output).containsInAnyOrder("right on");
+    pipeline.run();
+  }
+
+  @Test
+  @Category({RunnableOnService.class, UsesStatefulParDo.class})
   public void testBagStateSideInput() {
 
     final PCollectionView<List<Integer>> listView =

Reply via email to