This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 55880d361a5 Split ViewTest (#36655)
55880d361a5 is described below
commit 55880d361a597126b040558ead57c78a3085bde3
Author: Yi Hu <[email protected]>
AuthorDate: Wed Oct 29 13:46:46 2025 -0400
Split ViewTest (#36655)
---
.../transforms/{ViewTest.java => MapViewTest.java} | 884 +------------------
.../org/apache/beam/sdk/transforms/ViewTest.java | 951 ---------------------
2 files changed, 3 insertions(+), 1832 deletions(-)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapViewTest.java
similarity index 54%
copy from
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
copy to
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapViewTest.java
index 2bdc9061e23..005feda63ab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapViewTest.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -29,13 +27,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -43,49 +37,35 @@ import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesSideInputs;
-import org.apache.beam.sdk.testing.UsesTestStream;
-import org.apache.beam.sdk.testing.UsesTriggeredSideInputs;
import org.apache.beam.sdk.testing.ValidatesRunner;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
-import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
- * Tests for {@link View}. See also {@link ParDoTest}, which provides
additional coverage since
- * views can only be observed via {@link ParDo}.
+ * Tests for (multi)map {@link View}. See also {@link ParDoTest}, which
provides additional coverage
+ * since views can only be observed via {@link ParDo}.
*/
@RunWith(JUnit4.class)
@Category(UsesSideInputs.class)
-public class ViewTest implements Serializable {
+public class MapViewTest implements Serializable {
// This test is Serializable, just so that it's easy to have
// anonymous inner classes inside the non-static test methods.
@@ -95,604 +75,6 @@ public class ViewTest implements Serializable {
@Rule public transient Timeout globalTimeout = Timeout.seconds(1200);
- @Test
- @Category(ValidatesRunner.class)
- public void testSingletonSideInput() {
-
- final PCollectionView<Integer> view =
- pipeline.apply("Create47", Create.of(47)).apply(View.asSingleton());
-
- PCollection<Integer> output =
- pipeline
- .apply("Create123", Create.of(1, 2, 3))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(47, 47, 47);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedSingletonSideInput() {
-
- final PCollectionView<Integer> view =
- pipeline
- .apply(
- "Create47",
- Create.timestamped(
- TimestampedValue.of(47, new Instant(1)),
- TimestampedValue.of(48, new Instant(11))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asSingleton());
-
- PCollection<Integer> output =
- pipeline
- .apply(
- "Create123",
- Create.timestamped(
- TimestampedValue.of(1, new Instant(4)),
- TimestampedValue.of(2, new Instant(8)),
- TimestampedValue.of(3, new Instant(12))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(47, 47, 48);
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesTestStream.class})
- public void testWindowedSideInputNotPresent() {
- PCollection<KV<Long, Long>> input =
- pipeline.apply(
- TestStream.create(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
- .advanceWatermarkTo(new Instant(0))
- .addElements(TimestampedValue.of(KV.of(1000L, 1000L), new
Instant(1000L)))
- .advanceWatermarkTo(new Instant(20000))
- .advanceWatermarkToInfinity());
-
- final PCollectionView<Long> view =
- input
- .apply(Values.create())
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.standardSeconds(100))))
- .apply("ViewCombine",
Combine.globally(Sum.ofLongs()).withoutDefaults())
- .apply("Rewindow",
Window.into(FixedWindows.of(Duration.standardSeconds(10))))
- .apply(View.<Long>asSingleton().withDefaultValue(0L));
-
- PCollection<Long> output =
- input
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.standardSeconds(10))))
- .apply(GroupByKey.create())
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<KV<Long, Iterable<Long>>, Long>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .inWindow(new IntervalWindow(new Instant(0), new Instant(10000)))
- .containsInAnyOrder(0L);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testEmptySingletonSideInput() throws Exception {
-
- final PCollectionView<Integer> view =
- pipeline
- .apply("CreateEmptyIntegers", Create.empty(VarIntCoder.of()))
- .apply(View.asSingleton());
-
- pipeline
- .apply("Create123", Create.of(1, 2, 3))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- // As long as we get an error, be flexible with how a runner surfaces it
- thrown.expect(Exception.class);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testNonSingletonSideInput() throws Exception {
-
- PCollection<Integer> oneTwoThree = pipeline.apply(Create.of(1, 2, 3));
- final PCollectionView<Integer> view =
oneTwoThree.apply(View.asSingleton());
-
- oneTwoThree.apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- // As long as we get an error, be flexible with how a runner surfaces it
- thrown.expect(Exception.class);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testDiscardingNonSingletonSideInput() throws Exception {
-
- PCollection<Integer> oneTwoThree = pipeline.apply(Create.of(1, 2, 3));
- final PCollectionView<Integer> view =
- oneTwoThree
- .apply(Window.<Integer>configure().discardingFiredPanes())
- .apply(View.asSingleton());
-
- oneTwoThree.apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- // As long as we get an error, be flexible with how a runner surfaces it
- thrown.expect(Exception.class);
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesTriggeredSideInputs.class})
- public void testTriggeredLatestSingleton() {
- IntervalWindow zeroWindow = new IntervalWindow(new Instant(0), new
Instant(1000));
-
- PCollectionView<Long> view =
- pipeline
- .apply(
- GenerateSequence.from(0)
- .withRate(1, Duration.millis(100))
- .withTimestampFn(Instant::new)
- .withMaxReadTime(Duration.standardSeconds(10)))
- .apply(
- "Window side input",
- Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1)))
-
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
- .withAllowedLateness(Duration.ZERO)
- .discardingFiredPanes())
- .apply(Reify.timestamps())
-
.apply(Combine.globally(Latest.<Long>combineFn()).withoutDefaults().asSingletonView());
-
- final String tag = "singleton";
- PCollection<Long> pc =
- pipeline
- .apply(Impulse.create())
- .apply(WithTimestamps.of(impulse -> new Instant(0)))
- .apply("Window main input",
Window.into(FixedWindows.of(Duration.standardSeconds(1))))
- .apply(
- ParDo.of(
- new DoFn<byte[], Long>() {
- @ProcessElement
- public void process(
- @SideInput(tag) Long sideInput,
OutputReceiver<Long> out)
- throws InterruptedException {
- // waiting to ensure multiple outputs to side
input before reading it
- Thread.sleep(1000L);
- out.output(sideInput);
- }
- })
- .withSideInput(tag, view));
-
- PAssert.that(pc)
- .inWindow(zeroWindow)
- .satisfies(
- (Iterable<Long> values) -> {
- assertThat(values, Matchers.iterableWithSize(1));
- return null;
- });
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testListSideInput() {
-
- final PCollectionView<List<Integer>> view =
- pipeline.apply("CreateSideInput", Create.of(11, 13, 17,
23)).apply(View.asList());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29, 31))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(
-
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testListWithRandomAccessSideInput() {
-
- final PCollectionView<List<Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(11, 13, 17, 23))
- .apply(View.<Integer>asList().withRandomAccess());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29, 31))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(
-
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testListInMemorySideInput() {
-
- final PCollectionView<List<Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(11, 13, 17, 23))
- .apply(View.<Integer>asList().inMemory());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29, 31))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(
-
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedListSideInput() {
-
- final PCollectionView<List<Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(11, new Instant(1)),
- TimestampedValue.of(13, new Instant(1)),
- TimestampedValue.of(17, new Instant(1)),
- TimestampedValue.of(23, new Instant(1)),
- TimestampedValue.of(31, new Instant(11)),
- TimestampedValue.of(33, new Instant(11)),
- TimestampedValue.of(37, new Instant(11)),
- TimestampedValue.of(43, new Instant(11))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asList());
-
- PCollection<Integer> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of(29, new Instant(1)),
- TimestampedValue.of(35, new Instant(11))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(
-
c.sideInput(view).get(0).equals(c.sideInput(view).get(0)));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testEmptyListSideInput() throws Exception {
-
- final PCollectionView<List<Integer>> view =
- pipeline.apply("CreateEmptyView",
Create.empty(VarIntCoder.of())).apply(View.asList());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
-
assertFalse(c.sideInput(view).iterator().hasNext());
- c.output(1);
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testListSideInputIsImmutable() {
-
- final PCollectionView<List<Integer>> view =
- pipeline.apply("CreateSideInput", Create.of(11)).apply(View.asList());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- try {
- c.sideInput(view).clear();
- fail("Expected UnsupportedOperationException on
clear()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).add(4);
- fail("Expected UnsupportedOperationException on
add()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).addAll(new ArrayList<>());
- fail("Expected UnsupportedOperationException on
addAll()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).remove(0);
- fail("Expected UnsupportedOperationException on
remove()");
- } catch (UnsupportedOperationException expected) {
- }
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(output).containsInAnyOrder(11);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testIterableSideInput() {
-
- final PCollectionView<Iterable<Integer>> view =
- pipeline.apply("CreateSideInput", Create.of(11, 13, 17,
23)).apply(View.asIterable());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29, 31))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedIterableSideInput() {
-
- final PCollectionView<Iterable<Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(11, new Instant(1)),
- TimestampedValue.of(13, new Instant(1)),
- TimestampedValue.of(17, new Instant(1)),
- TimestampedValue.of(23, new Instant(1)),
- TimestampedValue.of(31, new Instant(11)),
- TimestampedValue.of(33, new Instant(11)),
- TimestampedValue.of(37, new Instant(11)),
- TimestampedValue.of(43, new Instant(11))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asIterable());
-
- PCollection<Integer> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of(29, new Instant(1)),
- TimestampedValue.of(35, new Instant(11))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testEmptyIterableSideInput() throws Exception {
-
- final PCollectionView<Iterable<Integer>> view =
- pipeline.apply("CreateEmptyView",
Create.empty(VarIntCoder.of())).apply(View.asIterable());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
-
assertFalse(c.sideInput(view).iterator().hasNext());
- c.output(1);
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testIterableSideInputIsImmutable() {
-
- final PCollectionView<Iterable<Integer>> view =
- pipeline.apply("CreateSideInput",
Create.of(11)).apply(View.asIterable());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateMainInput", Create.of(29))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- Iterator<Integer> iterator =
c.sideInput(view).iterator();
- while (iterator.hasNext()) {
- try {
- iterator.remove();
- fail("Expected UnsupportedOperationException
on remove()");
- } catch (UnsupportedOperationException expected)
{
- }
- c.output(iterator.next());
- }
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(output).containsInAnyOrder(11);
-
- pipeline.run();
- }
-
@Test
@Category(ValidatesRunner.class)
public void testMultimapSideInput() {
@@ -1631,264 +1013,4 @@ public class ViewTest implements Serializable {
pipeline.run();
}
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedSideInputFixedToFixed() {
-
- final PCollectionView<Integer> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(11)),
- TimestampedValue.of(3, new Instant(13))))
- .apply("WindowSideInput",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(Sum.integersGlobally().withoutDefaults())
- .apply(View.asSingleton());
-
- PCollection<String> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("A", new Instant(4)),
- TimestampedValue.of("B", new Instant(15)),
- TimestampedValue.of("C", new Instant(7))))
- .apply("WindowMainInput",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputMainAndSideInputs",
- ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element() + c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder("A1", "B5", "C1");
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedSideInputFixedToGlobal() {
-
- final PCollectionView<Integer> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(11)),
- TimestampedValue.of(3, new Instant(13))))
- .apply("WindowSideInput", Window.into(new GlobalWindows()))
- .apply(Sum.integersGlobally())
- .apply(View.asSingleton());
-
- PCollection<String> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("A", new Instant(4)),
- TimestampedValue.of("B", new Instant(15)),
- TimestampedValue.of("C", new Instant(7))))
- .apply("WindowMainInput",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputMainAndSideInputs",
- ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element() + c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder("A6", "B6", "C6");
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedSideInputFixedToFixedWithDefault() {
-
- final PCollectionView<Integer> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(2, new Instant(11)),
- TimestampedValue.of(3, new Instant(13))))
- .apply("WindowSideInput",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(Sum.integersGlobally().asSingletonView());
-
- PCollection<String> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("A", new Instant(4)),
- TimestampedValue.of("B", new Instant(15)),
- TimestampedValue.of("C", new Instant(7))))
- .apply("WindowMainInput",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputMainAndSideInputs",
- ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element() + c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder("A0", "B5", "C0");
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testSideInputWithNullDefault() {
-
- final PCollectionView<Void> view =
- pipeline
- .apply("CreateSideInput", Create.of((Void)
null).withCoder(VoidCoder.of()))
- .apply(Combine.globally((Iterable<Void> input) ->
null).asSingletonView());
-
- @SuppressWarnings("ObjectToString")
- PCollection<String> output =
- pipeline
- .apply("CreateMainInput", Create.of(""))
- .apply(
- "OutputMainAndSideInputs",
- ParDo.of(
- new DoFn<String, String>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element() + c.sideInput(view));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder("null");
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testSideInputWithNestedIterables() {
- final PCollectionView<Iterable<Integer>> view1 =
- pipeline
- .apply("CreateVoid1", Create.of((Void)
null).withCoder(VoidCoder.of()))
- .apply(
- "OutputOneInteger",
- ParDo.of(
- new DoFn<Void, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(17);
- }
- }))
- .apply("View1", View.asIterable());
-
- final PCollectionView<Iterable<Iterable<Integer>>> view2 =
- pipeline
- .apply("CreateVoid2", Create.of((Void)
null).withCoder(VoidCoder.of()))
- .apply(
- "OutputSideInput",
- ParDo.of(
- new DoFn<Void, Iterable<Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view1));
- }
- })
- .withSideInputs(view1))
- .apply("View2", View.asIterable());
-
- PCollection<Integer> output =
- pipeline
- .apply("CreateVoid3", Create.of((Void)
null).withCoder(VoidCoder.of()))
- .apply(
- "ReadIterableSideInput",
- ParDo.of(
- new DoFn<Void, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Iterable<Integer> input : c.sideInput(view2))
{
- for (Integer i : input) {
- c.output(i);
- }
- }
- }
- })
- .withSideInputs(view2));
-
- PAssert.that(output).containsInAnyOrder(17);
-
- pipeline.run();
- }
-
- @Test
- public void testViewGetName() {
- assertEquals("View.AsSingleton", View.<Integer>asSingleton().getName());
- assertEquals("View.AsIterable", View.<Integer>asIterable().getName());
- assertEquals("View.AsMap", View.<String, Integer>asMap().getName());
- assertEquals("View.AsMultimap", View.<String,
Integer>asMultimap().getName());
- }
-
- private void testViewUnbounded(
- Pipeline pipeline,
- PTransform<PCollection<KV<String, Integer>>, ? extends
PCollectionView<?>> view) {
- thrown.expect(IllegalStateException.class);
- thrown.expectMessage("Unable to create a side-input view from input");
- thrown.expectCause(
-
ThrowableMessageMatcher.hasMessage(Matchers.containsString("non-bounded
PCollection")));
- pipeline
- .apply(
- new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
- @Override
- public PCollection<KV<String, Integer>> expand(PBegin input) {
- return PCollection.createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- PCollection.IsBounded.UNBOUNDED,
- KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()));
- }
- })
- .apply(view);
- }
-
- @Test
- public void testViewUnboundedAsSingletonDirect() {
- testViewUnbounded(pipeline, View.asSingleton());
- }
-
- @Test
- public void testViewUnboundedAsIterableDirect() {
- testViewUnbounded(pipeline, View.asIterable());
- }
-
- @Test
- public void testViewUnboundedAsListDirect() {
- testViewUnbounded(pipeline, View.asList());
- }
-
- @Test
- public void testViewUnboundedAsMapDirect() {
- testViewUnbounded(pipeline, View.asMap());
- }
-
- @Test
- public void testViewUnboundedAsMultimapDirect() {
- testViewUnbounded(pipeline, View.asMultimap());
- }
}
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 2bdc9061e23..06aa9adaf74 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -24,23 +24,12 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
@@ -66,7 +55,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -693,945 +681,6 @@ public class ViewTest implements Serializable {
pipeline.run();
}
- @Test
- @Category(ValidatesRunner.class)
- public void testMultimapSideInput() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3)))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(
- KV.of("apple", 1),
- KV.of("apple", 1),
- KV.of("apple", 2),
- KV.of("banana", 3),
- KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMultimapAsEntrySetSideInput() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3)))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of(2 /* size */))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertEquals((int) c.element(),
c.sideInput(view).size());
- assertEquals((int) c.element(),
c.sideInput(view).entrySet().size());
- for (Entry<String, Iterable<Integer>> entry :
- c.sideInput(view).entrySet()) {
- for (Integer value : entry.getValue()) {
- c.output(KV.of(entry.getKey(), value));
- }
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testMultimapInMemorySideInput() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3)))
- .apply(View.<String, Integer>asMultimap().inMemory());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(
- KV.of("apple", 1),
- KV.of("apple", 1),
- KV.of("apple", 2),
- KV.of("banana", 3),
- KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- private static class NonDeterministicStringCoder extends AtomicCoder<String>
{
- @Override
- public void encode(String value, OutputStream outStream) throws
CoderException, IOException {
- encode(value, outStream, Coder.Context.NESTED);
- }
-
- @Override
- public void encode(String value, OutputStream outStream, Coder.Context
context)
- throws CoderException, IOException {
- StringUtf8Coder.of().encode(value, outStream, context);
- }
-
- @Override
- public String decode(InputStream inStream) throws CoderException,
IOException {
- return decode(inStream, Coder.Context.NESTED);
- }
-
- @Override
- public String decode(InputStream inStream, Coder.Context context)
- throws CoderException, IOException {
- return StringUtf8Coder.of().decode(inStream, context);
- }
-
- @Override
- public void verifyDeterministic()
- throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
- throw new NonDeterministicException(this, "Test coder is not
deterministic on purpose.");
- }
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testMultimapSideInputWithNonDeterministicKeyCoder() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3))
- .withCoder(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(
- KV.of("apple", 1),
- KV.of("apple", 1),
- KV.of("apple", 2),
- KV.of("banana", 3),
- KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedMultimapSideInput() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("a", 1), new Instant(2)),
- TimestampedValue.of(KV.of("a", 2), new Instant(7)),
- TimestampedValue.of(KV.of("b", 3), new Instant(14))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("apple", new Instant(5)),
- TimestampedValue.of("banana", new Instant(13)),
- TimestampedValue.of("blackberry", new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(
- KV.of("apple", 1),
- KV.of("apple", 1),
- KV.of("apple", 2),
- KV.of("banana", 3),
- KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedMultimapAsEntrySetSideInput() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("a", 1), new Instant(2)),
- TimestampedValue.of(KV.of("a", 2), new Instant(7)),
- TimestampedValue.of(KV.of("b", 3), new Instant(14))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of(1 /* size */, new Instant(5)),
- TimestampedValue.of(1 /* size */, new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertEquals((int) c.element(),
c.sideInput(view).size());
- assertEquals((int) c.element(),
c.sideInput(view).entrySet().size());
- for (Entry<String, Iterable<Integer>> entry :
- c.sideInput(view).entrySet()) {
- for (Integer value : entry.getValue()) {
- c.output(KV.of(entry.getKey(), value));
- }
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("a", 1), KV.of("a", 1), KV.of("a", 2),
KV.of("b", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testWindowedMultimapSideInputWithNonDeterministicKeyCoder() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("a", 1), new Instant(2)),
- TimestampedValue.of(KV.of("a", 2), new Instant(7)),
- TimestampedValue.of(KV.of("b", 3), new Instant(14)))
- .withCoder(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("apple", new Instant(5)),
- TimestampedValue.of("banana", new Instant(13)),
- TimestampedValue.of("blackberry", new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(
- KV.of("apple", 1),
- KV.of("apple", 1),
- KV.of("apple", 2),
- KV.of("banana", 3),
- KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testEmptyMultimapSideInput() throws Exception {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateEmptyView",
Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
- .apply(View.asMultimap());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
-
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testEmptyMultimapSideInputWithNonDeterministicKeyCoder() throws
Exception {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline
- .apply(
- "CreateEmptyView",
- Create.empty(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply(View.asMultimap());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
-
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMultimapSideInputIsImmutable() {
-
- final PCollectionView<Map<String, Iterable<Integer>>> view =
- pipeline.apply("CreateSideInput", Create.of(KV.of("a",
1))).apply(View.asMultimap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- try {
- c.sideInput(view).clear();
- fail("Expected UnsupportedOperationException on
clear()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).put("c", ImmutableList.of(3));
- fail("Expected UnsupportedOperationException on
put()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).remove("c");
- fail("Expected UnsupportedOperationException on
remove()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).putAll(new HashMap<>());
- fail("Expected UnsupportedOperationException on
putAll()");
- } catch (UnsupportedOperationException expected) {
- }
- for (Integer v :
c.sideInput(view).get(c.element().substring(0, 1))) {
- c.output(KV.of(c.element(), v));
- }
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMapSideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMapAsEntrySetSideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of(2 /* size */))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertEquals((int) c.element(),
c.sideInput(view).size());
- assertEquals((int) c.element(),
c.sideInput(view).entrySet().size());
- for (Entry<String, Integer> entry :
c.sideInput(view).entrySet()) {
- c.output(KV.of(entry.getKey(),
entry.getValue()));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(KV.of("a", 1), KV.of("b", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testMapInMemorySideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("b", 3)))
- .apply(View.<String, Integer>asMap().inMemory());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testMapInMemorySideInputWithNonStructuralKey() {
-
- final PCollectionView<Map<byte[], Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(
- KV.of("a".getBytes(StandardCharsets.UTF_8), 1),
- KV.of("b".getBytes(StandardCharsets.UTF_8), 3)))
- .apply(View.<byte[], Integer>asMap().inMemory());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
- c.sideInput(view)
- .get(
- c.element()
- .substring(0, 1)
-
.getBytes(StandardCharsets.UTF_8))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testMapSideInputWithNonDeterministicKeyCoder() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", 1), KV.of("b", 3))
- .withCoder(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedMapSideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("b", 2), new Instant(4)),
- TimestampedValue.of(KV.of("b", 3), new Instant(18))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("apple", new Instant(5)),
- TimestampedValue.of("banana", new Instant(4)),
- TimestampedValue.of("blackberry", new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testWindowedMapAsEntrySetSideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("b", 2), new Instant(4)),
- TimestampedValue.of(KV.of("b", 3), new Instant(18))))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of(2 /* size */, new Instant(5)),
- TimestampedValue.of(1 /* size */, new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertEquals((int) c.element(),
c.sideInput(view).size());
- assertEquals((int) c.element(),
c.sideInput(view).entrySet().size());
- for (Entry<String, Integer> entry :
c.sideInput(view).entrySet()) {
- c.output(KV.of(entry.getKey(),
entry.getValue()));
- }
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output).containsInAnyOrder(KV.of("a", 1), KV.of("b", 2),
KV.of("b", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testWindowedMapSideInputWithNonDeterministicKeyCoder() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.timestamped(
- TimestampedValue.of(KV.of("a", 1), new Instant(1)),
- TimestampedValue.of(KV.of("b", 2), new Instant(4)),
- TimestampedValue.of(KV.of("b", 3), new Instant(18)))
- .withCoder(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply("SideWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply(
- "CreateMainInput",
- Create.timestamped(
- TimestampedValue.of("apple", new Instant(5)),
- TimestampedValue.of("banana", new Instant(4)),
- TimestampedValue.of("blackberry", new Instant(16))))
- .apply("MainWindowInto",
Window.into(FixedWindows.of(Duration.millis(10))))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 2),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testEmptyMapSideInput() throws Exception {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateEmptyView",
Create.empty(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
- .apply(View.asMap());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
-
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class})
- public void testEmptyMapSideInputWithNonDeterministicKeyCoder() throws
Exception {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateEmptyView",
- Create.empty(KvCoder.of(new NonDeterministicStringCoder(),
VarIntCoder.of())))
- .apply(View.asMap());
-
- PCollection<Integer> results =
- pipeline
- .apply("Create1", Create.of(1))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
-
assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(results).containsInAnyOrder(1);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMapSideInputWithNullValuesCatchesDuplicates() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply(
- "CreateSideInput",
- Create.of(KV.of("a", (Integer) null), KV.of("a", (Integer)
null))
- .withCoder(
- KvCoder.of(StringUtf8Coder.of(),
NullableCoder.of(VarIntCoder.of()))))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
- c.sideInput(view)
- .getOrDefault(c.element().substring(0,
1), 0)));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 1), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- // As long as we get an error, be flexible with how a runner surfaces it
- thrown.expect(Exception.class);
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testMapSideInputIsImmutable() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline.apply("CreateSideInput", Create.of(KV.of("a",
1))).apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple"))
- .apply(
- "OutputSideInputs",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- try {
- c.sideInput(view).clear();
- fail("Expected UnsupportedOperationException on
clear()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).put("c", 3);
- fail("Expected UnsupportedOperationException on
put()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).remove("c");
- fail("Expected UnsupportedOperationException on
remove()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).putAll(new HashMap<>());
- fail("Expected UnsupportedOperationException on
putAll()");
- } catch (UnsupportedOperationException expected) {
- }
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- // Pass at least one value through to guarantee that DoFn executes.
- PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
-
- pipeline.run();
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testCombinedMapSideInput() {
-
- final PCollectionView<Map<String, Integer>> view =
- pipeline
- .apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20),
KV.of("b", 3)))
- .apply("SumIntegers", Combine.perKey(Sum.ofIntegers()))
- .apply(View.asMap());
-
- PCollection<KV<String, Integer>> output =
- pipeline
- .apply("CreateMainInput", Create.of("apple", "banana",
"blackberry"))
- .apply(
- "Output",
- ParDo.of(
- new DoFn<String, KV<String, Integer>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(
- KV.of(
- c.element(),
-
c.sideInput(view).get(c.element().substring(0, 1))));
- }
- })
- .withSideInputs(view));
-
- PAssert.that(output)
- .containsInAnyOrder(KV.of("apple", 21), KV.of("banana", 3),
KV.of("blackberry", 3));
-
- pipeline.run();
- }
-
@Test
@Category(ValidatesRunner.class)
public void testWindowedSideInputFixedToFixed() {