Migrated the beam-sdks-java-core module's tests to use TestPipeline as a JUnit rule.
Also fixed some checkstyle errors introduced by the migration of previous modules.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75a4c918
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75a4c918
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75a4c918

Branch: refs/heads/python-sdk
Commit: 75a4c918346b5a04213a54bf7d1bf6507655342a
Parents: 09c404a
Author: Stas Levin <stasle...@gmail.com>
Authored: Mon Dec 19 23:54:47 2016 +0200
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Dec 20 09:55:45 2016 -0800

----------------------------------------------------------------------
 .../UnboundedReadFromBoundedSourceTest.java     |   1 -
 .../direct/CloningBundleFactoryTest.java        |   2 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   6 +-
 .../ImmutabilityCheckingBundleFactoryTest.java  |   2 +-
 .../direct/ImmutableListBundleFactoryTest.java  |   2 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../java/org/apache/beam/sdk/PipelineTest.java  |  37 +++---
 .../apache/beam/sdk/coders/AvroCoderTest.java   |  11 +-
 .../beam/sdk/coders/CoderRegistryTest.java      |   6 +-
 .../beam/sdk/coders/SerializableCoderTest.java  |   7 +-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  16 +--
 .../io/BoundedReadFromUnboundedSourceTest.java  |   6 +-
 .../beam/sdk/io/CompressedSourceTest.java       |  12 +-
 .../apache/beam/sdk/io/CountingInputTest.java   |  12 +-
 .../apache/beam/sdk/io/CountingSourceTest.java  |  13 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   4 +-
 .../beam/sdk/io/PubsubUnboundedSinkTest.java    |  10 +-
 .../beam/sdk/io/PubsubUnboundedSourceTest.java  |  12 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  29 +++--
 .../java/org/apache/beam/sdk/io/WriteTest.java  |   2 +-
 .../org/apache/beam/sdk/io/XmlSourceTest.java   |  10 +-
 .../sdk/options/ProxyInvocationHandlerTest.java |   5 +-
 .../sdk/runners/TransformHierarchyTest.java     |   6 +-
 .../beam/sdk/runners/TransformTreeTest.java     |  11 +-
 .../beam/sdk/testing/GatherAllPanesTest.java    |   7 +-
 .../apache/beam/sdk/testing/PAssertTest.java    |  32 ++---
 .../apache/beam/sdk/testing/TestStreamTest.java |   7 +-
 .../transforms/ApproximateQuantilesTest.java    |  12 +-
 .../sdk/transforms/ApproximateUniqueTest.java   |   6 +-
 .../beam/sdk/transforms/CombineFnsTest.java     |   5 +-
 .../apache/beam/sdk/transforms/CombineTest.java |  25 +---
 .../apache/beam/sdk/transforms/CountTest.java   |  13 +-
 .../apache/beam/sdk/transforms/CreateTest.java  |  27 +----
 .../beam/sdk/transforms/DistinctTest.java       |  12 +-
 .../apache/beam/sdk/transforms/DoFnTest.java    |   4 +-
 .../beam/sdk/transforms/DoFnTesterTest.java     |   6 +-
 .../apache/beam/sdk/transforms/FilterTest.java  |  18 +--
 .../sdk/transforms/FlatMapElementsTest.java     |  10 +-
 .../apache/beam/sdk/transforms/FlattenTest.java |  35 +-----
 .../beam/sdk/transforms/GroupByKeyTest.java     |  30 ++---
 .../apache/beam/sdk/transforms/KeysTest.java    |   9 +-
 .../apache/beam/sdk/transforms/KvSwapTest.java  |   9 +-
 .../apache/beam/sdk/transforms/LatestTest.java  |  12 +-
 .../beam/sdk/transforms/MapElementsTest.java    |  14 +--
 .../beam/sdk/transforms/ParDoLifecycleTest.java |  17 +--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 118 +++++++------------
 .../beam/sdk/transforms/PartitionTest.java      |   8 +-
 .../apache/beam/sdk/transforms/RegexTest.java   |  25 +---
 .../apache/beam/sdk/transforms/SampleTest.java  |  34 +++---
 .../beam/sdk/transforms/SplittableDoFnTest.java |  12 +-
 .../org/apache/beam/sdk/transforms/TopTest.java |  15 ++-
 .../apache/beam/sdk/transforms/ValuesTest.java  |   7 +-
 .../apache/beam/sdk/transforms/ViewTest.java    |  84 ++++---------
 .../beam/sdk/transforms/WithKeysTest.java       |   8 +-
 .../beam/sdk/transforms/WithTimestampsTest.java |   9 +-
 .../sdk/transforms/join/CoGroupByKeyTest.java   |  11 +-
 .../sdk/transforms/windowing/WindowTest.java    |  22 ++--
 .../sdk/transforms/windowing/WindowingTest.java |  11 +-
 .../org/apache/beam/sdk/util/ReshuffleTest.java |  11 +-
 .../beam/sdk/values/PCollectionTupleTest.java   |  12 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |   9 +-
 .../apache/beam/sdk/values/TypedPValueTest.java |  10 +-
 62 files changed, 353 insertions(+), 587 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index 86450f2..0f09cd1 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -36,7 +36,6 @@ import java.util.Random;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.CheckpointCoder;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
