This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 975d71a Test SetState addIfAbsent with no read (#15776)
975d71a is described below

commit 975d71a5d6311ed2ae33766581947de405fefabf
Author: kileys <kiley...@google.com>
AuthorDate: Thu Oct 21 13:56:36 2021 -0700

    Test SetState addIfAbsent with no read (#15776)
---
 .../org/apache/beam/sdk/transforms/ParDoTest.java | 49 ++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 00e4fab..c95fb9e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2497,6 +2497,55 @@ public class ParDoTest implements Serializable {
     }
 
     @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSetState.class,
+      UsesTestStream.class
+    })
+    public void testSetStateNoReadOnAddIfAbsentInsertsElement() {
+      final String stateId = "foo";
+      final String countStateId = "count";
+
+      DoFn<KV<String, Integer>, Integer> fn =
+          new DoFn<KV<String, Integer>, Integer>() {
+
+            @StateId(stateId)
+            private final StateSpec<SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of());
+
+            @StateId(countStateId)
+            private final StateSpec<CombiningState<Integer, int[], Integer>> countState =
+                StateSpecs.combining(Sum.ofIntegers());
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext c,
+                @Element KV<String, Integer> element,
+                @StateId(stateId) SetState<Integer> state,
+                @StateId(countStateId) CombiningState<Integer, int[], Integer> count,
+                OutputReceiver<Integer> r) {
+              state.addIfAbsent(element.getValue());
+              count.add(1);
+              if (count.read() >= 4) {
+                for (Integer entry : state.read()) {
+                  r.output(entry);
+                }
+              }
+            }
+          };
+      TestStream<KV<String, Integer>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0))
+              .addElements(KV.of("hello", 1), KV.of("hello", 2))
+              .addElements(KV.of("hello", 2), KV.of("hello", 3))
+              .advanceWatermarkToInfinity();
+      PCollection<Integer> output = pipeline.apply(stream).apply(ParDo.of(fn));
+
+      PAssert.that(output).containsInAnyOrder(1, 2, 3);
+      pipeline.run();
+    }
+
+    @Test
     @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesOrderedListState.class})
     public void testOrderedListStateBounded() {
       testOrderedListStateImpl(false);