Migrated the beam-sdks-java-core module to use TestPipeline as a JUnit rule. Also fixed some checkstyle errors left over from the migration of previous modules.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75a4c918 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75a4c918 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75a4c918 Branch: refs/heads/python-sdk Commit: 75a4c918346b5a04213a54bf7d1bf6507655342a Parents: 09c404a Author: Stas Levin <stasle...@gmail.com> Authored: Mon Dec 19 23:54:47 2016 +0200 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Dec 20 09:55:45 2016 -0800 ---------------------------------------------------------------------- .../UnboundedReadFromBoundedSourceTest.java | 1 - .../direct/CloningBundleFactoryTest.java | 2 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 6 +- .../ImmutabilityCheckingBundleFactoryTest.java | 2 +- .../direct/ImmutableListBundleFactoryTest.java | 2 +- .../direct/WriteWithShardingFactoryTest.java | 2 +- .../java/org/apache/beam/sdk/PipelineTest.java | 37 +++--- .../apache/beam/sdk/coders/AvroCoderTest.java | 11 +- .../beam/sdk/coders/CoderRegistryTest.java | 6 +- .../beam/sdk/coders/SerializableCoderTest.java | 7 +- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 16 +-- .../io/BoundedReadFromUnboundedSourceTest.java | 6 +- .../beam/sdk/io/CompressedSourceTest.java | 12 +- .../apache/beam/sdk/io/CountingInputTest.java | 12 +- .../apache/beam/sdk/io/CountingSourceTest.java | 13 +- .../apache/beam/sdk/io/FileBasedSourceTest.java | 4 +- .../beam/sdk/io/PubsubUnboundedSinkTest.java | 10 +- .../beam/sdk/io/PubsubUnboundedSourceTest.java | 12 +- .../java/org/apache/beam/sdk/io/TextIOTest.java | 29 +++-- .../java/org/apache/beam/sdk/io/WriteTest.java | 2 +- .../org/apache/beam/sdk/io/XmlSourceTest.java | 10 +- .../sdk/options/ProxyInvocationHandlerTest.java | 5 +- .../sdk/runners/TransformHierarchyTest.java | 6 +- .../beam/sdk/runners/TransformTreeTest.java | 11 +- .../beam/sdk/testing/GatherAllPanesTest.java | 7 +- 
.../apache/beam/sdk/testing/PAssertTest.java | 32 ++--- .../apache/beam/sdk/testing/TestStreamTest.java | 7 +- .../transforms/ApproximateQuantilesTest.java | 12 +- .../sdk/transforms/ApproximateUniqueTest.java | 6 +- .../beam/sdk/transforms/CombineFnsTest.java | 5 +- .../apache/beam/sdk/transforms/CombineTest.java | 25 +--- .../apache/beam/sdk/transforms/CountTest.java | 13 +- .../apache/beam/sdk/transforms/CreateTest.java | 27 +---- .../beam/sdk/transforms/DistinctTest.java | 12 +- .../apache/beam/sdk/transforms/DoFnTest.java | 4 +- .../beam/sdk/transforms/DoFnTesterTest.java | 6 +- .../apache/beam/sdk/transforms/FilterTest.java | 18 +-- .../sdk/transforms/FlatMapElementsTest.java | 10 +- .../apache/beam/sdk/transforms/FlattenTest.java | 35 +----- .../beam/sdk/transforms/GroupByKeyTest.java | 30 ++--- .../apache/beam/sdk/transforms/KeysTest.java | 9 +- .../apache/beam/sdk/transforms/KvSwapTest.java | 9 +- .../apache/beam/sdk/transforms/LatestTest.java | 12 +- .../beam/sdk/transforms/MapElementsTest.java | 14 +-- .../beam/sdk/transforms/ParDoLifecycleTest.java | 17 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 118 +++++++------------ .../beam/sdk/transforms/PartitionTest.java | 8 +- .../apache/beam/sdk/transforms/RegexTest.java | 25 +--- .../apache/beam/sdk/transforms/SampleTest.java | 34 +++--- .../beam/sdk/transforms/SplittableDoFnTest.java | 12 +- .../org/apache/beam/sdk/transforms/TopTest.java | 15 ++- .../apache/beam/sdk/transforms/ValuesTest.java | 7 +- .../apache/beam/sdk/transforms/ViewTest.java | 84 ++++--------- .../beam/sdk/transforms/WithKeysTest.java | 8 +- .../beam/sdk/transforms/WithTimestampsTest.java | 9 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 11 +- .../sdk/transforms/windowing/WindowTest.java | 22 ++-- .../sdk/transforms/windowing/WindowingTest.java | 11 +- .../org/apache/beam/sdk/util/ReshuffleTest.java | 11 +- .../beam/sdk/values/PCollectionTupleTest.java | 12 +- .../org/apache/beam/sdk/values/PDoneTest.java | 9 +- 
.../apache/beam/sdk/values/TypedPValueTest.java | 10 +- 62 files changed, 353 insertions(+), 587 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java index 86450f2..0f09cd1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java @@ -36,7 +36,6 @@ import java.util.Random; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java index e5299a2..505d3a2 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java @@ -62,7 +62,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class CloningBundleFactoryTest { @Rule public ExpectedException thrown = ExpectedException.none(); - @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); private CloningBundleFactory factory = CloningBundleFactory.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 35245f4..12ef66c 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -62,14 +62,10 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class CopyOnAccessInMemoryStateInternalsTest { - @Rule public TestPipeline pipeline = TestPipeline.create(); + @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); private String key = "foo"; - public CopyOnAccessInMemoryStateInternalsTest() { - pipeline = TestPipeline.create(); - } - @Test public void testGetWithEmpty() { CopyOnAccessInMemoryStateInternals<String> internals = 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java index 2448078..eccb3a6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java @@ -47,7 +47,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class ImmutabilityCheckingBundleFactoryTest { - @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public ExpectedException thrown = ExpectedException.none(); private ImmutabilityCheckingBundleFactory factory; private PCollection<byte[]> created; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java index 46f02cd..3327ccd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java @@ -57,7 +57,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class 
ImmutableListBundleFactoryTest { - @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public ExpectedException thrown = ExpectedException.none(); private ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index a8c4c02..7432e61 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -69,7 +69,7 @@ public class WriteWithShardingFactoryTest { public static final int INPUT_SIZE = 10000; @Rule public TemporaryFolder tmp = new TemporaryFolder(); private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>(); - @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Test public void dynamicallyReshardedWrite() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index fea1554..d8e4ef4 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -62,6 +62,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class PipelineTest { + @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedLogs logged = ExpectedLogs.none(Pipeline.class); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -128,8 +129,7 @@ public class PipelineTest { PTransform<PCollection<? extends String>, PCollection<String>> myTransform = addSuffix("+"); - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.<String>of(ImmutableList.of("a", "b"))); + PCollection<String> input = pipeline.apply(Create.<String>of(ImmutableList.of("a", "b"))); PCollection<String> left = input.apply("Left1", myTransform).apply("Left2", myTransform); PCollection<String> right = input.apply("Right", myTransform); @@ -139,7 +139,7 @@ public class PipelineTest { PAssert.that(both).containsInAnyOrder("a++", "b++", "a+", "b+"); - p.run(); + pipeline.run(); } private static PTransform<PCollection<? 
extends String>, PCollection<String>> addSuffix( @@ -162,35 +162,36 @@ public class PipelineTest { @Test public void testStableUniqueNameOff() { - Pipeline p = TestPipeline.create(); - p.getOptions().setStableUniqueNames(CheckEnabled.OFF); + pipeline.enableAbandonedNodeEnforcement(false); + + pipeline.getOptions().setStableUniqueNames(CheckEnabled.OFF); - p.apply(Create.of(5, 6, 7)); - p.apply(Create.of(5, 6, 7)); + pipeline.apply(Create.of(5, 6, 7)); + pipeline.apply(Create.of(5, 6, 7)); logged.verifyNotLogged("does not have a stable unique name."); } @Test public void testStableUniqueNameWarning() { - Pipeline p = TestPipeline.create(); - p.getOptions().setStableUniqueNames(CheckEnabled.WARNING); + pipeline.enableAbandonedNodeEnforcement(false); - p.apply(Create.of(5, 6, 7)); - p.apply(Create.of(5, 6, 7)); + pipeline.getOptions().setStableUniqueNames(CheckEnabled.WARNING); + + pipeline.apply(Create.of(5, 6, 7)); + pipeline.apply(Create.of(5, 6, 7)); logged.verifyWarn("does not have a stable unique name."); } @Test public void testStableUniqueNameError() { - Pipeline p = TestPipeline.create(); - p.getOptions().setStableUniqueNames(CheckEnabled.ERROR); + pipeline.getOptions().setStableUniqueNames(CheckEnabled.ERROR); - p.apply(Create.of(5, 6, 7)); + pipeline.apply(Create.of(5, 6, 7)); thrown.expectMessage("does not have a stable unique name."); - p.apply(Create.of(5, 6, 7)); + pipeline.apply(Create.of(5, 6, 7)); } /** @@ -199,7 +200,6 @@ public class PipelineTest { @Test @Category(RunnableOnService.class) public void testIdentityTransform() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> output = pipeline .apply(Create.<Integer>of(1, 2, 3, 4)) @@ -223,8 +223,6 @@ public class PipelineTest { @Test @Category(RunnableOnService.class) public void testTupleProjectionTransform() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> input = pipeline .apply(Create.<Integer>of(1, 2, 3, 4)); @@ -258,8 
+256,6 @@ public class PipelineTest { @Test @Category(RunnableOnService.class) public void testTupleInjectionTransform() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> input = pipeline .apply(Create.<Integer>of(1, 2, 3, 4)); @@ -292,7 +288,6 @@ public class PipelineTest { @Test @Category(NeedsRunner.class) public void testEmptyPipeline() throws Exception { - Pipeline pipeline = TestPipeline.create(); pipeline.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index adfa0d2..60dc07a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -55,7 +55,6 @@ import org.apache.avro.reflect.Stringable; import org.apache.avro.reflect.Union; import org.apache.avro.specific.SpecificData; import org.apache.avro.util.Utf8; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.testing.CoderProperties; @@ -73,6 +72,7 @@ import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -143,6 +143,9 @@ public class AvroCoderTest { } } + @Rule + public TestPipeline pipeline = TestPipeline.create(); + @Test public void testAvroCoderEncoding() throws Exception { AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class); @@ -287,17 +290,15 @@ public class AvroCoderTest { @Test 
@Category(NeedsRunner.class) public void testDefaultCoder() throws Exception { - Pipeline p = TestPipeline.create(); - // Use MyRecord as input and output types without explicitly specifying // a coder (this uses the default coders, which may not be AvroCoder). PCollection<String> output = - p.apply(Create.of(new Pojo("hello", 1), new Pojo("world", 2))) + pipeline.apply(Create.of(new Pojo("hello", 1), new Pojo("world", 2))) .apply(ParDo.of(new GetTextFn())); PAssert.that(output) .containsInAnyOrder("hello", "world"); - p.run(); + pipeline.run(); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java index d7badab..8c0e584 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java @@ -35,7 +35,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException; import org.apache.beam.sdk.coders.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.NeedsRunner; @@ -63,6 +62,9 @@ import org.junit.runners.JUnit4; public class CoderRegistryTest { @Rule + public TestPipeline pipeline = TestPipeline.create(); + + @Rule public ExpectedException thrown = ExpectedException.none(); public static CoderRegistry getStandardRegistry() { @@ -414,7 +416,6 @@ public class CoderRegistryTest { @Test @Category(NeedsRunner.class) public void testSpecializedButIgnoredGenericInPipeline() throws Exception { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of("hello", 
"goodbye")) @@ -443,7 +444,6 @@ public class CoderRegistryTest { @Test @Category(NeedsRunner.class) public void testIgnoredGenericInPipeline() throws Exception { - Pipeline pipeline = TestPipeline.create(); pipeline .apply(Create.of("hello", "goodbye")) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java index 8d344de..296ddc9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -40,6 +39,7 @@ import org.apache.beam.sdk.util.Serializer; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -98,6 +98,9 @@ public class SerializableCoderTest implements Serializable { "To be,", "or not to be"); + @Rule + public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + @Test public void testSerializableCoder() throws Exception { IterableCoder<MyRecord> coder = IterableCoder @@ -136,7 +139,7 @@ public class SerializableCoderTest implements Serializable { @Test @Category(NeedsRunner.class) public void testDefaultCoder() throws Exception { - Pipeline p = TestPipeline.create(); + 
p.enableAbandonedNodeEnforcement(true); // Use MyRecord as input and output types without explicitly specifying // a coder (this uses the default coders, which may not be http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 41a630f..b669968 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -74,6 +74,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class AvroIOTest { + + @Rule + public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -135,7 +139,6 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) public void testAvroIOWriteAndReadASingleFile() throws Throwable { - TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -146,7 +149,6 @@ public class AvroIOTest { .withSchema(GenericClass.class)); p.run(); - p = TestPipeline.create(); PCollection<GenericClass> input = p .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class)); @@ -158,7 +160,6 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable { - TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -170,7 +171,6 @@ public class AvroIOTest { .withSchema(GenericClass.class)); p.run(); - p = 
TestPipeline.create(); PCollection<GenericClass> input = p .apply(AvroIO.Read .from(outputFile.getAbsolutePath()) @@ -187,7 +187,6 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable { - TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -199,7 +198,6 @@ public class AvroIOTest { .withCodec(CodecFactory.nullCodec())); p.run(); - p = TestPipeline.create(); PCollection<GenericClass> input = p .apply(AvroIO.Read .from(outputFile.getAbsolutePath()) @@ -257,7 +255,6 @@ public class AvroIOTest { @Test @Category(NeedsRunner.class) public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable { - TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -270,7 +267,7 @@ public class AvroIOTest { List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null), new GenericClassV2(5, "bar", null)); - p = TestPipeline.create(); + PCollection<GenericClassV2> input = p .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class)); @@ -321,7 +318,6 @@ public class AvroIOTest { @SuppressWarnings("unchecked") @Category(NeedsRunner.class) public void testMetdata() throws Exception { - TestPipeline p = TestPipeline.create(); List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"), new GenericClass(5, "bar")); File outputFile = tmpFolder.newFile("output.avro"); @@ -348,7 +344,7 @@ public class AvroIOTest { private void runTestWrite(String[] expectedElements, int numShards) throws IOException { File baseOutputFile = new File(tmpFolder.getRoot(), "prefix"); String outputFilePrefix = baseOutputFile.getAbsolutePath(); - TestPipeline p = 
TestPipeline.create(); + Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class); if (numShards > 1) { write = write.withNumShards(numShards); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java index 4d7814c..d49873e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -26,7 +26,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.dataflow.TestCountingSource; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -46,6 +46,9 @@ import org.junit.runners.JUnit4; public class BoundedReadFromUnboundedSourceTest implements Serializable{ private static final int NUM_RECORDS = 100; + @Rule + public transient TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testNoDedup() throws Exception { @@ -112,7 +115,6 @@ public class BoundedReadFromUnboundedSourceTest implements Serializable{ } private void test(boolean dedup, boolean timeBound) throws Exception { - Pipeline p = 
TestPipeline.create(); TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); if (dedup) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index f8769ea..3871159 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -47,7 +47,6 @@ import java.util.NoSuchElementException; import java.util.Random; import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; @@ -80,6 +79,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class CompressedSourceTest { + + @Rule + public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -199,8 +202,6 @@ public class CompressedSourceTest { os.write(totalGz); } - Pipeline p = TestPipeline.create(); - CompressedSource<Byte> source = CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) .withDecompression(CompressionMode.GZIP); @@ -274,8 +275,6 @@ public class CompressedSourceTest { String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString(); - Pipeline p = TestPipeline.create(); - CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filePattern, 1)); PCollection<Byte> output = p.apply(Read.from(source)); @@ -395,8 +394,6 @@ public class CompressedSourceTest { expected.addAll(Bytes.asList(generated)); } - Pipeline p 
= TestPipeline.create(); - CompressedSource<Byte> source = CompressedSource.from(new ByteSource(filePattern, 1)) .withDecompression(CompressionMode.GZIP); @@ -476,7 +473,6 @@ public class CompressedSourceTest { private void verifyReadContents(byte[] expected, File inputFile, @Nullable DecompressingChannelFactory decompressionFactory) { - Pipeline p = TestPipeline.create(); CompressedSource<Byte> source = CompressedSource.from(new ByteSource(inputFile.toPath().toString(), 1)); if (decompressionFactory != null) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index dfc4919..f23ee76 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -39,6 +38,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -66,10 +66,12 @@ public class CountingInputTest { .isEqualTo(end - 1); } + @Rule + public TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testBoundedInput() { - 
Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = p.apply(CountingInput.upTo(numElements)); @@ -80,7 +82,6 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) public void testEmptyBoundedInput() { - Pipeline p = TestPipeline.create(); PCollection<Long> input = p.apply(CountingInput.upTo(0)); PAssert.that(input).empty(); @@ -90,7 +91,6 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) public void testEmptyBoundedInputSubrange() { - Pipeline p = TestPipeline.create(); PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42)); PAssert.that(input).empty(); @@ -101,7 +101,6 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) public void testBoundedInputSubrange() { - Pipeline p = TestPipeline.create(); long start = 10; long end = 1000; PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end)); @@ -128,7 +127,6 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) public void testUnboundedInput() { - Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); @@ -140,7 +138,6 @@ public class CountingInputTest { @Test @Category(NeedsRunner.class) public void testUnboundedInputRate() { - Pipeline p = TestPipeline.create(); long numElements = 5000; long elemsPerPeriod = 10L; @@ -169,7 +166,6 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) public void testUnboundedInputTimestamps() { - Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 5eccde6..dfd0949 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; import org.apache.beam.sdk.io.CountingSource.CounterMark; import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource; @@ -48,6 +47,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -79,10 +79,12 @@ public class CountingSourceTest { .isEqualTo(numElements - 1); } + @Rule + public TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) public void testBoundedSource() { - Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements))); @@ -93,7 +95,6 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) public void testEmptyBoundedSource() { - Pipeline p = TestPipeline.create(); PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0))); PAssert.that(input).empty(); @@ -103,7 +104,6 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) public void testBoundedSourceSplits() throws Exception { - Pipeline p = TestPipeline.create(); long numElements = 1000; long numSplits = 10; long splitSizeBytes = numElements * 8 / numSplits; // 8 bytes per long element. 
@@ -157,7 +157,6 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) public void testUnboundedSource() { - Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = p @@ -177,7 +176,6 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) public void testUnboundedSourceTimestamps() { - Pipeline p = TestPipeline.create(); long numElements = 1000; PCollection<Long> input = p.apply( @@ -197,7 +195,6 @@ public class CountingSourceTest { @Test @Category(NeedsRunner.class) public void testUnboundedSourceWithRate() { - Pipeline p = TestPipeline.create(); Duration period = Duration.millis(5); long numElements = 1000L; @@ -232,7 +229,6 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) public void testUnboundedSourceSplits() throws Exception { - Pipeline p = TestPipeline.create(); long numElements = 1000; int numSplits = 10; @@ -257,7 +253,6 @@ public class CountingSourceTest { @Test @Category(NeedsRunner.class) public void testUnboundedSourceRateSplits() throws Exception { - Pipeline p = TestPipeline.create(); int elementsPerPeriod = 10; Duration period = Duration.millis(5); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index f4b8574..f709e22 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -42,7 +42,6 @@ import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader; @@ -73,6 +72,7 @@ public class FileBasedSourceTest { Random random = new Random(0L); + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -719,7 +719,6 @@ public class FileBasedSourceTest { @Test @Category(NeedsRunner.class) public void testDataflowFile() throws IOException { - Pipeline p = TestPipeline.create(); List<String> data = createStringDataset(3, 50); String fileName = "file"; @@ -735,7 +734,6 @@ public class FileBasedSourceTest { @Test @Category(NeedsRunner.class) public void testDataflowFilePattern() throws IOException { - Pipeline p = TestPipeline.create(); List<String> data1 = createStringDataset(3, 50); File file1 = createFileWithData("file1", data1); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java index 518136f..5bc1664 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.util.PubsubTestClient; import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -67,6 +68,9 @@ public class PubsubUnboundedSinkTest { return Hashing.murmur3_128().hashBytes(data.getBytes()).toString(); } + @Rule + 
public TestPipeline p = TestPipeline.create(); + @Test public void saneCoder() throws Exception { OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, getRecordId(DATA)); @@ -88,7 +92,7 @@ public class PubsubUnboundedSinkTest { new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); - TestPipeline p = TestPipeline.create(); + p.apply(Create.of(ImmutableList.of(DATA))) .apply(ParDo.of(new Stamp())) .apply(sink); @@ -117,7 +121,7 @@ public class PubsubUnboundedSinkTest { new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); - TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) .apply(sink); @@ -153,7 +157,7 @@ public class PubsubUnboundedSinkTest { StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC); - TestPipeline p = TestPipeline.create(); + p.apply(Create.of(data)) .apply(ParDo.of(new Stamp())) .apply(sink); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java index f6165c5..601e2c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java @@ -57,6 +57,7 @@ import org.apache.beam.sdk.util.PubsubTestClient; import 
org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory; import org.joda.time.Instant; import org.junit.After; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -82,6 +83,9 @@ public class PubsubUnboundedSourceTest { private PubsubTestClientFactory factory; private PubsubSource<String> primSource; + @Rule + public TestPipeline p = TestPipeline.create(); + private void setupOneMessage(Iterable<IncomingMessage> incoming) { now = new AtomicLong(REQ_TIME); clock = new Clock() { @@ -124,7 +128,6 @@ public class PubsubUnboundedSourceTest { @Test public void readOneMessage() throws IOException { setupOneMessage(); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); // Read one message. assertTrue(reader.start()); @@ -139,7 +142,6 @@ public class PubsubUnboundedSourceTest { @Test public void timeoutAckAndRereadOneMessage() throws IOException { setupOneMessage(); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); assertTrue(reader.start()); @@ -160,7 +162,6 @@ public class PubsubUnboundedSourceTest { @Test public void extendAck() throws IOException { setupOneMessage(); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. 
@@ -183,7 +184,6 @@ public class PubsubUnboundedSourceTest { @Test public void timeoutAckExtensions() throws IOException { setupOneMessage(); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); // Pull the first message but don't take a checkpoint for it. @@ -220,7 +220,6 @@ public class PubsubUnboundedSourceTest { incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, RECORD_ID)); } setupOneMessage(incoming); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); // Consume two messages, only read one. assertTrue(reader.start()); @@ -281,7 +280,6 @@ public class PubsubUnboundedSourceTest { } setupOneMessage(incoming); - TestPipeline p = TestPipeline.create(); PubsubReader<String> reader = primSource.createReader(p.getOptions(), null); PubsubTestClient pubsubClient = (PubsubTestClient) reader.getPubsubClient(); @@ -342,7 +340,6 @@ public class PubsubUnboundedSourceTest { null); assertThat(source.getSubscription(), nullValue()); - TestPipeline.create().apply(source); assertThat(source.getSubscription(), nullValue()); PipelineOptions options = PipelineOptionsFactory.create(); @@ -373,7 +370,6 @@ public class PubsubUnboundedSourceTest { null); assertThat(source.getSubscription(), nullValue()); - TestPipeline.create().apply(source); assertThat(source.getSubscription(), nullValue()); PipelineOptions options = PipelineOptionsFactory.create(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 472399a..b8b28eb 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -147,6 +147,9 @@ public class TextIOTest { private static File largeZip; @Rule + public TestPipeline p = TestPipeline.create(); + + @Rule public ExpectedException expectedException = ExpectedException.none(); private static File writeToFile(String[] lines, String filename, CompressionType compression) @@ -224,8 +227,6 @@ public class TextIOTest { } } - Pipeline p = TestPipeline.create(); - TextIO.Read.Bound<T> read; if (coder.equals(StringUtf8Coder.of())) { TextIO.Read.Bound<String> readStrings = TextIO.Read.from(filename); @@ -273,7 +274,7 @@ public class TextIOTest { @Test public void testReadNamed() throws Exception { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); assertEquals( "TextIO.Read/Read.out", @@ -330,8 +331,6 @@ public class TextIOTest { Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); String baseFilename = baseDir.resolve(outputName).toString(); - Pipeline p = TestPipeline.create(); - PCollection<T> input = p.apply(Create.of(Arrays.asList(elems)).withCoder(coder)); TextIO.Write.Bound<T> write; @@ -511,7 +510,6 @@ public class TextIOTest { Coder<String> coder = StringUtf8Coder.of(); String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder, "testwrite"); - Pipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder)); @@ -601,11 +599,10 @@ public class TextIOTest { @Test public void testUnsupportedFilePattern() throws IOException { + p.enableAbandonedNodeEnforcement(false); // Windows doesn't like resolving paths with * in them. 
String filename = tempFolder.resolve("output@5").toString(); - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(Arrays.asList(LINES_ARRAY)) .withCoder(StringUtf8Coder.of())); @@ -621,13 +618,13 @@ public class TextIOTest { */ @Test public void testBadWildcardRecursive() throws Exception { - Pipeline pipeline = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); // Check that applying does fail. expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("wildcard"); - pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz")); + p.apply(TextIO.Read.from("gs://bucket/foo**/baz")); } /** Options for testing. */ @@ -641,9 +638,11 @@ public class TextIOTest { @Test public void testRuntimeOptionsNotCalledInApply() throws Exception { - Pipeline pipeline = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); + RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class); - pipeline + + p .apply(TextIO.Read.from(options.getInput()).withoutValidation()) .apply(TextIO.Write.to(options.getOutput()).withoutValidation()); } @@ -686,12 +685,12 @@ public class TextIOTest { * Helper method that runs TextIO.Read.from(filename).withCompressionType(compressionType) * and asserts that the results match the given expected output. 
*/ - private static void assertReadingCompressedFileMatchesExpected( + private void assertReadingCompressedFileMatchesExpected( File file, CompressionType compressionType, String[] expected) { - Pipeline p = TestPipeline.create(); + TextIO.Read.Bound<String> read = TextIO.Read.from(file.getPath()).withCompressionType(compressionType); - PCollection<String> output = p.apply(read); + PCollection<String> output = p.apply("Read_" + file + "_" + compressionType.toString(), read); PAssert.that(output).containsInAnyOrder(expected); p.run(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 5a7c994..79f4c4b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -79,6 +79,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class WriteTest { + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public ExpectedException thrown = ExpectedException.none(); // Static store that can be accessed within the writer @@ -294,7 +295,6 @@ public class WriteTest { @Test public void testWriteUnbounded() { - TestPipeline p = TestPipeline.create(); PCollection<String> unbounded = p.apply(CountingInput.unbounded()) .apply(MapElements.via(new ToStringFn())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index 1f154d5..d6898d5 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.Random; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Source.Reader; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -64,6 +63,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class XmlSourceTest { + + @Rule + public TestPipeline p = TestPipeline.create(); + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); @@ -566,8 +569,6 @@ public class XmlSourceTest { @Test @Category(NeedsRunner.class) public void testReadXMLSmallPipeline() throws IOException { - Pipeline p = TestPipeline.create(); - File file = tempFolder.newFile("trainXMLSmall"); Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8)); @@ -661,7 +662,6 @@ public class XmlSourceTest { List<Train> trains = generateRandomTrainList(100); File file = createRandomTrainXML(fileName, trains); - Pipeline p = TestPipeline.create(); XmlSource<Train> source = XmlSource.<Train>from(file.toPath().toString()) .withRootElement("trains") @@ -808,8 +808,6 @@ public class XmlSourceTest { generateRandomTrainList(8); createRandomTrainXML("otherfile.xml", trains1); - Pipeline p = TestPipeline.create(); - XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml") .withRootElement("trains") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java index 5e97eed..4e257f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java @@ -50,7 +50,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -90,6 +89,9 @@ public class ProxyInvocationHandlerTest { void setString(String value); } + @Rule + public TestPipeline p = TestPipeline.create(); + @Test public void testPropertySettingAndGetting() throws Exception { ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, Object>newHashMap()); @@ -785,7 +787,6 @@ public class ProxyInvocationHandlerTest { } }; - Pipeline p = TestPipeline.create(); p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType); p.apply(Create.of(1, 2, 3)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index b0c17d8..2327459 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -51,14 +51,16 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class TransformHierarchyTest { + + @Rule public final TestPipeline pipeline = TestPipeline.create(); @Rule public ExpectedException thrown = 
ExpectedException.none(); + private TransformHierarchy hierarchy; - private TestPipeline pipeline; + @Before public void setup() { hierarchy = new TransformHierarchy(); - pipeline = TestPipeline.create(); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index d70aa2f..6a6e0fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -55,6 +55,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class TransformTreeTest { + + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); enum TransformsSeen { @@ -112,11 +114,11 @@ public class TransformTreeTest { // visits the nodes and verifies that the hierarchy was captured. 
@Test public void testCompositeCapture() throws Exception { + p.enableAbandonedNodeEnforcement(false); + File inputFile = tmpFolder.newFile(); File outputFile = tmpFolder.newFile(); - Pipeline p = TestPipeline.create(); - p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath())) .apply(Sample.<String>any(10)) .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath())); @@ -170,18 +172,15 @@ public class TransformTreeTest { @Test(expected = IllegalArgumentException.class) public void testOutputChecking() throws Exception { - Pipeline p = TestPipeline.create(); + p.enableAbandonedNodeEnforcement(false); p.apply(new InvalidCompositeTransform()); - p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {}); } @Test @Category(NeedsRunner.class) public void testMultiGraphSetup() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.begin() .apply(Create.of(1, 2, 3)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java index 417147f..a96e3f8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptor; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -45,10 +46,12 @@ import org.junit.runners.JUnit4; /** Tests for {@link GatherAllPanes}. 
*/ @RunWith(JUnit4.class) public class GatherAllPanesTest implements Serializable { + + @Rule public transient TestPipeline p = TestPipeline.create(); + @Test @Category(NeedsRunner.class) public void singlePaneSingleReifiedPane() { - TestPipeline p = TestPipeline.create(); PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = p.apply(CountingInput.upTo(20000)) .apply( @@ -91,8 +94,6 @@ public class GatherAllPanesTest implements Serializable { @Test @Category(NeedsRunner.class) public void multiplePanesMultipleReifiedPane() { - TestPipeline p = TestPipeline.create(); - PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000)); PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000)); PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index be8924f..1997bbe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -59,6 +59,10 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class PAssertTest implements Serializable { + + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -116,8 +120,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testContainsInAnyOrderNotSerializable() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<NotSerializableObject> pcollection = pipeline .apply(Create.of( 
new NotSerializableObject(), @@ -139,8 +141,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testSerializablePredicate() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<NotSerializableObject> pcollection = pipeline .apply(Create.of( new NotSerializableObject(), @@ -166,8 +166,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedSerializablePredicate() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<NotSerializableObject> pcollection = pipeline .apply(Create.timestamped( TimestampedValue.of(new NotSerializableObject(), new Instant(250L)), @@ -207,7 +205,6 @@ public class PAssertTest implements Serializable { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage("isEqualTo"); - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton(pcollection).equals(42); } @@ -222,7 +219,6 @@ public class PAssertTest implements Serializable { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage("containsInAnyOrder"); - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.that(pcollection).equals(42); } @@ -237,7 +233,6 @@ public class PAssertTest implements Serializable { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage(".hashCode() is not supported."); - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton(pcollection).hashCode(); } @@ -252,7 +247,6 @@ public class PAssertTest implements Serializable { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage(".hashCode() is not supported."); - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); 
PAssert.that(pcollection).hashCode(); } @@ -263,7 +257,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testIsEqualTo() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).isEqualTo(43); pipeline.run(); @@ -275,7 +268,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testWindowedIsEqualTo() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)), TimestampedValue.of(22, new Instant(-250L)))) @@ -295,7 +287,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testNotEqualTo() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).notEqualTo(42); pipeline.run(); @@ -307,7 +298,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testContainsInAnyOrder() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); pipeline.run(); @@ -319,7 +309,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testGlobalWindowContainsInAnyOrder() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3); pipeline.run(); @@ -331,7 +320,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void 
testWindowedContainsInAnyOrder() throws Exception { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> pcollection = pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)), TimestampedValue.of(2, new Instant(200L)), @@ -361,13 +349,12 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmpty() { - Pipeline p = TestPipeline.create(); PCollection<Long> vals = - p.apply(Create.<Long>of().withCoder(VarLongCoder.of())); + pipeline.apply(Create.<Long>of().withCoder(VarLongCoder.of())); PAssert.that(vals).empty(); - p.run(); + pipeline.run(); } /** @@ -376,8 +363,6 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testContainsInAnyOrderFalse() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> pcollection = pipeline .apply(Create.of(1, 2, 3, 4)); @@ -399,11 +384,10 @@ public class PAssertTest implements Serializable { @Test @Category(RunnableOnService.class) public void testEmptyFalse() throws Exception { - Pipeline p = TestPipeline.create(); - PCollection<Long> vals = p.apply(CountingInput.upTo(5L)); + PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); PAssert.that(vals).empty(); - Throwable thrown = runExpectingAssertionFailure(p); + Throwable thrown = runExpectingAssertionFailure(pipeline); assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order")); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index a1b4e4a..64aeca3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java 
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -65,6 +65,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class TestStreamTest implements Serializable { + @Rule public transient TestPipeline p = TestPipeline.create(); @Rule public transient ExpectedException thrown = ExpectedException.none(); @Test @@ -85,7 +86,6 @@ public class TestStreamTest implements Serializable { TimestampedValue.of(-3, instant)) .advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); PCollection<Integer> windowed = p .apply(source) .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering( @@ -146,7 +146,6 @@ public class TestStreamTest implements Serializable { .advanceProcessingTime(Duration.standardMinutes(6)) .advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); PCollection<Long> sum = p.apply(source) .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() @@ -175,7 +174,6 @@ public class TestStreamTest implements Serializable { TimestampedValue.of("alsoFinalLatePane", new Instant(250))) .advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); PCollection<String> values = @@ -220,7 +218,6 @@ public class TestStreamTest implements Serializable { .addElements(TimestampedValue.of("onTime", new Instant(100))) .advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L)); Duration allowedLateness = Duration.millis(5000L); PCollection<String> values = p.apply(stream) @@ -249,7 +246,6 @@ public class TestStreamTest implements Serializable { TimestampedValue.of("bar", endOfGlobalWindow)) .advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); FixedWindows windows = 
FixedWindows.of(Duration.standardHours(6)); PCollection<String> windowedValues = p.apply(stream) .apply(Window.<String>into(windows)) @@ -274,7 +270,6 @@ public class TestStreamTest implements Serializable { TestStream<Integer> other = TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity(); - TestPipeline p = TestPipeline.create(); PCollection<String> createStrings = p.apply("CreateStrings", stream) .apply("WindowStrings", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java index ab13946..cd7898b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java @@ -42,6 +42,7 @@ import org.hamcrest.CoreMatchers; import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -63,6 +64,9 @@ public class ApproximateQuantilesTest { KV.of("b", 100) ); + @Rule + public TestPipeline p = TestPipeline.create(); + public PCollection<KV<String, Integer>> createInputTable(Pipeline p) { return p.apply(Create.of(TABLE).withCoder( KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -71,8 +75,6 @@ public class ApproximateQuantilesTest { @Test @Category(NeedsRunner.class) public void testQuantilesGlobally() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> input = intRangeCollection(p, 101); PCollection<List<Integer>> quantiles = 
input.apply(ApproximateQuantiles.<Integer>globally(5)); @@ -85,8 +87,6 @@ public class ApproximateQuantilesTest { @Test @Category(NeedsRunner.class) public void testQuantilesGobally_comparable() { - TestPipeline p = TestPipeline.create(); - PCollection<Integer> input = intRangeCollection(p, 101); PCollection<List<Integer>> quantiles = input.apply( @@ -100,8 +100,6 @@ public class ApproximateQuantilesTest { @Test @Category(NeedsRunner.class) public void testQuantilesPerKey() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = createInputTable(p); PCollection<KV<String, List<Integer>>> quantiles = input.apply( ApproximateQuantiles.<String, Integer>perKey(2)); @@ -117,8 +115,6 @@ public class ApproximateQuantilesTest { @Test @Category(NeedsRunner.class) public void testQuantilesPerKey_reversed() { - Pipeline p = TestPipeline.create(); - PCollection<KV<String, Integer>> input = createInputTable(p); PCollection<KV<String, List<Integer>>> quantiles = input.apply( ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java index b63c73d..3afc759 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.junit.Rule; import org.junit.Test; import 
org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -52,6 +53,9 @@ import org.junit.runners.JUnit4; public class ApproximateUniqueTest implements Serializable { // implements Serializable just to make it easy to use anonymous inner DoFn subclasses + @Rule + public final transient TestPipeline p = TestPipeline.create(); + @Test public void testEstimationErrorToSampleSize() { assertEquals(40000, ApproximateUnique.sampleSizeFromEstimationError(0.01)); @@ -67,8 +71,6 @@ public class ApproximateUniqueTest implements Serializable { @Test @Category(RunnableOnService.class) public void testApproximateUniqueWithSmallInput() { - Pipeline p = TestPipeline.create(); - PCollection<Integer> input = p.apply( Create.of(Arrays.asList(1, 2, 3, 3))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 8862531..cdd4707 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -28,7 +28,6 @@ import java.io.OutputStream; import java.io.Serializable; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -62,6 +61,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class CombineFnsTest { + @Rule public final TestPipeline p = TestPipeline.create(); @Rule public ExpectedException expectedException = ExpectedException.none(); @Test @@ -123,7 +123,6 @@ public class CombineFnsTest { @Test 
@Category(RunnableOnService.class) public void testComposedCombine() { - Pipeline p = TestPipeline.create(); p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of()); PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply( @@ -178,7 +177,6 @@ public class CombineFnsTest { @Test @Category(RunnableOnService.class) public void testComposedCombineWithContext() { - Pipeline p = TestPipeline.create(); p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of()); PCollectionView<String> view = p @@ -240,7 +238,6 @@ public class CombineFnsTest { @Test @Category(RunnableOnService.class) public void testComposedCombineNullValues() { - Pipeline p = TestPipeline.create(); p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of())); p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 671f00e..0ac9502 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.POutput; import org.hamcrest.Matchers; import org.joda.time.Duration; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -108,6 +109,9 @@ public class CombineTest implements Serializable { @Mock private DoFn<?, ?>.ProcessContext processContext; + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); + 
PCollection<KV<String, Integer>> createInput(Pipeline p, List<KV<String, Integer>> table) { return p.apply(Create.of(table).withCoder( @@ -117,7 +121,6 @@ public class CombineTest implements Serializable { private void runTestSimpleCombine(List<KV<String, Integer>> table, int globalSum, List<KV<String, String>> perKeyCombines) { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = createInput(pipeline, table); PCollection<Integer> sum = input @@ -138,7 +141,6 @@ public class CombineTest implements Serializable { int globalSum, List<KV<String, String>> perKeyCombines, String[] globallyCombines) { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, table); PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create()); @@ -197,7 +199,6 @@ public class CombineTest implements Serializable { private void runTestBasicCombine(List<KV<String, Integer>> table, Set<Integer> globalUnique, List<KV<String, Set<Integer>>> perKeyUnique) { - Pipeline pipeline = TestPipeline.create(); pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class); PCollection<KV<String, Integer>> input = createInput(pipeline, table); @@ -233,7 +234,6 @@ public class CombineTest implements Serializable { private void runTestAccumulatingCombine(List<KV<String, Integer>> table, Double globalMean, List<KV<String, Double>> perKeyMeans) { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = createInput(pipeline, table); PCollection<Double> mean = input @@ -253,8 +253,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFixedWindowsCombine() { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> input = pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) @@ -279,8 +277,6 @@ 
public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testFixedWindowsCombineWithContext() { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> perKeyInput = pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) @@ -316,8 +312,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testSlidingWindowsCombineWithContext() { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> perKeyInput = pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) @@ -365,7 +359,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testGlobalCombineWithDefaultsAndTriggers() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(1, 1)); PCollection<String> output = input @@ -392,8 +385,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testSessionsCombine() { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> input = pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) @@ -417,8 +408,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testSessionsCombineWithContext() { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> perKeyInput = pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); @@ -459,8 +448,6 @@ public class CombineTest implements Serializable { @Test 
@Category(RunnableOnService.class) public void testWindowedCombineEmpty() { - Pipeline pipeline = TestPipeline.create(); - PCollection<Double> mean = pipeline .apply(Create.<Integer>of().withCoder(BigEndianIntegerCoder.of())) .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(1)))) @@ -517,7 +504,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testHotKeyCombining() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10); KeyedCombineFn<String, Integer, ?, Double> mean = @@ -552,7 +538,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testHotKeyCombiningWithAccumulationMode() { - Pipeline pipeline = TestPipeline.create(); PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5)); PCollection<Integer> output = input @@ -577,7 +562,6 @@ public class CombineTest implements Serializable { @Test @Category(NeedsRunner.class) public void testBinaryCombineFn() { - Pipeline pipeline = TestPipeline.create(); PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2); PCollection<KV<String, Integer>> intProduct = input .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt())); @@ -632,7 +616,6 @@ public class CombineTest implements Serializable { @Test @Category(RunnableOnService.class) public void testCombineGloballyAsSingletonView() { - Pipeline pipeline = TestPipeline.create(); final PCollectionView<Integer> view = pipeline .apply("CreateEmptySideInput", Create.<Integer>of().withCoder(BigEndianIntegerCoder.of())) .apply(Sum.integersGlobally().asSingletonView()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java index 7f77ae7..eafb12d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java @@ -22,13 +22,13 @@ import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.List; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -45,12 +45,13 @@ public class CountTest { static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); + @Rule + public TestPipeline p = TestPipeline.create(); + @Test @Category(RunnableOnService.class) @SuppressWarnings("unchecked") public void testCountPerElementBasic() { - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(WORDS)); PCollection<KV<String, Long>> output = @@ -71,8 +72,6 @@ public class CountTest { @Category(RunnableOnService.class) @SuppressWarnings("unchecked") public void testCountPerElementEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Long>> output = @@ -85,8 +84,6 @@ public class CountTest { @Test @Category(RunnableOnService.class) public void testCountGloballyBasic() { - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(WORDS)); PCollection<Long> output = @@ -100,8 +97,6 @@ public class CountTest { @Test @Category(RunnableOnService.class) public void 
testCountGloballyEmpty() { - Pipeline p = TestPipeline.create(); - PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of())); PCollection<Long> output =