index e5299a2..505d3a2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CloningBundleFactoryTest.java
@@ -62,7 +62,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class CloningBundleFactoryTest {
   @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   private CloningBundleFactory factory = CloningBundleFactory.create();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 35245f4..12ef66c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -62,14 +62,10 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class CopyOnAccessInMemoryStateInternalsTest {
 
-  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
   private String key = "foo";
 
-  public CopyOnAccessInMemoryStateInternalsTest() {
-    pipeline = TestPipeline.create();
-  }
-
   @Test
   public void testGetWithEmpty() {
     CopyOnAccessInMemoryStateInternals<String> internals =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 2448078..eccb3a6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -47,7 +47,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class ImmutabilityCheckingBundleFactoryTest {
 
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public ExpectedException thrown = ExpectedException.none();
   private ImmutabilityCheckingBundleFactory factory;
   private PCollection<byte[]> created;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
index 46f02cd..3327ccd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutableListBundleFactoryTest.java
@@ -57,7 +57,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ImmutableListBundleFactoryTest {
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private ImmutableListBundleFactory bundleFactory = ImmutableListBundleFactory.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index a8c4c02..7432e61 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -69,7 +69,7 @@ public class WriteWithShardingFactoryTest {
   public static final int INPUT_SIZE = 10000;
   @Rule public TemporaryFolder tmp = new TemporaryFolder();
   private WriteWithShardingFactory<Object> factory = new WriteWithShardingFactory<>();
-  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public final TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Test
   public void dynamicallyReshardedWrite() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index fea1554..d8e4ef4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -62,6 +62,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class PipelineTest {
 
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedLogs logged = ExpectedLogs.none(Pipeline.class);
   @Rule public ExpectedException thrown = ExpectedException.none();
 
@@ -128,8 +129,7 @@ public class PipelineTest {
     PTransform<PCollection<? extends String>, PCollection<String>> myTransform =
         addSuffix("+");
 
-    Pipeline p = TestPipeline.create();
-    PCollection<String> input = p.apply(Create.<String>of(ImmutableList.of("a", "b")));
+    PCollection<String> input = pipeline.apply(Create.<String>of(ImmutableList.of("a", "b")));
 
     PCollection<String> left = input.apply("Left1", myTransform).apply("Left2", myTransform);
     PCollection<String> right = input.apply("Right", myTransform);
@@ -139,7 +139,7 @@ public class PipelineTest {
 
     PAssert.that(both).containsInAnyOrder("a++", "b++", "a+", "b+");
 
-    p.run();
+    pipeline.run();
   }
 
   private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
@@ -162,35 +162,36 @@ public class PipelineTest {
 
   @Test
   public void testStableUniqueNameOff() {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setStableUniqueNames(CheckEnabled.OFF);
+    pipeline.enableAbandonedNodeEnforcement(false);
+
+    pipeline.getOptions().setStableUniqueNames(CheckEnabled.OFF);
 
-    p.apply(Create.of(5, 6, 7));
-    p.apply(Create.of(5, 6, 7));
+    pipeline.apply(Create.of(5, 6, 7));
+    pipeline.apply(Create.of(5, 6, 7));
 
     logged.verifyNotLogged("does not have a stable unique name.");
   }
 
   @Test
   public void testStableUniqueNameWarning() {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setStableUniqueNames(CheckEnabled.WARNING);
+    pipeline.enableAbandonedNodeEnforcement(false);
 
-    p.apply(Create.of(5, 6, 7));
-    p.apply(Create.of(5, 6, 7));
+    pipeline.getOptions().setStableUniqueNames(CheckEnabled.WARNING);
+
+    pipeline.apply(Create.of(5, 6, 7));
+    pipeline.apply(Create.of(5, 6, 7));
 
     logged.verifyWarn("does not have a stable unique name.");
   }
 
   @Test
   public void testStableUniqueNameError() {
-    Pipeline p = TestPipeline.create();
-    p.getOptions().setStableUniqueNames(CheckEnabled.ERROR);
+    pipeline.getOptions().setStableUniqueNames(CheckEnabled.ERROR);
 
-    p.apply(Create.of(5, 6, 7));
+    pipeline.apply(Create.of(5, 6, 7));
 
     thrown.expectMessage("does not have a stable unique name.");
-    p.apply(Create.of(5, 6, 7));
+    pipeline.apply(Create.of(5, 6, 7));
   }
 
   /**
@@ -199,7 +200,6 @@ public class PipelineTest {
   @Test
   @Category(RunnableOnService.class)
   public void testIdentityTransform() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     PCollection<Integer> output = pipeline
         .apply(Create.<Integer>of(1, 2, 3, 4))
@@ -223,8 +223,6 @@ public class PipelineTest {
   @Test
   @Category(RunnableOnService.class)
   public void testTupleProjectionTransform() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<Integer> input = pipeline
         .apply(Create.<Integer>of(1, 2, 3, 4));
 
@@ -258,8 +256,6 @@ public class PipelineTest {
   @Test
   @Category(RunnableOnService.class)
   public void testTupleInjectionTransform() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<Integer> input = pipeline
         .apply(Create.<Integer>of(1, 2, 3, 4));
 
@@ -292,7 +288,6 @@ public class PipelineTest {
   @Test
   @Category(NeedsRunner.class)
   public void testEmptyPipeline() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     pipeline.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index adfa0d2..60dc07a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -55,7 +55,6 @@ import org.apache.avro.reflect.Stringable;
 import org.apache.avro.reflect.Union;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -73,6 +72,7 @@ import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeMatcher;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -143,6 +143,9 @@ public class AvroCoderTest {
     }
   }
 
+  @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
   @Test
   public void testAvroCoderEncoding() throws Exception {
     AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
@@ -287,17 +290,15 @@ public class AvroCoderTest {
   @Test
   @Category(NeedsRunner.class)
   public void testDefaultCoder() throws Exception {
-    Pipeline p = TestPipeline.create();
-
     // Use MyRecord as input and output types without explicitly specifying
     // a coder (this uses the default coders, which may not be AvroCoder).
     PCollection<String> output =
-        p.apply(Create.of(new Pojo("hello", 1), new Pojo("world", 2)))
+        pipeline.apply(Create.of(new Pojo("hello", 1), new Pojo("world", 2)))
             .apply(ParDo.of(new GetTextFn()));
 
     PAssert.that(output)
         .containsInAnyOrder("hello", "world");
-    p.run();
+    pipeline.run();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index d7badab..8c0e584 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -35,7 +35,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CoderRegistry.IncompatibleCoderException;
 import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -63,6 +62,9 @@ import org.junit.runners.JUnit4;
 public class CoderRegistryTest {
 
   @Rule
+  public TestPipeline pipeline = TestPipeline.create();
+
+  @Rule
   public ExpectedException thrown = ExpectedException.none();
 
   public static CoderRegistry getStandardRegistry() {
@@ -414,7 +416,6 @@ public class CoderRegistryTest {
   @Test
   @Category(NeedsRunner.class)
   public void testSpecializedButIgnoredGenericInPipeline() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline
         .apply(Create.of("hello", "goodbye"))
@@ -443,7 +444,6 @@ public class CoderRegistryTest {
   @Test
   @Category(NeedsRunner.class)
   public void testIgnoredGenericInPipeline() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
 
     pipeline
         .apply(Create.of("hello", "goodbye"))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index 8d344de..296ddc9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -40,6 +39,7 @@ import org.apache.beam.sdk.util.Serializer;
 import org.apache.beam.sdk.values.PCollection;
 import org.hamcrest.Matchers;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -98,6 +98,9 @@ public class SerializableCoderTest implements Serializable {
       "To be,",
       "or not to be");
 
+  @Rule
+  public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
   @Test
   public void testSerializableCoder() throws Exception {
     IterableCoder<MyRecord> coder = IterableCoder
@@ -136,7 +139,7 @@ public class SerializableCoderTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testDefaultCoder() throws Exception {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(true);
 
     // Use MyRecord as input and output types without explicitly specifying
     // a coder (this uses the default coders, which may not be

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 41a630f..b669968 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -74,6 +74,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class AvroIOTest {
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -135,7 +139,6 @@ public class AvroIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadASingleFile() throws Throwable {
-    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -146,7 +149,6 @@ public class AvroIOTest {
           .withSchema(GenericClass.class));
     p.run();
 
-    p = TestPipeline.create();
     PCollection<GenericClass> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClass.class));
 
@@ -158,7 +160,6 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testAvroIOCompressedWriteAndReadASingleFile() throws Throwable {
-    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -170,7 +171,6 @@ public class AvroIOTest {
             .withSchema(GenericClass.class));
     p.run();
 
-    p = TestPipeline.create();
     PCollection<GenericClass> input = p
         .apply(AvroIO.Read
             .from(outputFile.getAbsolutePath())
@@ -187,7 +187,6 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testAvroIONullCodecWriteAndReadASingleFile() throws Throwable {
-    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -199,7 +198,6 @@ public class AvroIOTest {
             .withCodec(CodecFactory.nullCodec()));
     p.run();
 
-    p = TestPipeline.create();
     PCollection<GenericClass> input = p
         .apply(AvroIO.Read
             .from(outputFile.getAbsolutePath())
@@ -257,7 +255,6 @@ public class AvroIOTest {
   @Test
   @Category(NeedsRunner.class)
   public void testAvroIOWriteAndReadSchemaUpgrade() throws Throwable {
-    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -270,7 +267,7 @@ public class AvroIOTest {
 
     List<GenericClassV2> expected = ImmutableList.of(new GenericClassV2(3, "hi", null),
         new GenericClassV2(5, "bar", null));
-    p = TestPipeline.create();
+
     PCollection<GenericClassV2> input = p
         .apply(AvroIO.Read.from(outputFile.getAbsolutePath()).withSchema(GenericClassV2.class));
 
@@ -321,7 +318,6 @@ public class AvroIOTest {
   @SuppressWarnings("unchecked")
   @Category(NeedsRunner.class)
   public void testMetdata() throws Exception {
-    TestPipeline p = TestPipeline.create();
     List<GenericClass> values = ImmutableList.of(new GenericClass(3, "hi"),
         new GenericClass(5, "bar"));
     File outputFile = tmpFolder.newFile("output.avro");
@@ -348,7 +344,7 @@ public class AvroIOTest {
   private void runTestWrite(String[] expectedElements, int numShards) throws IOException {
     File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
     String outputFilePrefix = baseOutputFile.getAbsolutePath();
-    TestPipeline p = TestPipeline.create();
+
     Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
     if (numShards > 1) {
       write = write.withNumShards(numShards);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
index 4d7814c..d49873e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -26,7 +26,6 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
@@ -36,6 +35,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -46,6 +46,9 @@ import org.junit.runners.JUnit4;
 public class BoundedReadFromUnboundedSourceTest implements Serializable{
   private static final int NUM_RECORDS = 100;
 
+  @Rule
+  public transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testNoDedup() throws Exception {
@@ -112,7 +115,6 @@ public class BoundedReadFromUnboundedSourceTest implements Serializable{
   }
 
   private void test(boolean dedup, boolean timeBound) throws Exception {
-    Pipeline p = TestPipeline.create();
 
     TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
     if (dedup) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index f8769ea..3871159 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -47,7 +47,6 @@ import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.zip.GZIPOutputStream;
 import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
@@ -80,6 +79,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class CompressedSourceTest {
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
@@ -199,8 +202,6 @@ public class CompressedSourceTest {
       os.write(totalGz);
     }
 
-    Pipeline p = TestPipeline.create();
-
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
             .withDecompression(CompressionMode.GZIP);
@@ -274,8 +275,6 @@ public class CompressedSourceTest {
 
     String filePattern = new File(tmpFolder.getRoot().toString(), baseName + ".*").toString();
 
-    Pipeline p = TestPipeline.create();
-
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(filePattern, 1));
     PCollection<Byte> output = p.apply(Read.from(source));
@@ -395,8 +394,6 @@ public class CompressedSourceTest {
       expected.addAll(Bytes.asList(generated));
     }
 
-    Pipeline p = TestPipeline.create();
-
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(filePattern, 1))
             .withDecompression(CompressionMode.GZIP);
@@ -476,7 +473,6 @@ public class CompressedSourceTest {
 
   private void verifyReadContents(byte[] expected, File inputFile,
       @Nullable DecompressingChannelFactory decompressionFactory) {
-    Pipeline p = TestPipeline.create();
     CompressedSource<Byte> source =
         CompressedSource.from(new ByteSource(inputFile.toPath().toString(), 1));
     if (decompressionFactory != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index dfc4919..f23ee76 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
@@ -39,6 +38,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -66,10 +66,12 @@ public class CountingInputTest {
         .isEqualTo(end - 1);
   }
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testBoundedInput() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
     PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
 
@@ -80,7 +82,6 @@ public class CountingInputTest {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyBoundedInput() {
-    Pipeline p = TestPipeline.create();
     PCollection<Long> input = p.apply(CountingInput.upTo(0));
 
     PAssert.that(input).empty();
@@ -90,7 +91,6 @@ public class CountingInputTest {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyBoundedInputSubrange() {
-    Pipeline p = TestPipeline.create();
     PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42));
 
     PAssert.that(input).empty();
@@ -101,7 +101,6 @@ public class CountingInputTest {
   @Test
   @Category(RunnableOnService.class)
   public void testBoundedInputSubrange() {
-    Pipeline p = TestPipeline.create();
     long start = 10;
     long end = 1000;
     PCollection<Long> input = p.apply(CountingInput.forSubrange(start, end));
@@ -128,7 +127,6 @@ public class CountingInputTest {
   @Test
   @Category(RunnableOnService.class)
   public void testUnboundedInput() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
 
     PCollection<Long> input = 
p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
@@ -140,7 +138,6 @@ public class CountingInputTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedInputRate() {
-    Pipeline p = TestPipeline.create();
     long numElements = 5000;
 
     long elemsPerPeriod = 10L;
@@ -169,7 +166,6 @@ public class CountingInputTest {
   @Test
   @Category(RunnableOnService.class)
   public void testUnboundedInputTimestamps() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
 
     PCollection<Long> input =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 5eccde6..dfd0949 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.CountingSource.CounterMark;
 import org.apache.beam.sdk.io.CountingSource.UnboundedCountingSource;
@@ -48,6 +47,7 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -79,10 +79,12 @@ public class CountingSourceTest {
       .isEqualTo(numElements - 1);
   }
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   public void testBoundedSource() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
     PCollection<Long> input = 
p.apply(Read.from(CountingSource.upTo(numElements)));
 
@@ -93,7 +95,6 @@ public class CountingSourceTest {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyBoundedSource() {
-    Pipeline p = TestPipeline.create();
     PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0)));
 
     PAssert.that(input).empty();
@@ -103,7 +104,6 @@ public class CountingSourceTest {
   @Test
   @Category(RunnableOnService.class)
   public void testBoundedSourceSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
     long numSplits = 10;
     long splitSizeBytes = numElements * 8 / numSplits;  // 8 bytes per long 
element.
@@ -157,7 +157,6 @@ public class CountingSourceTest {
   @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSource() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
 
     PCollection<Long> input = p
@@ -177,7 +176,6 @@ public class CountingSourceTest {
   @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSourceTimestamps() {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
 
     PCollection<Long> input = p.apply(
@@ -197,7 +195,6 @@ public class CountingSourceTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceWithRate() {
-    Pipeline p = TestPipeline.create();
 
     Duration period = Duration.millis(5);
     long numElements = 1000L;
@@ -232,7 +229,6 @@ public class CountingSourceTest {
   @Test
   @Category(RunnableOnService.class)
   public void testUnboundedSourceSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
     long numElements = 1000;
     int numSplits = 10;
 
@@ -257,7 +253,6 @@ public class CountingSourceTest {
   @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceRateSplits() throws Exception {
-    Pipeline p = TestPipeline.create();
     int elementsPerPeriod = 10;
     Duration period = Duration.millis(5);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index f4b8574..f709e22 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -42,7 +42,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Random;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
@@ -73,6 +72,7 @@ public class FileBasedSourceTest {
 
   Random random = new Random(0L);
 
+  @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
@@ -719,7 +719,6 @@ public class FileBasedSourceTest {
   @Test
   @Category(NeedsRunner.class)
   public void testDataflowFile() throws IOException {
-    Pipeline p = TestPipeline.create();
     List<String> data = createStringDataset(3, 50);
 
     String fileName = "file";
@@ -735,7 +734,6 @@ public class FileBasedSourceTest {
   @Test
   @Category(NeedsRunner.class)
   public void testDataflowFilePattern() throws IOException {
-    Pipeline p = TestPipeline.create();
 
     List<String> data1 = createStringDataset(3, 50);
     File file1 = createFileWithData("file1", data1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 518136f..5bc1664 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.util.PubsubTestClient;
 import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -67,6 +68,9 @@ public class PubsubUnboundedSinkTest {
     return Hashing.murmur3_128().hashBytes(data.getBytes()).toString();
   }
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Test
   public void saneCoder() throws Exception {
     OutgoingMessage message = new OutgoingMessage(DATA.getBytes(), TIMESTAMP, 
getRecordId(DATA));
@@ -88,7 +92,7 @@ public class PubsubUnboundedSinkTest {
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), 
StringUtf8Coder.of(),
               TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
-      TestPipeline p = TestPipeline.create();
+
       p.apply(Create.of(ImmutableList.of(DATA)))
        .apply(ParDo.of(new Stamp()))
        .apply(sink);
@@ -117,7 +121,7 @@ public class PubsubUnboundedSinkTest {
           new PubsubUnboundedSink<>(factory, StaticValueProvider.of(TOPIC), 
StringUtf8Coder.of(),
               TIMESTAMP_LABEL, ID_LABEL, NUM_SHARDS, batchSize, batchBytes,
               Duration.standardSeconds(2), RecordIdMethod.DETERMINISTIC);
-      TestPipeline p = TestPipeline.create();
+
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
        .apply(sink);
@@ -153,7 +157,7 @@ public class PubsubUnboundedSinkTest {
               StringUtf8Coder.of(), TIMESTAMP_LABEL, ID_LABEL,
               NUM_SHARDS, batchSize, batchBytes, Duration.standardSeconds(2),
               RecordIdMethod.DETERMINISTIC);
-      TestPipeline p = TestPipeline.create();
+
       p.apply(Create.of(data))
        .apply(ParDo.of(new Stamp()))
        .apply(sink);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
index f6165c5..601e2c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSourceTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.util.PubsubTestClient;
 import org.apache.beam.sdk.util.PubsubTestClient.PubsubTestClientFactory;
 import org.joda.time.Instant;
 import org.junit.After;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -82,6 +83,9 @@ public class PubsubUnboundedSourceTest {
   private PubsubTestClientFactory factory;
   private PubsubSource<String> primSource;
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   private void setupOneMessage(Iterable<IncomingMessage> incoming) {
     now = new AtomicLong(REQ_TIME);
     clock = new Clock() {
@@ -124,7 +128,6 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void readOneMessage() throws IOException {
     setupOneMessage();
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     // Read one message.
     assertTrue(reader.start());
@@ -139,7 +142,6 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void timeoutAckAndRereadOneMessage() throws IOException {
     setupOneMessage();
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
     assertTrue(reader.start());
@@ -160,7 +162,6 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void extendAck() throws IOException {
     setupOneMessage();
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
@@ -183,7 +184,6 @@ public class PubsubUnboundedSourceTest {
   @Test
   public void timeoutAckExtensions() throws IOException {
     setupOneMessage();
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
     // Pull the first message but don't take a checkpoint for it.
@@ -220,7 +220,6 @@ public class PubsubUnboundedSourceTest {
       incoming.add(new IncomingMessage(data.getBytes(), TIMESTAMP, 0, ackid, 
RECORD_ID));
     }
     setupOneMessage(incoming);
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     // Consume two messages, only read one.
     assertTrue(reader.start());
@@ -281,7 +280,6 @@ public class PubsubUnboundedSourceTest {
     }
     setupOneMessage(incoming);
 
-    TestPipeline p = TestPipeline.create();
     PubsubReader<String> reader = primSource.createReader(p.getOptions(), 
null);
     PubsubTestClient pubsubClient = (PubsubTestClient) 
reader.getPubsubClient();
 
@@ -342,7 +340,6 @@ public class PubsubUnboundedSourceTest {
             null);
     assertThat(source.getSubscription(), nullValue());
 
-    TestPipeline.create().apply(source);
     assertThat(source.getSubscription(), nullValue());
 
     PipelineOptions options = PipelineOptionsFactory.create();
@@ -373,7 +370,6 @@ public class PubsubUnboundedSourceTest {
             null);
     assertThat(source.getSubscription(), nullValue());
 
-    TestPipeline.create().apply(source);
     assertThat(source.getSubscription(), nullValue());
 
     PipelineOptions options = PipelineOptionsFactory.create();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 472399a..b8b28eb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -147,6 +147,9 @@ public class TextIOTest {
   private static File largeZip;
 
   @Rule
+  public TestPipeline p = TestPipeline.create();
+
+  @Rule
   public ExpectedException expectedException = ExpectedException.none();
 
   private static File writeToFile(String[] lines, String filename, 
CompressionType compression)
@@ -224,8 +227,6 @@ public class TextIOTest {
       }
     }
 
-    Pipeline p = TestPipeline.create();
-
     TextIO.Read.Bound<T> read;
     if (coder.equals(StringUtf8Coder.of())) {
       TextIO.Read.Bound<String> readStrings = TextIO.Read.from(filename);
@@ -273,7 +274,7 @@ public class TextIOTest {
 
   @Test
   public void testReadNamed() throws Exception {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     assertEquals(
         "TextIO.Read/Read.out",
@@ -330,8 +331,6 @@ public class TextIOTest {
     Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
     String baseFilename = baseDir.resolve(outputName).toString();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<T> input = 
p.apply(Create.of(Arrays.asList(elems)).withCoder(coder));
 
     TextIO.Write.Bound<T> write;
@@ -511,7 +510,6 @@ public class TextIOTest {
     Coder<String> coder = StringUtf8Coder.of();
     String outputName = "file.txt";
     Path baseDir = Files.createTempDirectory(tempFolder, "testwrite");
-    Pipeline p = TestPipeline.create();
 
     PCollection<String> input = 
p.apply(Create.of(Arrays.asList(LINES2_ARRAY)).withCoder(coder));
 
@@ -601,11 +599,10 @@ public class TextIOTest {
 
   @Test
   public void testUnsupportedFilePattern() throws IOException {
+    p.enableAbandonedNodeEnforcement(false);
     // Windows doesn't like resolving paths with * in them.
     String filename = tempFolder.resolve("output@5").toString();
 
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(LINES_ARRAY))
             .withCoder(StringUtf8Coder.of()));
@@ -621,13 +618,13 @@ public class TextIOTest {
    */
   @Test
   public void testBadWildcardRecursive() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     // Check that applying does fail.
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("wildcard");
 
-    pipeline.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
+    p.apply(TextIO.Read.from("gs://bucket/foo**/baz"));
   }
 
   /** Options for testing. */
@@ -641,9 +638,11 @@ public class TextIOTest {
 
   @Test
   public void testRuntimeOptionsNotCalledInApply() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
+
     RuntimeTestOptions options = 
PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    pipeline
+
+    p
         .apply(TextIO.Read.from(options.getInput()).withoutValidation())
         .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
   }
@@ -686,12 +685,12 @@ public class TextIOTest {
    * Helper method that runs 
TextIO.Read.from(filename).withCompressionType(compressionType)
    * and asserts that the results match the given expected output.
    */
-  private static void assertReadingCompressedFileMatchesExpected(
+  private void assertReadingCompressedFileMatchesExpected(
       File file, CompressionType compressionType, String[] expected) {
-    Pipeline p = TestPipeline.create();
+
     TextIO.Read.Bound<String> read =
         TextIO.Read.from(file.getPath()).withCompressionType(compressionType);
-    PCollection<String> output = p.apply(read);
+    PCollection<String> output = p.apply("Read_" + file + "_" + 
compressionType.toString(), read);
 
     PAssert.that(output).containsInAnyOrder(expected);
     p.run();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 5a7c994..79f4c4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -79,6 +79,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class WriteTest {
+  @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   // Static store that can be accessed within the writer
@@ -294,7 +295,6 @@ public class WriteTest {
 
   @Test
   public void testWriteUnbounded() {
-    TestPipeline p = TestPipeline.create();
     PCollection<String> unbounded = p.apply(CountingInput.unbounded())
         .apply(MapElements.via(new ToStringFn()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index 1f154d5..d6898d5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -40,7 +40,6 @@ import java.util.List;
 import java.util.Random;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Source.Reader;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -64,6 +63,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class XmlSourceTest {
+
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Rule
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -566,8 +569,6 @@ public class XmlSourceTest {
   @Test
   @Category(NeedsRunner.class)
   public void testReadXMLSmallPipeline() throws IOException {
-    Pipeline p = TestPipeline.create();
-
     File file = tempFolder.newFile("trainXMLSmall");
     Files.write(file.toPath(), trainXML.getBytes(StandardCharsets.UTF_8));
 
@@ -661,7 +662,6 @@ public class XmlSourceTest {
     List<Train> trains = generateRandomTrainList(100);
     File file = createRandomTrainXML(fileName, trains);
 
-    Pipeline p = TestPipeline.create();
     XmlSource<Train> source =
         XmlSource.<Train>from(file.toPath().toString())
             .withRootElement("trains")
@@ -808,8 +808,6 @@ public class XmlSourceTest {
     generateRandomTrainList(8);
     createRandomTrainXML("otherfile.xml", trains1);
 
-    Pipeline p = TestPipeline.create();
-
     XmlSource<Train> source =
         XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
             .withRootElement("trains")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 5e97eed..4e257f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -90,6 +89,9 @@ public class ProxyInvocationHandlerTest {
     void setString(String value);
   }
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Test
   public void testPropertySettingAndGetting() throws Exception {
     ProxyInvocationHandler handler = new ProxyInvocationHandler(Maps.<String, 
Object>newHashMap());
@@ -785,7 +787,6 @@ public class ProxyInvocationHandlerTest {
       }
     };
 
-    Pipeline p = TestPipeline.create();
     p.getOptions().as(ObjectPipelineOptions.class).setValue(brokenValueType);
 
     p.apply(Create.of(1, 2, 3));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index b0c17d8..2327459 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -51,14 +51,16 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class TransformHierarchyTest {
+
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
   @Rule public ExpectedException thrown = ExpectedException.none();
+
   private TransformHierarchy hierarchy;
-  private TestPipeline pipeline;
+
 
   @Before
   public void setup() {
     hierarchy = new TransformHierarchy();
-    pipeline = TestPipeline.create();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index d70aa2f..6a6e0fc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -55,6 +55,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class TransformTreeTest {
+
+  @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   enum TransformsSeen {
@@ -112,11 +114,11 @@ public class TransformTreeTest {
   // visits the nodes and verifies that the hierarchy was captured.
   @Test
   public void testCompositeCapture() throws Exception {
+    p.enableAbandonedNodeEnforcement(false);
+
     File inputFile = tmpFolder.newFile();
     File outputFile = tmpFolder.newFile();
 
-    Pipeline p = TestPipeline.create();
-
     p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
         .apply(Sample.<String>any(10))
         .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
@@ -170,18 +172,15 @@ public class TransformTreeTest {
 
   @Test(expected = IllegalArgumentException.class)
   public void testOutputChecking() throws Exception {
-    Pipeline p = TestPipeline.create();
+    p.enableAbandonedNodeEnforcement(false);
 
     p.apply(new InvalidCompositeTransform());
-
     p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {});
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testMultiGraphSetup() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<Integer> input = p.begin()
         .apply(Create.of(1, 2, 3));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
index 417147f..a96e3f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,10 +46,12 @@ import org.junit.runners.JUnit4;
 /** Tests for {@link GatherAllPanes}. */
 @RunWith(JUnit4.class)
 public class GatherAllPanesTest implements Serializable {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(NeedsRunner.class)
   public void singlePaneSingleReifiedPane() {
-    TestPipeline p = TestPipeline.create();
     PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =
         p.apply(CountingInput.upTo(20000))
             .apply(
@@ -91,8 +94,6 @@ public class GatherAllPanesTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void multiplePanesMultipleReifiedPane() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Long> someElems = p.apply("someLongs", 
CountingInput.upTo(20000));
     PCollection<Long> otherElems = p.apply("otherLongs", 
CountingInput.upTo(20000));
     PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> 
accumulatedPanes =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index be8924f..1997bbe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -59,6 +59,10 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class PAssertTest implements Serializable {
+
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   @Rule
   public transient ExpectedException thrown = ExpectedException.none();
 
@@ -116,8 +120,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testContainsInAnyOrderNotSerializable() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.of(
           new NotSerializableObject(),
@@ -139,8 +141,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testSerializablePredicate() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.of(
           new NotSerializableObject(),
@@ -166,8 +166,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedSerializablePredicate() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<NotSerializableObject> pcollection = pipeline
         .apply(Create.timestamped(
             TimestampedValue.of(new NotSerializableObject(), new 
Instant(250L)),
@@ -207,7 +205,6 @@ public class PAssertTest implements Serializable {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage("isEqualTo");
 
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton(pcollection).equals(42);
   }
@@ -222,7 +219,6 @@ public class PAssertTest implements Serializable {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage("containsInAnyOrder");
 
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.that(pcollection).equals(42);
   }
@@ -237,7 +233,6 @@ public class PAssertTest implements Serializable {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage(".hashCode() is not supported.");
 
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.thatSingleton(pcollection).hashCode();
   }
@@ -252,7 +247,6 @@ public class PAssertTest implements Serializable {
     thrown.expect(UnsupportedOperationException.class);
     thrown.expectMessage(".hashCode() is not supported.");
 
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(42));
     PAssert.that(pcollection).hashCode();
   }
@@ -263,7 +257,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testIsEqualTo() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).isEqualTo(43);
     pipeline.run();
@@ -275,7 +268,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedIsEqualTo() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection =
         pipeline.apply(Create.timestamped(TimestampedValue.of(43, new Instant(250L)),
             TimestampedValue.of(22, new Instant(-250L))))
@@ -295,7 +287,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testNotEqualTo() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(43));
     PAssert.thatSingleton(pcollection).notEqualTo(42);
     pipeline.run();
@@ -307,7 +298,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testContainsInAnyOrder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
     pipeline.run();
@@ -319,7 +309,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testGlobalWindowContainsInAnyOrder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(2, 1, 4, 3);
     pipeline.run();
@@ -331,7 +320,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedContainsInAnyOrder() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> pcollection =
         pipeline.apply(Create.timestamped(TimestampedValue.of(1, new Instant(100L)),
             TimestampedValue.of(2, new Instant(200L)),
@@ -361,13 +349,12 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmpty() {
-    Pipeline p = TestPipeline.create();
     PCollection<Long> vals =
-        p.apply(Create.<Long>of().withCoder(VarLongCoder.of()));
+        pipeline.apply(Create.<Long>of().withCoder(VarLongCoder.of()));
 
     PAssert.that(vals).empty();
 
-    p.run();
+    pipeline.run();
   }
 
   /**
@@ -376,8 +363,6 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testContainsInAnyOrderFalse() throws Exception {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<Integer> pcollection = pipeline
         .apply(Create.of(1, 2, 3, 4));
 
@@ -399,11 +384,10 @@ public class PAssertTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testEmptyFalse() throws Exception {
-    Pipeline p = TestPipeline.create();
-    PCollection<Long> vals = p.apply(CountingInput.upTo(5L));
+    PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L));
     PAssert.that(vals).empty();
 
-    Throwable thrown = runExpectingAssertionFailure(p);
+    Throwable thrown = runExpectingAssertionFailure(pipeline);
 
     assertThat(thrown.getMessage(), containsString("Expected: iterable over [] in any order"));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index a1b4e4a..64aeca3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -65,6 +65,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class TestStreamTest implements Serializable {
+  @Rule public transient TestPipeline p = TestPipeline.create();
   @Rule public transient ExpectedException thrown = ExpectedException.none();
 
   @Test
@@ -85,7 +86,6 @@ public class TestStreamTest implements Serializable {
             TimestampedValue.of(-3, instant))
         .advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Integer> windowed = p
         .apply(source)
         .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(5))).triggering(
@@ -146,7 +146,6 @@ public class TestStreamTest implements Serializable {
         .advanceProcessingTime(Duration.standardMinutes(6))
         .advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     PCollection<Long> sum = p.apply(source)
         .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow()
             .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
@@ -175,7 +174,6 @@ public class TestStreamTest implements Serializable {
                 TimestampedValue.of("alsoFinalLatePane", new Instant(250)))
             .advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
     PCollection<String> values =
@@ -220,7 +218,6 @@ public class TestStreamTest implements Serializable {
             .addElements(TimestampedValue.of("onTime", new Instant(100)))
             .advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     FixedWindows windowFn = FixedWindows.of(Duration.millis(1000L));
     Duration allowedLateness = Duration.millis(5000L);
     PCollection<String> values = p.apply(stream)
@@ -249,7 +246,6 @@ public class TestStreamTest implements Serializable {
             TimestampedValue.of("bar", endOfGlobalWindow))
         .advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     FixedWindows windows = FixedWindows.of(Duration.standardHours(6));
     PCollection<String> windowedValues = p.apply(stream)
         .apply(Window.<String>into(windows))
@@ -274,7 +270,6 @@ public class TestStreamTest implements Serializable {
     TestStream<Integer> other =
         TestStream.create(VarIntCoder.of()).addElements(1, 2, 3, 4).advanceWatermarkToInfinity();
 
-    TestPipeline p = TestPipeline.create();
     PCollection<String> createStrings =
         p.apply("CreateStrings", stream)
             .apply("WindowStrings",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index ab13946..cd7898b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -42,6 +42,7 @@ import org.hamcrest.CoreMatchers;
 import org.hamcrest.Description;
 import org.hamcrest.Matcher;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -63,6 +64,9 @@ public class ApproximateQuantilesTest {
       KV.of("b", 100)
   );
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   public PCollection<KV<String, Integer>> createInputTable(Pipeline p) {
     return p.apply(Create.of(TABLE).withCoder(
         KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -71,8 +75,6 @@ public class ApproximateQuantilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testQuantilesGlobally() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =
         input.apply(ApproximateQuantiles.<Integer>globally(5));
@@ -85,8 +87,6 @@ public class ApproximateQuantilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testQuantilesGobally_comparable() {
-    TestPipeline p = TestPipeline.create();
-
     PCollection<Integer> input = intRangeCollection(p, 101);
     PCollection<List<Integer>> quantiles =
         input.apply(
@@ -100,8 +100,6 @@ public class ApproximateQuantilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testQuantilesPerKey() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input = createInputTable(p);
     PCollection<KV<String, List<Integer>>> quantiles = input.apply(
         ApproximateQuantiles.<String, Integer>perKey(2));
@@ -117,8 +115,6 @@ public class ApproximateQuantilesTest {
   @Test
   @Category(NeedsRunner.class)
   public void testQuantilesPerKey_reversed() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input = createInputTable(p);
     PCollection<KV<String, List<Integer>>> quantiles = input.apply(
         ApproximateQuantiles.<String, Integer, DescendingIntComparator>perKey(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index b63c73d..3afc759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -40,6 +40,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -52,6 +53,9 @@ import org.junit.runners.JUnit4;
 public class ApproximateUniqueTest implements Serializable {
   // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
 
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
   @Test
   public void testEstimationErrorToSampleSize() {
     assertEquals(40000, ApproximateUnique.sampleSizeFromEstimationError(0.01));
@@ -67,8 +71,6 @@ public class ApproximateUniqueTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testApproximateUniqueWithSmallInput() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<Integer> input = p.apply(
         Create.of(Arrays.asList(1, 2, 3, 3)));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 8862531..cdd4707 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -28,7 +28,6 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -62,6 +61,7 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class  CombineFnsTest {
+  @Rule public final TestPipeline p = TestPipeline.create();
   @Rule public ExpectedException expectedException = ExpectedException.none();
 
   @Test
@@ -123,7 +123,6 @@ public class  CombineFnsTest {
   @Test
   @Category(RunnableOnService.class)
   public void testComposedCombine() {
-    Pipeline p = TestPipeline.create();
     p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
 
     PCollection<KV<String, KV<Integer, UserString>>> perKeyInput = p.apply(
@@ -178,7 +177,6 @@ public class  CombineFnsTest {
   @Test
   @Category(RunnableOnService.class)
   public void testComposedCombineWithContext() {
-    Pipeline p = TestPipeline.create();
     p.getCoderRegistry().registerCoder(UserString.class, UserStringCoder.of());
 
     PCollectionView<String> view = p
@@ -240,7 +238,6 @@ public class  CombineFnsTest {
   @Test
   @Category(RunnableOnService.class)
   public void testComposedCombineNullValues() {
-    Pipeline p = TestPipeline.create();
     p.getCoderRegistry().registerCoder(UserString.class, NullableCoder.of(UserStringCoder.of()));
     p.getCoderRegistry().registerCoder(String.class, NullableCoder.of(StringUtf8Coder.of()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index 671f00e..0ac9502 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -82,6 +82,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.POutput;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -108,6 +109,9 @@ public class CombineTest implements Serializable {
 
   @Mock private DoFn<?, ?>.ProcessContext processContext;
 
+  @Rule
+  public final transient TestPipeline pipeline = TestPipeline.create();
+
   PCollection<KV<String, Integer>> createInput(Pipeline p,
                                                List<KV<String, Integer>> table) {
     return p.apply(Create.of(table).withCoder(
@@ -117,7 +121,6 @@ public class CombineTest implements Serializable {
   private void runTestSimpleCombine(List<KV<String, Integer>> table,
                                     int globalSum,
                                     List<KV<String, String>> perKeyCombines) {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Integer> sum = input
@@ -138,7 +141,6 @@ public class CombineTest implements Serializable {
                                                int globalSum,
                                                List<KV<String, String>> perKeyCombines,
                                                String[] globallyCombines) {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> perKeyInput = createInput(pipeline, table);
     PCollection<Integer> globallyInput = perKeyInput.apply(Values.<Integer>create());
 
@@ -197,7 +199,6 @@ public class CombineTest implements Serializable {
   private void runTestBasicCombine(List<KV<String, Integer>> table,
                                    Set<Integer> globalUnique,
                                    List<KV<String, Set<Integer>>> perKeyUnique) {
-    Pipeline pipeline = TestPipeline.create();
     pipeline.getCoderRegistry().registerCoder(Set.class, SetCoder.class);
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
@@ -233,7 +234,6 @@ public class CombineTest implements Serializable {
   private void runTestAccumulatingCombine(List<KV<String, Integer>> table,
                                           Double globalMean,
                                           List<KV<String, Double>> perKeyMeans) {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = createInput(pipeline, table);
 
     PCollection<Double> mean = input
@@ -253,8 +253,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFixedWindowsCombine() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
@@ -279,8 +277,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testFixedWindowsCombineWithContext() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 1L, 6L, 7L, 8L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
@@ -316,8 +312,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testSlidingWindowsCombineWithContext() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(2L, 3L, 8L, 9L, 10L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
@@ -365,7 +359,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testGlobalCombineWithDefaultsAndTriggers() {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> input = pipeline.apply(Create.of(1, 1));
 
     PCollection<String> output = input
@@ -392,8 +385,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testSessionsCombine() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<KV<String, Integer>> input =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
@@ -417,8 +408,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testSessionsCombineWithContext() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<KV<String, Integer>> perKeyInput =
         pipeline.apply(Create.timestamped(TABLE, Arrays.asList(0L, 4L, 7L, 10L, 16L))
                 .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())));
@@ -459,8 +448,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testWindowedCombineEmpty() {
-    Pipeline pipeline = TestPipeline.create();
-
     PCollection<Double> mean = pipeline
         .apply(Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
         .apply(Window.<Integer>into(FixedWindows.of(Duration.millis(1))))
@@ -517,7 +504,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testHotKeyCombining() {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10);
 
     KeyedCombineFn<String, Integer, ?, Double> mean =
@@ -552,7 +538,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testHotKeyCombiningWithAccumulationMode() {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5));
 
     PCollection<Integer> output = input
@@ -577,7 +562,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testBinaryCombineFn() {
-    Pipeline pipeline = TestPipeline.create();
     PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 2);
     PCollection<KV<String, Integer>> intProduct = input
         .apply("IntProduct", Combine.<String, Integer, Integer>perKey(new TestProdInt()));
@@ -632,7 +616,6 @@ public class CombineTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testCombineGloballyAsSingletonView() {
-    Pipeline pipeline = TestPipeline.create();
     final PCollectionView<Integer> view = pipeline
         .apply("CreateEmptySideInput", Create.<Integer>of().withCoder(BigEndianIntegerCoder.of()))
         .apply(Sum.integersGlobally().asSingletonView());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/75a4c918/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
index 7f77ae7..eafb12d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CountTest.java
@@ -22,13 +22,13 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
 import java.util.List;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -45,12 +45,13 @@ public class CountTest {
 
   static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
 
+  @Rule
+  public TestPipeline p = TestPipeline.create();
+
   @Test
   @Category(RunnableOnService.class)
   @SuppressWarnings("unchecked")
   public void testCountPerElementBasic() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input = p.apply(Create.of(WORDS));
 
     PCollection<KV<String, Long>> output =
@@ -71,8 +72,6 @@ public class CountTest {
   @Category(RunnableOnService.class)
   @SuppressWarnings("unchecked")
   public void testCountPerElementEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
 
     PCollection<KV<String, Long>> output =
@@ -85,8 +84,6 @@ public class CountTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCountGloballyBasic() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input = p.apply(Create.of(WORDS));
 
     PCollection<Long> output =
@@ -100,8 +97,6 @@ public class CountTest {
   @Test
   @Category(RunnableOnService.class)
   public void testCountGloballyEmpty() {
-    Pipeline p = TestPipeline.create();
-
     PCollection<String> input = p.apply(Create.of(NO_LINES).withCoder(StringUtf8Coder.of()));
 
     PCollection<Long> output =

Reply via email to