http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java index 2a89a18..3bc0a65 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Random; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; @@ -76,12 +75,12 @@ import org.junit.runners.JUnit4; @SuppressWarnings("unchecked") public class CreateTest { @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testCreate() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of(LINES)); @@ -93,8 +92,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of(NO_LINES) .withCoder(StringUtf8Coder.of())); @@ -106,7 +103,7 @@ public class CreateTest { @Test public void testCreateEmptyInfersCoder() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); PCollection<Object> output = p.apply(Create.of()); @@ -126,8 +123,6 @@ public class CreateTest { thrown.expectMessage( Matchers.containsString("Unable to infer a coder")); - Pipeline p = TestPipeline.create(); - // Create won't infer a default coder in this case. p.apply(Create.of(new Record(), new Record2())); @@ -137,8 +132,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithNullsAndValues() throws Exception { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of(null, "test1", null, "test2", null) .withCoder(SerializableCoder.of(String.class))); @@ -150,8 +143,6 @@ public class CreateTest { @Test @Category(NeedsRunner.class) public void testCreateParameterizedType() throws Exception { - Pipeline p = TestPipeline.create(); - PCollection<TimestampedValue<String>> output = p.apply(Create.of( TimestampedValue.of("a", new Instant(0)), @@ -216,7 +207,6 @@ public class CreateTest { Create.Values<UnserializableRecord> create = Create.of(elements).withCoder(new UnserializableRecord.UnserializableRecordCoder()); - TestPipeline p = TestPipeline.create(); PAssert.that(p.apply(create)) .containsInAnyOrder( new UnserializableRecord("foo"), @@ -235,8 +225,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateTimestamped() { - Pipeline p = TestPipeline.create(); - List<TimestampedValue<String>> data = Arrays.asList( TimestampedValue.of("a", new Instant(1L)), TimestampedValue.of("b", new Instant(2L)), @@ -254,8 +242,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateTimestampedEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p .apply(Create.timestamped(new ArrayList<TimestampedValue<String>>()) .withCoder(StringUtf8Coder.of())); @@ -266,7 +252,7 @@ public class CreateTest { @Test public void testCreateTimestampedEmptyInfersCoder() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); PCollection<Object> output = p .apply(Create.timestamped()); @@ -280,8 +266,6 @@ public class CreateTest { thrown.expectMessage( Matchers.containsString("Unable to infer a coder")); - Pipeline p = TestPipeline.create(); - // Create won't infer a default coder in this case. PCollection<Record> c = p.apply(Create.timestamped( TimestampedValue.of(new Record(), new Instant(0)), @@ -295,7 +279,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithVoidType() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<Void> output = p.apply(Create.of((Void) null, (Void) null)); PAssert.that(output).containsInAnyOrder((Void) null, (Void) null); p.run(); @@ -304,8 +287,6 @@ public class CreateTest { @Test @Category(RunnableOnService.class) public void testCreateWithKVVoidType() throws Exception { - Pipeline p = TestPipeline.create(); - PCollection<KV<Void, Void>> output = p.apply(Create.of( KV.of((Void) null, (Void) null), KV.of((Void) null, (Void) null)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java index 257b364..b3b3925 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java @@ -24,13 +24,13 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -41,6 +41,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DistinctTest { + + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testDistinct() { @@ -53,8 +57,6 @@ public class DistinctTest { "k2", "k3"); - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(strings) .withCoder(StringUtf8Coder.of())); @@ -72,8 +74,6 @@ public class DistinctTest { public void testDistinctEmpty() { List<String> strings = Arrays.asList(); - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(strings) .withCoder(StringUtf8Coder.of())); @@ -115,8 +115,6 @@ public class DistinctTest { KV.of("k1", "v2"), KV.of("k2", "v1")); - Pipeline p = TestPipeline.create(); - PCollection<KV<String, String>> input = p.apply(Create.of(strings)); PCollection<KV<String, String>> output = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java index e5f5cb6..19b7c51 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java @@ -40,6 +40,9 @@ import org.junit.runners.JUnit4; /** Tests for {@link DoFn}. */ @RunWith(JUnit4.class) public class DoFnTest implements Serializable { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -200,7 +203,6 @@ public class DoFnTest implements Serializable { * Initialize a test pipeline with the specified {@link DoFn}. */ private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) { - TestPipeline pipeline = TestPipeline.create(); pipeline.apply(Create.of((InputT) null)) .apply(ParDo.of(fn)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 2dafa27..3859c9f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -52,6 +52,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DoFnTesterTest { + + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); @Test @@ -324,7 +326,7 @@ public class DoFnTesterTest { public void fnWithSideInputDefault() throws Exception { final PCollectionView<Integer> value = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.processElement(1); @@ -339,7 +341,7 @@ public class DoFnTesterTest { public void fnWithSideInputExplicit() throws Exception { final PCollectionView<Integer> value = PCollectionViews.singletonView( - TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); + p, WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of()); try (DoFnTester<Integer, Integer> tester = DoFnTester.of(new SideInputDoFn(value))) { tester.setSideInput(value, GlobalWindow.INSTANCE, -2); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java index 5221f75..81e1d02 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -57,11 +58,12 @@ public class FilterTest implements Serializable { } } + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testIdentityFilterByPredicate() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) .apply(Filter.by(new TrivialFn(true))); @@ -73,8 +75,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testNoFilterByPredicate() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 4, 5)) .apply(Filter.by(new TrivialFn(false))); @@ -86,8 +86,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterByPredicate() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) .apply(Filter.by(new EvenFn())); @@ -99,8 +97,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterLessThan() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) .apply(Filter.lessThan(4)); @@ -112,8 +108,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterGreaterThan() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) .apply(Filter.greaterThan(4)); @@ -125,8 +119,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterLessThanEq() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) .apply(Filter.lessThanEq(4)); @@ -138,8 +130,6 @@ public class FilterTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFilterGreaterThanEq() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> output = p .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) .apply(Filter.greaterThanEq(4)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java index bb2877e..b24071e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java @@ -27,7 +27,6 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -49,6 +48,9 @@ import org.junit.runners.JUnit4; public class FlatMapElementsTest implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); /** @@ -57,7 +59,6 @@ public class FlatMapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFlatMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -82,7 +83,6 @@ public class FlatMapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFlatMapFnOutputTypeDescriptor() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<String> output = pipeline .apply(Create.of("hello")) .apply(FlatMapElements.via(new SimpleFunction<String, Set<String>>() { @@ -117,7 +117,8 @@ public class FlatMapElementsTest implements Serializable { */ @Test public void testPolymorphicSimpleFunction() throws Exception { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -168,7 +169,6 @@ public class FlatMapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of("hello")) .apply(WithKeys.<String, String>of("k")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index d4686a4..48251bc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -65,6 +65,9 @@ import org.junit.runners.JUnit4; public class FlattenTest implements Serializable { @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -74,8 +77,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenPCollectionList() { - Pipeline p = TestPipeline.create(); - List<List<String>> inputs = Arrays.asList( LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES); @@ -90,8 +91,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenPCollectionListThenParDo() { - Pipeline p = TestPipeline.create(); - List<List<String>> inputs = Arrays.asList( LINES, NO_LINES, LINES2, NO_LINES, LINES, NO_LINES); @@ -107,8 +106,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenPCollectionListEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = PCollectionList.<String>empty(p) .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of()); @@ -120,8 +117,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenInputMultipleCopies() { - Pipeline p = TestPipeline.create(); - int count = 5; PCollection<Long> longs = p.apply("mkLines", CountingInput.upTo(count)); PCollection<Long> biggerLongs = @@ -154,8 +149,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyFlattenAsSideInput() { - Pipeline p = TestPipeline.create(); - final PCollectionView<Iterable<String>> view = PCollectionList.<String>empty(p) .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of()) @@ -179,9 +172,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenPCollectionListEmptyThenParDo() { - - Pipeline p = TestPipeline.create(); - PCollection<String> output = PCollectionList.<String>empty(p) .apply(Flatten.<String>pCollections()).setCoder(StringUtf8Coder.of()) @@ -198,8 +188,6 @@ public class FlattenTest implements Serializable { thrown.expect(IllegalStateException.class); thrown.expectMessage("cannot provide a Coder for empty"); - Pipeline p = TestPipeline.create(); - PCollectionList.<ClassWithoutCoder>empty(p) .apply(Flatten.<ClassWithoutCoder>pCollections()); @@ -211,8 +199,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenIterables() { - Pipeline p = TestPipeline.create(); - PCollection<Iterable<String>> input = p .apply(Create.<Iterable<String>>of(LINES) .withCoder(IterableCoder.of(StringUtf8Coder.of()))); @@ -229,8 +215,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenIterablesLists() { - Pipeline p = TestPipeline.create(); - PCollection<List<String>> input = p.apply(Create.<List<String>>of(LINES).withCoder(ListCoder.of(StringUtf8Coder.of()))); @@ -244,8 +228,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenIterablesSets() { - Pipeline p = TestPipeline.create(); - Set<String> linesSet = ImmutableSet.copyOf(LINES); PCollection<Set<String>> input = @@ -261,9 +243,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenIterablesCollections() { - - Pipeline p = TestPipeline.create(); - Set<String> linesSet = ImmutableSet.copyOf(LINES); PCollection<Collection<String>> input = @@ -280,8 +259,6 @@ public class FlattenTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFlattenIterablesEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<Iterable<String>> input = p .apply(Create.<Iterable<String>>of(NO_LINES) .withCoder(IterableCoder.of(StringUtf8Coder.of()))); @@ -300,8 +277,6 @@ public class FlattenTest implements Serializable { @Test @Category(NeedsRunner.class) public void testEqualWindowFnPropagation() { - Pipeline p = TestPipeline.create(); - PCollection<String> input1 = p.apply("CreateInput1", Create.of("Input1")) .apply("Window1", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))); @@ -322,8 +297,6 @@ public class FlattenTest implements Serializable { @Test @Category(NeedsRunner.class) public void testCompatibleWindowFnPropagation() { - Pipeline p = TestPipeline.create(); - PCollection<String> input1 = p.apply("CreateInput1", Create.of("Input1")) .apply("Window1", @@ -345,7 +318,7 @@ public class FlattenTest implements Serializable { @Test public void testIncompatibleWindowFnPropagationFailure() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); PCollection<String> input1 = p.apply("CreateInput1", Create.of("Input1")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index ebde110..f4bec3a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -39,7 +39,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; @@ -81,6 +80,9 @@ import org.junit.runners.JUnit4; public class GroupByKeyTest { @Rule + public final TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); @Test @@ -95,8 +97,6 @@ public class GroupByKeyTest { KV.of("k2", -33), KV.of("k3", 0)); - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(ungroupedPairs) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -137,8 +137,6 @@ public class GroupByKeyTest { KV.of("k2", -33), // window [5, 10) KV.of("k3", 0)); // window [5, 10) - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.timestamped(ungroupedPairs, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -174,8 +172,6 @@ public class GroupByKeyTest { public void testGroupByKeyEmpty() { List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(ungroupedPairs) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -193,8 +189,6 @@ public class GroupByKeyTest { List<KV<Map<String, String>, Integer>> ungroupedPairs = Arrays.asList(); - Pipeline p = TestPipeline.create(); - PCollection<KV<Map<String, String>, Integer>> input = p.apply(Create.of(ungroupedPairs) .withCoder( @@ -209,7 +203,6 @@ public class GroupByKeyTest { @Test @Category(NeedsRunner.class) public void testIdentityWindowFnPropagation() { - Pipeline p = TestPipeline.create(); List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); @@ -230,7 +223,6 @@ public class GroupByKeyTest { @Test @Category(NeedsRunner.class) public void testWindowFnInvalidation() { - Pipeline p = TestPipeline.create(); List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); @@ -255,7 +247,6 @@ public class GroupByKeyTest { @Test public void testInvalidWindowsDirect() { - Pipeline p = TestPipeline.create(); List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); @@ -275,7 +266,6 @@ public class GroupByKeyTest { @Test @Category(NeedsRunner.class) public void testRemerge() { - Pipeline p = TestPipeline.create(); List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); @@ -300,7 +290,6 @@ public class GroupByKeyTest { @Test public void testGroupByKeyDirectUnbounded() { - Pipeline p = TestPipeline.create(); PCollection<KV<String, Integer>> input = p.apply( @@ -331,9 +320,8 @@ public class GroupByKeyTest { @Test @Category(RunnableOnService.class) public void testOutputTimeFnEarliest() { - Pipeline pipeline = TestPipeline.create(); - pipeline.apply( + p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) @@ -342,7 +330,7 @@ public class GroupByKeyTest { .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(0)))); - pipeline.run(); + p.run(); } @@ -353,9 +341,7 @@ public class GroupByKeyTest { @Test @Category(RunnableOnService.class) public void testOutputTimeFnLatest() { - Pipeline pipeline = TestPipeline.create(); - - pipeline.apply( + p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) @@ -364,7 +350,7 @@ public class GroupByKeyTest { .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(10)))); - pipeline.run(); + p.run(); } private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> { @@ -408,8 +394,6 @@ public class GroupByKeyTest { final int numValues = 10; final int numKeys = 5; - Pipeline p = TestPipeline.create(); - p.getCoderRegistry().registerCoder(BadEqualityKey.class, DeterministicKeyCoder.class); // construct input data http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java index fce5b2f..2a19802 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KeysTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -27,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -50,11 +50,12 @@ public class KeysTest { static final KV<String, Integer>[] EMPTY_TABLE = new KV[] { }; + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testKeys() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(TABLE)).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -69,8 +70,6 @@ public class KeysTest { @Test @Category(RunnableOnService.class) public void testKeysEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java index 3598198..24186ed 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/KvSwapTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -27,6 +26,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -50,11 +50,12 @@ public class KvSwapTest { static final KV<String, Integer>[] EMPTY_TABLE = new KV[] { }; + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testKvSwap() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(TABLE)).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -75,8 +76,6 @@ public class KvSwapTest { @Test @Category(RunnableOnService.class) public void testKvSwapEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java index ce9ae37..f71b813 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java @@ -51,12 +51,13 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class LatestTest implements Serializable { + + @Rule public final transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @Category(NeedsRunner.class) public void testGloballyEventTimestamp() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.timestamped( TimestampedValue.of("foo", new Instant(100)), @@ -71,7 +72,8 @@ public class LatestTest implements Serializable { @Test public void testGloballyOutputCoder() { - TestPipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + BigEndianLongCoder inputCoder = BigEndianLongCoder.of(); PCollection<Long> output = @@ -86,7 +88,6 @@ public class LatestTest implements Serializable { @Test @Category(NeedsRunner.class) public void testGloballyEmptyCollection() { - TestPipeline p = TestPipeline.create(); PCollection<Long> emptyInput = p.apply(Create.<Long>of() // Explicitly set coder such that then runner enforces encodability. .withCoder(VarLongCoder.of())); @@ -99,7 +100,6 @@ public class LatestTest implements Serializable { @Test @Category(NeedsRunner.class) public void testPerKeyEventTimestamp() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.timestamped( TimestampedValue.of(KV.of("A", "foo"), new Instant(100)), @@ -114,7 +114,8 @@ public class LatestTest implements Serializable { @Test public void testPerKeyOutputCoder() { - TestPipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + KvCoder<String, Long> inputCoder = KvCoder.of( AvroCoder.of(String.class), AvroCoder.of(Long.class)); @@ -128,7 +129,6 @@ public class LatestTest implements Serializable { @Test @Category(NeedsRunner.class) public void testPerKeyEmptyCollection() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.<KV<String, String>>of().withCoder(KvCoder.of( StringUtf8Coder.of(), StringUtf8Coder.of()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index ac3444b..47d0b87 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertThat; import java.io.Serializable; import java.util.Set; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -48,6 +47,9 @@ import org.junit.runners.JUnit4; public class MapElementsTest implements Serializable { @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); /** @@ -79,7 +81,6 @@ public class MapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(MapElements.via(new SimpleFunction<Integer, Integer>() { @@ -98,7 +99,8 @@ public class MapElementsTest implements Serializable { */ @Test public void testPolymorphicSimpleFunction() throws Exception { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -120,7 +122,8 @@ public class MapElementsTest implements Serializable { */ @Test public void testNestedPolymorphicSimpleFunction() throws Exception { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) @@ -149,7 +152,6 @@ public class MapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMapBasicSerializableFunction() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.of(1, 2, 3)) .apply(MapElements.via(new SerializableFunction<Integer, Integer>() { @@ -170,7 +172,6 @@ public class MapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testSimpleFunctionOutputTypeDescriptor() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<String> output = pipeline .apply(Create.of("hello")) .apply(MapElements.via(new SimpleFunction<String, String>() { @@ -191,7 +192,6 @@ public class MapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testVoidValues() throws Exception { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of("hello")) .apply(WithKeys.<String, String>of("k")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java index 9bc8a64..2c3a735 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -43,10 +44,13 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ParDoLifecycleTest implements Serializable { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testOldFnCallSequence() { - TestPipeline p = TestPipeline.create(); PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) @@ -58,7 +62,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(RunnableOnService.class) public void testOldFnCallSequenceMulti() { - TestPipeline p = TestPipeline.create(); PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) @@ -127,7 +130,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFnCallSequence() { - TestPipeline p = TestPipeline.create(); PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) @@ -139,7 +141,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFnCallSequenceMulti() { - TestPipeline p = TestPipeline.create(); PCollectionList.of(p.apply("Impolite", Create.of(1, 2, 4))) .and(p.apply("Polite", Create.of(3, 5, 6, 7))) .apply(Flatten.<Integer>pCollections()) @@ -206,7 +207,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testTeardownCalledAfterExceptionInSetup() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); p .apply(Create.of(1, 2, 3)) @@ -227,7 +227,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testTeardownCalledAfterExceptionInStartBundle() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); p .apply(Create.of(1, 2, 3)) @@ -246,7 +245,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testTeardownCalledAfterExceptionInProcessElement() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); p .apply(Create.of(1, 2, 3)) @@ -265,7 +263,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testTeardownCalledAfterExceptionInFinishBundle() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); p .apply(Create.of(1, 2, 3)) @@ -284,7 +281,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWithContextTeardownCalledAfterExceptionInSetup() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.SETUP); p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); try { @@ -300,7 +296,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWithContextTeardownCalledAfterExceptionInStartBundle() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.START_BUNDLE); p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); try { @@ -316,7 +311,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWithContextTeardownCalledAfterExceptionInProcessElement() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.PROCESS_ELEMENT); p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); try { @@ -332,7 +326,6 @@ public class ParDoLifecycleTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWithContextTeardownCalledAfterExceptionInFinishBundle() { - TestPipeline p = TestPipeline.create(); ExceptionThrowingOldFn fn = new ExceptionThrowingOldFn(MethodForException.FINISH_BUNDLE); p.apply(Create.of(1, 2, 3)).apply(ParDo.of(fn)); try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 4a3e2dd..3a47fc7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -47,7 +47,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.VarIntCoder; @@ -107,6 +106,9 @@ public class ParDoTest implements Serializable { // anonymous inner classes inside the non-static test methods. @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + + @Rule public transient ExpectedException thrown = ExpectedException.none(); private static class PrintingDoFn extends DoFn<String, String> { @@ -302,7 +304,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDo() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -319,7 +320,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDo2() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -336,7 +336,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoEmpty() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(); @@ -354,8 +353,6 @@ public class ParDoTest implements Serializable { @Category(RunnableOnService.class) public void testParDoEmptyOutputs() { - Pipeline pipeline = TestPipeline.create(); - List<Integer> inputs = Arrays.asList(); PCollection<String> output = pipeline @@ -370,7 +367,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoWithSideOutputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -413,7 +409,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoEmptyWithSideOutputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(); @@ -454,7 +449,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoWithEmptySideOutputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(); @@ -482,7 +476,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoWithOnlySideOutputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -507,7 +500,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoWritingToUndeclaredSideOutput() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -529,7 +521,7 @@ public class ParDoTest implements Serializable { // TODO: The exception thrown is runner-specific, even if the behavior is general @Category(NeedsRunner.class) public void testParDoUndeclaredSideOutputLimit() { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3))); // Success for a total of 1000 outputs. @@ -566,7 +558,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoWithSideInputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -598,7 +589,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoWithSideInputsIsCumulative() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -632,7 +622,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultiOutputParDoWithSideInputs() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -670,7 +659,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testMultiOutputParDoWithSideInputsIsCumulative() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -708,7 +696,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoReadingFromUnknownSideInput() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -746,7 +733,6 @@ public class ParDoTest implements Serializable { // on an input where the element is in multiple windows. The complication is // that side inputs are per-window, so the runner has to make sure // to process each window individually. - Pipeline p = TestPipeline.create(); MutableDateTime mutableNow = Instant.now().toMutableDateTime(); mutableNow.setMillisOfSecond(0); @@ -754,9 +740,9 @@ public class ParDoTest implements Serializable { SlidingWindows windowFn = SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)); - PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton()); + PCollectionView<Integer> view = pipeline.apply(Create.of(1)).apply(View.<Integer>asSingleton()); PCollection<String> res = - p.apply(Create.timestamped(TimestampedValue.of("a", now))) + pipeline.apply(Create.timestamped(TimestampedValue.of("a", now))) .apply(Window.<String>into(windowFn)) .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view)); @@ -766,14 +752,12 @@ public class ParDoTest implements Serializable { PAssert.that(res).inWindow(window).containsInAnyOrder("a:1"); } - p.run(); + pipeline.run(); } @Test @Category(NeedsRunner.class) public void testParDoWithErrorInStartBatch() { - Pipeline pipeline = TestPipeline.create(); - List<Integer> inputs = Arrays.asList(3, -42, 666); pipeline.apply(Create.of(inputs)) @@ -787,7 +771,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoWithErrorInProcessElement() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -802,7 +785,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoWithErrorInFinishBatch() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -816,23 +798,27 @@ public class ParDoTest implements Serializable { @Test public void testParDoOutputNameBasedOnDoFnWithTrimmedSuffix() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new TestDoFn())); + pipeline.enableAbandonedNodeEnforcement(false); + + PCollection<String> output = pipeline.apply(Create.of(1)).apply(ParDo.of(new TestDoFn())); assertThat(output.getName(), containsString("ParDo(Test)")); } @Test public void testParDoOutputNameBasedOnLabel() { - Pipeline p = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<String> output = - p.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn())); + pipeline.apply(Create.of(1)).apply("MyParDo", ParDo.of(new TestDoFn())); assertThat(output.getName(), containsString("MyParDo")); } @Test public void testParDoOutputNameBasedDoFnWithoutMatchingSuffix() { - Pipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer())); + pipeline.enableAbandonedNodeEnforcement(false); + + PCollection<String> output = + pipeline.apply(Create.of(1)).apply(ParDo.of(new StrangelyNamedDoer())); assertThat(output.getName(), containsString("ParDo(StrangelyNamedDoer)")); } @@ -850,7 +836,7 @@ public class ParDoTest implements Serializable { @Test public void testParDoWithSideOutputsName() { - Pipeline p = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){}; @@ -858,7 +844,7 @@ public class ParDoTest implements Serializable { TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){}; TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){}; - PCollectionTuple outputs = p + PCollectionTuple outputs = pipeline .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput") .apply("MyParDo", ParDo .of(new TestDoFn( @@ -880,7 +866,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testParDoInCustomTransform() { - Pipeline pipeline = TestPipeline.create(); List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -904,7 +889,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMultiOutputChaining() { - Pipeline pipeline = TestPipeline.create(); PCollectionTuple filters = pipeline .apply(Create.of(Arrays.asList(3, 4, 5, 6))) @@ -1106,7 +1090,7 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testSideOutputUnknownCoder() throws Exception { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> input = pipeline .apply(Create.of(Arrays.asList(1, 2, 3))); @@ -1122,7 +1106,8 @@ public class ParDoTest implements Serializable { @Test public void testSideOutputUnregisteredExplicitCoder() throws Exception { - Pipeline pipeline = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Integer> input = pipeline .apply(Create.of(Arrays.asList(1, 2, 3))); @@ -1144,7 +1129,7 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMainOutputUnregisteredExplicitCoder() { - Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> input = pipeline .apply(Create.of(Arrays.asList(1, 2, 3))); @@ -1165,7 +1150,6 @@ public class ParDoTest implements Serializable { // should not cause a crash based on lack of a coder for the // side output. - Pipeline pipeline = TestPipeline.create(); final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("main"); final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("side"); PCollectionTuple tuple = pipeline @@ -1204,7 +1188,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoOutputWithTimestamp() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6))); @@ -1226,7 +1209,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoSideOutputWithTimestamp() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6))); @@ -1258,7 +1240,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoShiftTimestamp() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6))); @@ -1281,7 +1262,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoShiftTimestampInvalid() { - Pipeline pipeline = TestPipeline.create(); pipeline.apply(Create.of(Arrays.asList(3, 42, 6))) .apply(ParDo.of(new TestOutputTimestampDoFn())) @@ -1300,7 +1280,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testParDoShiftTimestampInvalidZeroAllowed() { - Pipeline pipeline = TestPipeline.create(); pipeline.apply(Create.of(Arrays.asList(3, 42, 6))) .apply(ParDo.of(new TestOutputTimestampDoFn())) @@ -1353,7 +1332,6 @@ public class ParDoTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowingInStartAndFinishBundle() { - Pipeline pipeline = TestPipeline.create(); PCollection<String> output = pipeline @@ -1391,7 +1369,6 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) public void testWindowingInStartBundleException() { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1)))) @@ -1477,13 +1454,12 @@ public class ParDoTest implements Serializable { } }; - Pipeline p = TestPipeline.create(); PCollection<Integer> output = - p.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) + pipeline.apply(Create.of(KV.of("hello", 42), KV.of("hello", 97), KV.of("hello", 84))) .apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(0, 1, 2); - p.run(); + pipeline.run(); } @Test @@ -1514,9 +1490,8 @@ public class ParDoTest implements Serializable { } }; - Pipeline p = TestPipeline.create(); PCollectionTuple output = - p.apply( + pipeline.apply( Create.of( KV.of("hello", 42), KV.of("hello", 97), @@ -1534,7 +1509,7 @@ public class ParDoTest implements Serializable { // There are 1 and 3 from "hello" and just "1" from "goodbye" PAssert.that(odds).containsInAnyOrder(1, 3, 1); - p.run(); + pipeline.run(); } @Test @@ -1562,24 +1537,23 @@ public class ParDoTest implements Serializable { } }; - Pipeline p = TestPipeline.create(); PCollection<List<Integer>> output = - p.apply( + pipeline.apply( Create.of( KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12))) .apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97)); - p.run(); + pipeline.run(); } @Test @Category({RunnableOnService.class, UsesStatefulParDo.class}) public void testBagStateSideInput() { - Pipeline p = TestPipeline.create(); final PCollectionView<List<Integer>> listView = - p.apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList()); + pipeline + .apply("Create list for side input", Create.of(2, 1, 0)).apply(View.<Integer>asList()); final String stateId = "foo"; DoFn<KV<String, Integer>, List<Integer>> fn = @@ -1607,7 +1581,7 @@ public class ParDoTest implements Serializable { }; PCollection<List<Integer>> output = - p.apply( + pipeline.apply( "Create main input", Create.of( KV.of("hello", 97), KV.of("hello", 42), KV.of("hello", 84), KV.of("hello", 12))) @@ -1616,7 +1590,7 @@ public class ParDoTest implements Serializable { PAssert.that(output).containsInAnyOrder( Lists.newArrayList(12, 42, 84, 97), Lists.newArrayList(0, 1, 2)); - p.run(); + pipeline.run(); } /** @@ -1658,11 +1632,9 @@ public class ParDoTest implements Serializable { } }; - Pipeline p = TestPipeline.create(); - - PCollection<Integer> output = p.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); + PCollection<Integer> output = pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn)); PAssert.that(output).containsInAnyOrder(3, 42); - p.run(); + pipeline.run(); } @Test @@ -1704,7 +1676,6 @@ public class ParDoTest implements Serializable { @Test public void testRejectsWrongWindowType() { - Pipeline p = TestPipeline.create(); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(GlobalWindow.class.getSimpleName()); @@ -1712,7 +1683,8 @@ public class ParDoTest implements Serializable { thrown.expectMessage("window type"); thrown.expectMessage("not a supertype"); - p.apply(Create.of(1, 2, 3)) + pipeline + .apply(Create.of(1, 2, 3)) .apply( ParDo.of( new DoFn<Integer, Integer>() { @@ -1735,9 +1707,8 @@ public class ParDoTest implements Serializable { public void testMultipleWindowSubtypesOK() { final String timerId = "gobbledegook"; - Pipeline p = TestPipeline.create(); - - p.apply(Create.of(1, 2, 3)) + pipeline + .apply(Create.of(1, 2, 3)) .apply(Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(10)))) .apply( ParDo.of( @@ -1759,26 +1730,25 @@ public class ParDoTest implements Serializable { public void testRejectsSplittableDoFnByDefault() { // ParDo with a splittable DoFn must be overridden by the runner. // Without an override, applying it directly must fail. - Pipeline p = TestPipeline.create(); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(p.getRunner().getClass().getName()); + thrown.expectMessage(pipeline.getRunner().getClass().getName()); thrown.expectMessage("does not support Splittable DoFn"); - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn())); + pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn())); } @Test public void testMultiRejectsSplittableDoFnByDefault() { // ParDo with a splittable DoFn must be overridden by the runner. // Without an override, applying it directly must fail. - Pipeline p = TestPipeline.create(); thrown.expect(IllegalArgumentException.class); - thrown.expectMessage(p.getRunner().getClass().getName()); + thrown.expectMessage(pipeline.getRunner().getClass().getName()); thrown.expectMessage("does not support Splittable DoFn"); - p.apply(Create.of(1, 2, 3)) + pipeline + .apply(Create.of(1, 2, 3)) .apply( ParDo.of(new TestSplittableDoFn()) .withOutputTags( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java index 1cbe344..87d7460 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -47,6 +46,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class PartitionTest implements Serializable { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); static class ModFn implements PartitionFn<Integer> { @@ -63,10 +63,10 @@ public class PartitionTest implements Serializable { } } + @Test @Category(RunnableOnService.class) public void testEvenOddPartition() { - Pipeline pipeline = TestPipeline.create(); PCollectionList<Integer> outputs = pipeline .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) @@ -81,7 +81,6 @@ public class PartitionTest implements Serializable { @Test @Category(NeedsRunner.class) public void testModPartition() { - Pipeline pipeline = TestPipeline.create(); PCollectionList<Integer> outputs = pipeline .apply(Create.of(1, 2, 4, 5)) @@ -96,7 +95,6 @@ public class PartitionTest implements Serializable { @Test @Category(NeedsRunner.class) public void testOutOfBoundsPartitions() { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of(-1)) @@ -110,7 +108,6 @@ public class PartitionTest implements Serializable { @Test public void testZeroNumPartitions() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(591)); @@ -122,7 +119,6 @@ public class PartitionTest implements Serializable { @Test @Category(NeedsRunner.class) public void testDroppedPartition() { - Pipeline pipeline = TestPipeline.create(); // Compute the set of integers either 1 or 2 mod 3, the hard way. PCollectionList<Integer> outputs = pipeline http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java index 6e196b4..cd707da 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/RegexTest.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -33,11 +34,13 @@ import org.junit.runners.JUnit4; /** Tests for {@link Regex}. */ @RunWith(JUnit4.class) public class RegexTest implements Serializable { + + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test @Category(NeedsRunner.class) public void testFind() { - TestPipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of("aj", "xj", "yj", "zj")).apply(Regex.find("[xyz]")); @@ -48,8 +51,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFindGroup() { - TestPipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of("aj", "xj", "yj", "zj")).apply(Regex.find("([xyz])", 1)); @@ -60,8 +61,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testFindNone() { - TestPipeline p = TestPipeline.create(); - PCollection<String> output = p.apply(Create.of("a", "b", "c", "d")).apply(Regex.find("[xyz]")); PAssert.that(output).empty(); @@ -71,7 +70,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testKVFind() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.of("a b c")).apply(Regex.findKV("a (b) (c)", 1, 2)); @@ -83,7 +81,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testKVFindNone() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.of("x y z")).apply(Regex.findKV("a (b) (c)", 1, 2)); @@ -95,7 +92,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMatches() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("a", "x", "y", "z")).apply(Regex.matches("[xyz]")); @@ -107,7 +103,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMatchesNone() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("a", "b", "c", "d")).apply(Regex.matches("[xyz]")); @@ -119,7 +114,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMatchesGroup() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("a", "x xxx", "x yyy", "x zzz")).apply(Regex.matches("x ([xyz]*)", 1)); @@ -131,7 +125,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testKVMatches() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.of("a b c")).apply(Regex.matchesKV("a (b) (c)", 1, 2)); @@ -143,7 +136,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testKVMatchesNone() { - TestPipeline p = TestPipeline.create(); PCollection<KV<String, String>> output = p.apply(Create.of("x y z")).apply(Regex.matchesKV("a (b) (c)", 1, 2)); @@ -154,7 +146,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReplaceAll() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("xj", "yj", "zj")).apply(Regex.replaceAll("[xyz]", "new")); @@ -166,7 +157,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReplaceAllMixed() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("abc", "xj", "yj", "zj", "def")).apply(Regex.replaceAll("[xyz]", "new")); @@ -178,7 +168,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReplaceFirst() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("xjx", "yjy", "zjz")).apply(Regex.replaceFirst("[xyz]", "new")); @@ -190,7 +179,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testReplaceFirstMixed() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("abc", "xjx", "yjy", "zjz", "def")) @@ -203,7 +191,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testSplits() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("The quick brown fox jumps over the lazy dog")) @@ -217,7 +204,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testSplitsWithEmpty() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("The quick brown fox jumps over the lazy dog")) @@ -235,7 +221,6 @@ public class RegexTest implements Serializable { @Test @Category(NeedsRunner.class) public void testSplitsWithoutEmpty() { - TestPipeline p = TestPipeline.create(); PCollection<String> output = p.apply(Create.of("The quick brown fox jumps over the lazy dog")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index a0555fa..9cc12d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -32,6 +32,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; + import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -41,6 +42,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -102,71 +104,70 @@ public class SampleTest { } } + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testSample() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply(Create.of(DATA) + PCollection<Integer> input = pipeline.apply(Create.of(DATA) .withCoder(BigEndianIntegerCoder.of())); PCollection<Iterable<Integer>> output = input.apply( Sample.<Integer>fixedSizeGlobally(3)); PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(3, DATA)); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testSampleEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply(Create.of(EMPTY) + PCollection<Integer> input = pipeline.apply(Create.of(EMPTY) .withCoder(BigEndianIntegerCoder.of())); PCollection<Iterable<Integer>> output = input.apply( Sample.<Integer>fixedSizeGlobally(3)); PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(0, EMPTY)); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testSampleZero() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply(Create.of(DATA) + PCollection<Integer> input = pipeline.apply(Create.of(DATA) .withCoder(BigEndianIntegerCoder.of())); PCollection<Iterable<Integer>> output = input.apply( Sample.<Integer>fixedSizeGlobally(0)); PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(0, DATA)); - p.run(); + pipeline.run(); } @Test @Category(RunnableOnService.class) public void testSampleInsufficientElements() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply(Create.of(DATA) + PCollection<Integer> input = pipeline.apply(Create.of(DATA) .withCoder(BigEndianIntegerCoder.of())); PCollection<Iterable<Integer>> output = input.apply( Sample.<Integer>fixedSizeGlobally(10)); PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(5, DATA)); - p.run(); + pipeline.run(); } @Test(expected = IllegalArgumentException.class) public void testSampleNegative() { - Pipeline p = TestPipeline.create(); + pipeline.enableAbandonedNodeEnforcement(false); - PCollection<Integer> input = p.apply(Create.of(DATA) + PCollection<Integer> input = pipeline.apply(Create.of(DATA) .withCoder(BigEndianIntegerCoder.of())); input.apply(Sample.<Integer>fixedSizeGlobally(-1)); } @@ -174,9 +175,8 @@ public class SampleTest { @Test @Category(RunnableOnService.class) public void testSampleMultiplicity() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply(Create.of(REPEATED_DATA) + PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA) .withCoder(BigEndianIntegerCoder.of())); // At least one value must be selected with multiplicity. PCollection<Iterable<Integer>> output = input.apply( @@ -184,7 +184,7 @@ public class SampleTest { PAssert.thatSingletonIterable(output) .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA)); - p.run(); + pipeline.run(); } private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 022c2e5..e3b58b7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -29,7 +29,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -54,6 +53,7 @@ import org.apache.beam.sdk.values.TupleTagList; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.MutableDateTime; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -151,10 +151,13 @@ public class SplittableDoFnTest { } } + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testPairWithIndexBasic() { - Pipeline p = TestPipeline.create(); + PCollection<KV<String, Integer>> res = p.apply(Create.of("a", "bb", "ccccc")) .apply(ParDo.of(new PairStringWithIndexToLength())) @@ -180,7 +183,6 @@ public class SplittableDoFnTest { public void testPairWithIndexWindowedTimestamped() { // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps // of elements in the input collection. - Pipeline p = TestPipeline.create(); MutableDateTime mutableNow = Instant.now().toMutableDateTime(); mutableNow.setMillisOfSecond(0); @@ -277,7 +279,6 @@ public class SplittableDoFnTest { @Test @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testOutputAfterCheckpoint() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<Integer> outputs = p.apply(Create.of("foo")) .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock())); PAssert.thatSingleton(outputs.apply(Count.<Integer>globally())) @@ -317,7 +318,6 @@ public class SplittableDoFnTest { @Test @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testSideInputsAndOutputs() throws Exception { - Pipeline p = TestPipeline.create(); PCollectionView<String> sideInput = p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton()); @@ -344,7 +344,6 @@ public class SplittableDoFnTest { @Test @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testLateData() throws Exception { - Pipeline p = TestPipeline.create(); Instant base = Instant.now(); @@ -439,7 +438,6 @@ public class SplittableDoFnTest { @Test @Category({RunnableOnService.class, UsesSplittableParDo.class}) public void testLifecycleMethods() throws Exception { - Pipeline p = TestPipeline.create(); PCollection<String> res = p.apply(Create.of("a", "b", "c")).apply(ParDo.of(new SDFWithLifecycle())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java index b624252..d011197 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java @@ -53,6 +53,9 @@ import org.junit.runners.JUnit4; public class TopTest { @Rule + public final TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException expectedEx = ExpectedException.none(); @SuppressWarnings("unchecked") @@ -93,7 +96,6 @@ public class TopTest { @Category(NeedsRunner.class) @SuppressWarnings("unchecked") public void testTop() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)) .withCoder(StringUtf8Coder.of())); @@ -125,7 +127,6 @@ public class TopTest { @Category(NeedsRunner.class) @SuppressWarnings("unchecked") public void testTopEmpty() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(EMPTY_COLLECTION)) .withCoder(StringUtf8Coder.of())); @@ -151,7 +152,8 @@ public class TopTest { @Test public void testTopEmptyWithIncompatibleWindows() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + Bound<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L))); PCollection<String> input = p.apply(Create.timestamped(Collections.<String>emptyList(), Collections.<Long>emptyList())) @@ -170,7 +172,6 @@ public class TopTest { @Category(NeedsRunner.class) @SuppressWarnings("unchecked") public void testTopZero() { - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)) .withCoder(StringUtf8Coder.of())); @@ -202,7 +203,8 @@ public class TopTest { // This is a purely compile-time test. If the code compiles, then it worked. @Test public void testPerKeySerializabilityRequirement() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + p.apply("CreateCollection", Create.of(Arrays.asList(COLLECTION)) .withCoder(StringUtf8Coder.of())); @@ -218,7 +220,8 @@ public class TopTest { @Test public void testCountConstraint() { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + PCollection<String> input = p.apply(Create.of(Arrays.asList(COLLECTION)) .withCoder(StringUtf8Coder.of())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java index 0bf2e2e..5e27552 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ValuesTest.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms; import static org.junit.Assert.assertEquals; import java.util.Arrays; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -29,6 +28,7 @@ import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -52,10 +52,12 @@ public class ValuesTest { static final KV<String, Integer>[] EMPTY_TABLE = new KV[] { }; + @Rule + public final TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testValues() { - Pipeline p = TestPipeline.create(); PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(TABLE)).withCoder( @@ -72,7 +74,6 @@ public class ValuesTest { @Test @Category(RunnableOnService.class) public void testValuesEmpty() { - Pipeline p = TestPipeline.create(); PCollection<KV<String, Integer>> input = p.apply(Create.of(Arrays.asList(EMPTY_TABLE)).withCoder(