This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 975d71a  Test SetState addIfAbsent with no read (#15776)
975d71a is described below

commit 975d71a5d6311ed2ae33766581947de405fefabf
Author: kileys <kiley...@google.com>
AuthorDate: Thu Oct 21 13:56:36 2021 -0700

    Test SetState addIfAbsent with no read (#15776)
---
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 00e4fab..c95fb9e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2497,6 +2497,55 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSetState.class,
+      UsesTestStream.class
+    })
+    public void testSetStateNoReadOnAddIfAbsentInsertsElement() {
+      final String stateId = "foo";
+      final String countStateId = "count";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @StateId(stateId)
+            private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of());
+
+            @StateId(countStateId)
+            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
+                StateSpecs.combining(Sum.ofIntegers());
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext c,
+                @Element KV<String, Integer> element,
+                @StateId(stateId) SetState<Integer> state,
+                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+                OutputReceiver<Integer> r) {
+              state.addIfAbsent(element.getValue());
+              count.add(1);
+              if (count.read() >= 4) {
+                for (Integer entry : state.read()) {
+                  r.output(entry);
+                }
+              }
+            }
+          };
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 1), KV.of("hello", 2))
+              .addElements(KV.of("hello", 2), KV.of("hello", 3))
+              .advanceWatermarkToInfinity();
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+
+      PAssert.that(output).containsInAnyOrder(1, 2, 3);
+      pipeline.run();
+    }
+
+    @Test
     @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
     public void testOrderedListStateBounded() {
       testOrderedListStateImpl(false);

Reply via email to